You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/02/10 19:05:22 UTC

[4/4] incubator-usergrid git commit: moving dependencies to core persistence

moving dependencies to core persistence


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/c3869371
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/c3869371
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/c3869371

Branch: refs/heads/USERGRID-365
Commit: c3869371611cedb8682a9f74a3811685da9d3206
Parents: 7daca75
Author: Shawn Feldman <sf...@apache.org>
Authored: Tue Feb 10 10:50:24 2015 -0700
Committer: Shawn Feldman <sf...@apache.org>
Committed: Tue Feb 10 10:50:24 2015 -0700

----------------------------------------------------------------------
 .../corepersistence/CpEntityManagerFactory.java |  3 +-
 .../usergrid/corepersistence/GuiceModule.java   | 12 +--
 .../migration/EntityDataMigration.java          | 71 ---------------
 .../migration/EntityTypeMappingMigration.java   | 53 ++++++-----
 .../migration/GraphShardVersionMigration.java   | 96 --------------------
 .../rx/EdgesToTargetObservable.java             | 63 -------------
 .../corepersistence/rx/TargetIdObservable.java  | 71 ---------------
 .../impl/AllEntitiesInSystemObservableImpl.java | 27 ++++--
 .../corepersistence/util/CpNamingUtils.java     | 14 ---
 .../migration/EntityDataMigrationIT.java        | 32 +++++--
 .../migration/EntityTypeMappingMigrationIT.java | 79 +++++++++-------
 .../migration/GraphShardVersionMigrationIT.java | 30 ++++--
 .../rx/AllEntitiesInSystemObservableIT.java     | 10 +-
 .../rx/ApplicationObservableTestIT.java         |  6 +-
 .../rx/EdgesFromSourceObservableIT.java         |  4 +-
 .../rx/EdgesToTargetObservableIT.java           |  6 +-
 .../rx/TargetIdObservableTestIT.java            |  7 +-
 .../mvcc/MvccEntityMigrationStrategy.java       |  3 +-
 .../MvccEntitySerializationStrategyProxy.java   | 13 +--
 ...cEntitySerializationStrategyProxyV1Impl.java |  2 +-
 ...cEntitySerializationStrategyProxyV2Impl.java |  2 +-
 .../serialization/impl/SerializationModule.java |  5 +
 .../core/entity/ApplicationEntityGroup.java     | 40 --------
 .../core/migration/data/DataMigration.java      |  5 +-
 .../data/DataMigrationManagerImpl.java          | 93 +++++++++++--------
 .../migration/schema/MigrationStrategy.java     |  9 +-
 .../core/rx/AllEntitiesInSystemObservable.java  |  2 +-
 .../core/scope/ApplicationEntityGroup.java      | 40 ++++++++
 .../core/guice/MaxMigrationVersion.java         |  6 +-
 .../data/DataMigrationManagerImplTest.java      | 37 +++++---
 .../persistence/graph/guice/GraphModule.java    |  6 ++
 .../serialization/EdgeMigrationStrategy.java    |  2 +-
 .../graph/serialization/TargetIdObservable.java | 38 ++++++++
 .../EdgeMetadataSerializationProxyImpl.java     | 19 +++-
 .../impl/TargetIdObservableImpl.java            | 70 ++++++++++++++
 35 files changed, 436 insertions(+), 540 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c3869371/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
index 5a4fcc1..0ca98e6 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
@@ -35,8 +35,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.lang.StringUtils;
 
-import org.apache.usergrid.corepersistence.rx.AllEntitiesInSystemObservable;
-import org.apache.usergrid.corepersistence.rx.impl.AllEntitiesInSystemObservableImpl;
 import org.apache.usergrid.corepersistence.util.CpNamingUtils;
 import org.apache.usergrid.persistence.AbstractEntity;
 import org.apache.usergrid.persistence.Entity;
@@ -55,6 +53,7 @@ import org.apache.usergrid.persistence.collection.EntityCollectionManager;
 import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
 import org.apache.usergrid.persistence.collection.impl.CollectionScopeImpl;
 import org.apache.usergrid.persistence.core.migration.data.DataMigrationManager;
+import org.apache.usergrid.persistence.core.rx.AllEntitiesInSystemObservable;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
 import org.apache.usergrid.persistence.core.util.Health;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c3869371/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceModule.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceModule.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceModule.java
index 0143a89..b58a246 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceModule.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceModule.java
@@ -18,14 +18,10 @@ package org.apache.usergrid.corepersistence;
 import com.google.inject.AbstractModule;
 import com.google.inject.multibindings.Multibinder;
 
-import org.apache.usergrid.corepersistence.migration.EntityDataMigration;
 import org.apache.usergrid.corepersistence.migration.EntityTypeMappingMigration;
-import org.apache.usergrid.corepersistence.migration.GraphShardVersionMigration;
 import org.apache.usergrid.corepersistence.events.EntityDeletedHandler;
 import org.apache.usergrid.corepersistence.events.EntityVersionCreatedHandler;
 import org.apache.usergrid.corepersistence.events.EntityVersionDeletedHandler;
-import org.apache.usergrid.corepersistence.rx.AllEntitiesInSystemObservable;
-import org.apache.usergrid.corepersistence.rx.ApplicationObservable;
 import org.apache.usergrid.corepersistence.rx.impl.AllEntitiesInSystemObservableImpl;
 import org.apache.usergrid.corepersistence.rx.impl.ApplicationObservableImpl;
 import org.apache.usergrid.persistence.EntityManagerFactory;
@@ -33,9 +29,15 @@ import org.apache.usergrid.persistence.collection.event.EntityDeleted;
 import org.apache.usergrid.persistence.collection.event.EntityVersionCreated;
 import org.apache.usergrid.persistence.collection.event.EntityVersionDeleted;
 import org.apache.usergrid.persistence.collection.guice.CollectionModule;
+import org.apache.usergrid.persistence.collection.serialization.impl.MvccEntitySerializationStrategy;
+import org.apache.usergrid.persistence.collection.serialization.impl.MvccEntitySerializationStrategyProxy;
 import org.apache.usergrid.persistence.core.guice.CommonModule;
+import org.apache.usergrid.persistence.core.guice.ProxyImpl;
 import org.apache.usergrid.persistence.core.migration.data.DataMigration;
+import org.apache.usergrid.persistence.core.rx.AllEntitiesInSystemObservable;
+import org.apache.usergrid.persistence.core.rx.ApplicationObservable;
 import org.apache.usergrid.persistence.graph.guice.GraphModule;
+import org.apache.usergrid.persistence.graph.serialization.impl.EdgeMetadataSerializationProxyImpl;
 import org.apache.usergrid.persistence.index.guice.IndexModule;
 import org.apache.usergrid.persistence.map.guice.MapModule;
 import org.apache.usergrid.persistence.queue.guice.QueueModule;
@@ -80,8 +82,6 @@ public class GuiceModule extends AbstractModule {
         Multibinder<DataMigration> dataMigrationMultibinder =
                 Multibinder.newSetBinder( binder(), DataMigration.class );
         dataMigrationMultibinder.addBinding().to( EntityTypeMappingMigration.class );
-        dataMigrationMultibinder.addBinding().to( GraphShardVersionMigration.class );
-        dataMigrationMultibinder.addBinding().to( EntityDataMigration.class );
 
         Multibinder<EntityDeleted> entityBinder =
             Multibinder.newSetBinder(binder(), EntityDeleted.class);

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c3869371/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityDataMigration.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityDataMigration.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityDataMigration.java
deleted file mode 100644
index 861c343..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityDataMigration.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.migration;
-
-
-import org.apache.usergrid.corepersistence.util.CpNamingUtils;
-import org.apache.usergrid.persistence.collection.mvcc.MvccEntityMigrationStrategy;
-import org.apache.usergrid.persistence.core.entity.ApplicationEntityGroup;
-import org.apache.usergrid.persistence.core.migration.data.DataMigration;
-import org.apache.usergrid.persistence.core.rx.AllEntitiesInSystemObservable;
-
-import com.google.inject.Inject;
-import rx.functions.Action1;
-
-
-/**
- * Migration for migrating graph edges to the new Shards
- */
-public class EntityDataMigration implements DataMigration {
-
-
-    private final MvccEntityMigrationStrategy migrationStrategy;
-    private final AllEntitiesInSystemObservable allEntitiesInSystemObservable;
-
-    @Inject
-    public EntityDataMigration(  final MvccEntityMigrationStrategy migrationStrategy,
-                                 final AllEntitiesInSystemObservable allEntitiesInSystemObservable) {
-        this.migrationStrategy = migrationStrategy;
-        this.allEntitiesInSystemObservable = allEntitiesInSystemObservable;
-    }
-
-    @Override
-    public void migrate( final ProgressObserver observer ) throws Throwable {
-        allEntitiesInSystemObservable
-            .getAllEntitiesInSystem(1000)
-            .doOnNext(new Action1<ApplicationEntityGroup>() {
-                @Override
-                public void call( final ApplicationEntityGroup applicationEntityGroup ) {
-                    migrationStrategy.executeMigration(
-                        null,
-                        applicationEntityGroup,
-                        observer,
-                        CpNamingUtils.getCollectionScopeByEntityIdFunc1(applicationEntityGroup.applicationScope)
-                    );
-                }
-            })
-            .toBlocking().last();
-    }
-
-    @Override
-    public int getVersion() {
-        return migrationStrategy.getMigrationVersion();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c3869371/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigration.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigration.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigration.java
index 08001d1..8352cff 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigration.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigration.java
@@ -25,9 +25,8 @@ import java.util.concurrent.atomic.AtomicLong;
 
 
 import org.apache.usergrid.corepersistence.ManagerCache;
-import org.apache.usergrid.corepersistence.rx.impl.AllEntitiesInSystemObservableImpl;
 import org.apache.usergrid.corepersistence.util.CpNamingUtils;
-import org.apache.usergrid.persistence.core.entity.ApplicationEntityGroup;
+import org.apache.usergrid.persistence.core.scope.ApplicationEntityGroup;
 import org.apache.usergrid.persistence.core.migration.data.DataMigration;
 import org.apache.usergrid.persistence.core.rx.AllEntitiesInSystemObservable;
 import org.apache.usergrid.persistence.map.MapManager;
@@ -36,7 +35,11 @@ import org.apache.usergrid.persistence.model.entity.Id;
 
 import com.google.inject.Inject;
 
+import rx.Observable;
+import rx.Scheduler;
 import rx.functions.Action1;
+import rx.functions.Func1;
+import rx.schedulers.Schedulers;
 
 
 /**
@@ -56,34 +59,30 @@ public class EntityTypeMappingMigration implements DataMigration {
 
 
     @Override
-    public void migrate( final ProgressObserver observer ) throws Throwable {
+    public Observable migrate(final ApplicationEntityGroup applicationEntityGroup, final ProgressObserver observer) throws Throwable {
 
         final AtomicLong atomicLong = new AtomicLong();
 
-        allEntitiesInSystemObservable.getAllEntitiesInSystem(1000)
-                                     .doOnNext( new Action1<ApplicationEntityGroup>() {
-
-
-                                         @Override
-                                         public void call( final ApplicationEntityGroup applicationEntityGroup ) {
-
-                                             final MapScope ms = CpNamingUtils.getEntityTypeMapScope( applicationEntityGroup.applicationScope.getApplication() );
-
-
-                                             final MapManager mapManager = managerCache.getMapManager( ms );
-
-                                             for(Id entityId: applicationEntityGroup.entityIds) {
-                                                 final UUID entityUuid = entityId.getUuid();
-                                                 final String entityType = entityId.getType();
-
-                                                 mapManager.putString( entityUuid.toString(), entityType );
-
-                                                 if ( atomicLong.incrementAndGet() % 100 == 0 ) {
-                                                     updateStatus( atomicLong, observer );
-                                                 }
-                                             }
-                                         }
-                                     } ).toBlocking().lastOrDefault( null );
+        final MapScope ms = CpNamingUtils.getEntityTypeMapScope(applicationEntityGroup.applicationScope.getApplication());
+
+        final MapManager mapManager = managerCache.getMapManager(ms);
+        return Observable.from(applicationEntityGroup.entityIds).subscribeOn(Schedulers.io())
+            .map(new Func1<Id, Long>() {
+                @Override
+                public Long call(Id id) {
+                    for (Id entityId : applicationEntityGroup.entityIds) {
+                        final UUID entityUuid = entityId.getUuid();
+                        final String entityType = entityId.getType();
+
+                        mapManager.putString(entityUuid.toString(), entityType);
+
+                        if (atomicLong.incrementAndGet() % 100 == 0) {
+                            updateStatus(atomicLong, observer);
+                        }
+                    }
+                    return atomicLong.get();
+                }
+            });
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c3869371/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigration.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigration.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigration.java
deleted file mode 100644
index 41b390b..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigration.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- *
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*    http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied.  See the License for the
-* specific language governing permissions and limitations
-* under the License.
- *
- */
-
-package org.apache.usergrid.corepersistence.migration;
-
-
-import org.apache.usergrid.corepersistence.util.CpNamingUtils;
-import org.apache.usergrid.persistence.core.entity.ApplicationEntityGroup;
-import org.apache.usergrid.persistence.core.rx.AllEntitiesInSystemObservable;
-import org.apache.usergrid.persistence.graph.GraphManagerFactory;
-import org.apache.usergrid.persistence.graph.serialization.EdgeMigrationStrategy;
-import org.apache.usergrid.persistence.graph.serialization.EdgesFromSourceObservable;
-
-import org.apache.usergrid.persistence.core.migration.data.DataMigration;
-import org.apache.usergrid.persistence.graph.Edge;
-import org.apache.usergrid.persistence.graph.GraphManager;
-
-import com.google.inject.Inject;
-
-import rx.Observable;
-import rx.functions.Func1;
-
-
-/**
- * Migration for migrating graph edges to the new Shards
- */
-public class GraphShardVersionMigration implements DataMigration {
-
-
-    private final GraphManagerFactory graphManagerFactory;
-    private final AllEntitiesInSystemObservable allEntitiesInSystemObservable;
-    private final EdgesFromSourceObservable edgesFromSourceObservable;
-    private final EdgeMigrationStrategy migrationStrategy;
-
-
-    @Inject
-    public GraphShardVersionMigration( final EdgeMigrationStrategy migrationStrategy,
-                                       final GraphManagerFactory graphManagerFactory,
-                                       final AllEntitiesInSystemObservable allEntitiesInSystemObservable,
-                                       final EdgesFromSourceObservable edgesFromSourceObservable
-    ) {
-        this.migrationStrategy = migrationStrategy;
-        this.graphManagerFactory = graphManagerFactory;
-        this.allEntitiesInSystemObservable = allEntitiesInSystemObservable;
-        this.edgesFromSourceObservable = edgesFromSourceObservable;
-    }
-
-
-    @Override
-    public void migrate( final ProgressObserver observer ) throws Throwable {
-
-        allEntitiesInSystemObservable.getAllEntitiesInSystem(1000)
-            .flatMap(
-                new Func1<ApplicationEntityGroup, Observable<Long>>() {
-                    @Override
-                    public Observable<Long> call(
-                        final ApplicationEntityGroup applicationEntityGroup) {
-                        final GraphManager gm = graphManagerFactory.createEdgeManager(applicationEntityGroup.applicationScope);
-                        Observable<Edge> edgesFromSource = edgesFromSourceObservable.edgesFromSource(gm, applicationEntityGroup.applicationScope.getApplication());
-                        //emit a stream of all ids from this group
-                        return migrationStrategy.executeMigration(
-                            edgesFromSource,
-                            applicationEntityGroup,
-                            observer,
-                            CpNamingUtils.getCollectionScopeByEntityIdFunc1(applicationEntityGroup.applicationScope)
-                        );
-                    }
-                })
-            .toBlocking().lastOrDefault(null);
-        ;
-    }
-
-
-    @Override
-    public int getVersion() {
-        return migrationStrategy.getMigrationVersion();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c3869371/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservable.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservable.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservable.java
deleted file mode 100644
index c5dc54d..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservable.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.rx;
-
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.graph.Edge;
-import org.apache.usergrid.persistence.graph.GraphManager;
-import org.apache.usergrid.persistence.graph.SearchByEdgeType;
-import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
-import org.apache.usergrid.persistence.graph.impl.SimpleSearchEdgeType;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import rx.Observable;
-import rx.functions.Func1;
-
-
-/**
- * Emits the id of all edges where the given node is the target node
- */
-public class EdgesToTargetObservable {
-
-    private static final Logger logger = LoggerFactory.getLogger( EdgesToTargetObservable.class );
-
-
-    /**
-     * Get all edges from the source
-     */
-    public static Observable<Edge> getEdgesToTarget(final GraphManager gm,  final Id targetNode) {
-        Observable<String> edgeTypes = gm.getEdgeTypesToTarget( new SimpleSearchEdgeType( targetNode, null, null ) );
-
-        return edgeTypes.flatMap( new Func1<String, Observable<Edge>>() {
-            @Override
-            public Observable<Edge> call( final String edgeType ) {
-
-                logger.debug( "Loading edges of edgeType {} to {}", edgeType, targetNode);
-
-                return gm.loadEdgesToTarget( new SimpleSearchByEdgeType( targetNode, edgeType, Long.MAX_VALUE,
-                        SearchByEdgeType.Order.DESCENDING, null ) );
-            }
-        } );
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c3869371/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/TargetIdObservable.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/TargetIdObservable.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/TargetIdObservable.java
deleted file mode 100644
index 96b638b..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/TargetIdObservable.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.rx;
-
-
-import org.apache.usergrid.persistence.graph.serialization.EdgesFromSourceObservable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.persistence.graph.Edge;
-import org.apache.usergrid.persistence.graph.GraphManager;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import rx.Observable;
-import rx.functions.Func1;
-
-
-/**
- * Emits the id of all nodes that are target nodes from the given source node
- */
-public class TargetIdObservable {
-
-    private static final Logger logger = LoggerFactory.getLogger( TargetIdObservable.class );
-    private final EdgesFromSourceObservable edgesFromSourceObservable;
-
-    public TargetIdObservable(final EdgesFromSourceObservable edgesFromSourceObservable){
-        this.edgesFromSourceObservable = edgesFromSourceObservable;
-    }
-
-    /**
-     * Get all nodes that are target nodes from the sourceNode
-     * @param gm
-     * @param sourceNode
-     *
-     * @return
-     */
-    public Observable<Id> getTargetNodes( final GraphManager gm,  final Id sourceNode) {
-
-        //only search edge types that start with collections
-       return edgesFromSourceObservable.edgesFromSource(gm, sourceNode ).map( new Func1<Edge, Id>() {
-
-
-           @Override
-           public Id call( final Edge edge ) {
-               final Id targetNode = edge.getTargetNode();
-
-               logger.debug( "Emitting targetId of {}", edge );
-
-
-               return targetNode;
-           }
-       } );
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c3869371/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntitiesInSystemObservableImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntitiesInSystemObservableImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntitiesInSystemObservableImpl.java
index 4218107..4608bd2 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntitiesInSystemObservableImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntitiesInSystemObservableImpl.java
@@ -22,7 +22,7 @@ package org.apache.usergrid.corepersistence.rx.impl;
 
 import java.util.List;
 
-import org.apache.usergrid.corepersistence.rx.TargetIdObservable;
+import org.apache.usergrid.corepersistence.util.CpNamingUtils;
 import org.apache.usergrid.persistence.core.rx.AllEntitiesInSystemObservable;
 import org.apache.usergrid.persistence.core.rx.ApplicationObservable;
 import org.apache.usergrid.persistence.core.scope.ApplicationEntityGroup;
@@ -30,6 +30,7 @@ import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
 import org.apache.usergrid.persistence.graph.GraphManager;
 import org.apache.usergrid.persistence.graph.GraphManagerFactory;
+import org.apache.usergrid.persistence.graph.serialization.TargetIdObservable;
 import org.apache.usergrid.persistence.model.entity.Id;
 
 import rx.Observable;
@@ -74,18 +75,28 @@ public class AllEntitiesInSystemObservableImpl implements AllEntitiesInSystemObs
                                             final Observable<Id> entityNodes =
                                                     targetIdObservable.getTargetNodes(gm, applicationId);
 
+
+                                            //get scope here
+
+
+                                            //emit Scope + ID
+
                                             //create our application node to emit since it's an entity as well
                                             final Observable<Id> applicationNode = Observable.just( applicationId );
 
                                             //merge both the specified application node and the entity node
                                             // so they all get used
-                                            return Observable.merge( applicationNode, entityNodes ).buffer(bufferSize)
-                                                             .map( new Func1<List<Id>, ApplicationEntityGroup>() {
-                                                                 @Override
-                                                                 public ApplicationEntityGroup call( final List<Id> id ) {
-                                                                     return new ApplicationEntityGroup( applicationScope, id );
-                                                                 }
-                                                             } );
+                                            return Observable
+                                                .merge(applicationNode, entityNodes)
+                                                    .buffer(bufferSize)
+                                                    .map(new Func1<List<Id>, ApplicationEntityGroup>() {
+                                                        @Override
+                                                        public ApplicationEntityGroup call(final List<Id> ids) {
+                                                            //CpNamingUtils.getCollectionScopeNameFromEntityType(applicationId,);
+
+                                                            return new ApplicationEntityGroup(applicationScope, ids);
+                                                        }
+                                                    });
                                         }
                                     } );
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c3869371/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java
index 36c3a82..154519b 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java
@@ -72,20 +72,6 @@ public class CpNamingUtils {
      */
     public static String TYPES_BY_UUID_MAP = "zzz_typesbyuuid_zzz";
 
-    /**
-     * Returns a function to retreive collection scope
-     * @param applicationScope
-     * @return
-     */
-    public static Func1<Id,? extends ApplicationScope> getCollectionScopeByEntityIdFunc1(final ApplicationScope applicationScope){
-        Func1<Id,ApplicationScope> func = new Func1<Id, ApplicationScope>() {
-            @Override
-            public ApplicationScope call(Id id) {
-                return CpNamingUtils.getCollectionScopeNameFromEntityType(applicationScope.getApplication(), id.getType());
-            }
-        };
-        return func;
-    }
 
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c3869371/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityDataMigrationIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityDataMigrationIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityDataMigrationIT.java
index f58696e..2509771 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityDataMigrationIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityDataMigrationIT.java
@@ -25,6 +25,9 @@ import java.util.Iterator;
 import java.util.Set;
 
 import org.apache.usergrid.persistence.collection.mvcc.MvccEntityMigrationStrategy;
+import org.apache.usergrid.persistence.collection.serialization.impl.MvccEntitySerializationStrategyProxyV2Impl;
+import org.apache.usergrid.persistence.core.migration.data.DataMigration;
+import org.apache.usergrid.persistence.core.rx.AllEntitiesInSystemObservable;
 import org.apache.usergrid.persistence.core.scope.ApplicationEntityGroup;
 import org.junit.Before;
 import org.junit.Rule;
@@ -66,8 +69,7 @@ public class EntityDataMigrationIT extends AbstractCoreIT {
     private Injector injector;
 
 
-    private EntityDataMigration entityDataMigration;
-    private ManagerCache managerCache;
+    private DataMigration entityDataMigration;
     private DataMigrationManager dataMigrationManager;
     private MigrationInfoSerialization migrationInfoSerialization;
     private MvccEntitySerializationStrategy v1Strategy;
@@ -81,17 +83,18 @@ public class EntityDataMigrationIT extends AbstractCoreIT {
      */
     @Rule
     public MigrationTestRule migrationTestRule =
-            new MigrationTestRule( app, CpSetup.getInjector() ,EntityDataMigration.class  );
+            new MigrationTestRule( app, CpSetup.getInjector() ,MvccEntitySerializationStrategyProxyV2Impl.class  );
+    private AllEntitiesInSystemObservable allEntitiesInSystemObservable;
 
     @Before
     public void setup() {
         emf = setup.getEmf();
         injector = CpSetup.getInjector();
-        entityDataMigration = injector.getInstance( EntityDataMigration.class );
-        managerCache = injector.getInstance( ManagerCache.class );
+        entityDataMigration = injector.getInstance( MvccEntitySerializationStrategyProxyV2Impl.class );
         dataMigrationManager = injector.getInstance( DataMigrationManager.class );
         migrationInfoSerialization = injector.getInstance( MigrationInfoSerialization.class );
         MvccEntityMigrationStrategy strategy = injector.getInstance(Key.get(MvccEntityMigrationStrategy.class));
+        allEntitiesInSystemObservable = injector.getInstance(AllEntitiesInSystemObservable.class);
         v1Strategy = strategy.getMigration().from();
         v2Strategy = strategy.getMigration().to();
     }
@@ -132,7 +135,7 @@ public class EntityDataMigrationIT extends AbstractCoreIT {
         //read everything in previous version format and put it into our types.  Assumes we're
         //using a test system, and it's not a huge amount of data, otherwise we'll overflow.
 
-        AllEntitiesInSystemObservableImpl.getAllEntitiesInSystem(managerCache, 1000)
+        allEntitiesInSystemObservable.getAllEntitiesInSystem(  1000)
             .doOnNext( new Action1<ApplicationEntityGroup>() {
                 @Override
                 public void call(
@@ -168,7 +171,18 @@ public class EntityDataMigrationIT extends AbstractCoreIT {
         assertTrue( "Saved new entities", savedEntities.size() > 0 );
 
         //perform the migration
-        entityDataMigration.migrate( progressObserver );
+        allEntitiesInSystemObservable.getAllEntitiesInSystem(  1000)
+            .doOnNext(new Action1<ApplicationEntityGroup>() {
+                @Override
+                public void call(ApplicationEntityGroup applicationEntityGroup) {
+                   try {
+                       entityDataMigration.migrate(applicationEntityGroup, progressObserver).toBlocking().last();
+                   }catch (Throwable e){
+                       throw new RuntimeException(e);
+                   }
+                }
+            }).toBlocking().last();
+
 
         assertFalse( "Progress observer should not have failed", progressObserver.getFailed() );
         assertTrue( "Progress observer should have update messages", progressObserver.getUpdates().size() > 0 );
@@ -184,7 +198,7 @@ public class EntityDataMigrationIT extends AbstractCoreIT {
 
 
         //now visit all entities in the system again, load them from v2, and ensure they're the same
-        AllEntitiesInSystemObservableImpl.getAllEntitiesInSystem(managerCache, 1000)
+        allEntitiesInSystemObservable.getAllEntitiesInSystem( 1000)
             .doOnNext( new Action1<ApplicationEntityGroup>() {
                 @Override
                 public void call(
@@ -220,7 +234,7 @@ public class EntityDataMigrationIT extends AbstractCoreIT {
 
         //now visit all entities in the system again, and load them from the EM,
         // ensure we see everything we did in the v1 traversal
-        AllEntitiesInSystemObservableImpl.getAllEntitiesInSystem(managerCache, 1000)
+        allEntitiesInSystemObservable.getAllEntitiesInSystem( 1000)
             .doOnNext( new Action1<ApplicationEntityGroup>() {
                 @Override
                 public void call(

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c3869371/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigrationIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigrationIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigrationIT.java
index ee95d58..3da0b85 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigrationIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigrationIT.java
@@ -23,6 +23,7 @@ package org.apache.usergrid.corepersistence.migration;
 import java.util.HashSet;
 import java.util.Set;
 
+import org.apache.usergrid.persistence.core.rx.AllEntitiesInSystemObservable;
 import org.apache.usergrid.persistence.core.scope.ApplicationEntityGroup;
 import org.junit.Before;
 import org.junit.Rule;
@@ -72,7 +73,7 @@ public class EntityTypeMappingMigrationIT extends AbstractCoreIT {
     @Rule
     public MigrationTestRule migrationTestRule = new MigrationTestRule(
             app, CpSetup.getInjector() ,EntityTypeMappingMigration.class  );
-
+    private AllEntitiesInSystemObservable allEntitiesInSystemObservable;
 
 
     @Before
@@ -83,6 +84,7 @@ public class EntityTypeMappingMigrationIT extends AbstractCoreIT {
         keyspace = injector.getInstance( Keyspace.class );
         managerCache = injector.getInstance( ManagerCache.class );
         dataMigrationManager = injector.getInstance( DataMigrationManager.class );
+        allEntitiesInSystemObservable = injector.getInstance(AllEntitiesInSystemObservable.class);
     }
 
 
@@ -123,54 +125,61 @@ public class EntityTypeMappingMigrationIT extends AbstractCoreIT {
 
         final TestProgressObserver progressObserver = new TestProgressObserver();
 
-        entityTypeMappingMigration.migrate( progressObserver );
-
+        allEntitiesInSystemObservable.getAllEntitiesInSystem(  1000)
+            .doOnNext(new Action1<ApplicationEntityGroup>() {
+                @Override
+                public void call(final ApplicationEntityGroup entity) {
+                    try {
+                        entityTypeMappingMigration.migrate(entity, progressObserver).toBlocking().last();
+                    }catch (Throwable e ){
+                        throw new RuntimeException(e);
+                    }
+                }
+            });
 
-        AllEntitiesInSystemObservableImpl.getAllEntitiesInSystem(managerCache, 1000)
-            .doOnNext( new Action1<ApplicationEntityGroup>() {
+        allEntitiesInSystemObservable.getAllEntitiesInSystem(1000)
+            .doOnNext(new Action1<ApplicationEntityGroup>() {
                 @Override
                 public void call(
-                        final ApplicationEntityGroup entity ) {
+                    final ApplicationEntityGroup entity) {
                     //ensure that each one has a type
 
                     final EntityManager em = emf.getEntityManager(
-                            entity.applicationScope.getApplication().getUuid() );
+                        entity.applicationScope.getApplication().getUuid());
 
-                    for ( final Id id : entity.entityIds ) {
+                    for (final Id id : entity.entityIds) {
                         try {
-                            final Entity returned = em.get( id.getUuid() );
+                            final Entity returned = em.get(id.getUuid());
 
                             //we seem to occasionally get phantom edges.  If this is the
                             // case we'll store the type _> uuid mapping, but we won't have
                             // anything to load
 
-                            if ( returned != null ) {
-                                assertEquals( id.getUuid(), returned.getUuid() );
-                                assertEquals( id.getType(), returned.getType() );
-                            }
-                            else {
-                                final String type = managerCache.getMapManager( CpNamingUtils
-                                        .getEntityTypeMapScope(
-                                                entity.applicationScope.getApplication() ) )
-                                                                .getString( id.getUuid()
-                                                                            .toString() );
-
-                                assertEquals( id.getType(), type );
+                            if (returned != null) {
+                                assertEquals(id.getUuid(), returned.getUuid());
+                                assertEquals(id.getType(), returned.getType());
+                            } else {
+                                final String type = managerCache.getMapManager(CpNamingUtils
+                                    .getEntityTypeMapScope(
+                                        entity.applicationScope.getApplication()))
+                                    .getString(id.getUuid()
+                                        .toString());
+
+                                assertEquals(id.getType(), type);
                             }
-                        }
-                        catch ( Exception e ) {
-                            throw new RuntimeException( "Unable to get entity " + id
-                                    + " by UUID, migration failed", e );
-                        }
+                        } catch (Exception e) {
+                            throw new RuntimeException("Unable to get entity " + id
+                                + " by UUID, migration failed", e);
+                                    }
 
-                        allEntities.remove( id );
-                    }
-                }
-            } ).toBlocking().lastOrDefault( null );
+                                    allEntities.remove(id);
+                                }
+                            }
+                        }).toBlocking().lastOrDefault(null);
 
 
-        assertEquals( "Every element should have been encountered", 0, allEntities.size() );
-        assertFalse( "Progress observer should not have failed", progressObserver.getFailed() );
-        assertTrue( "Progress observer should have update messages", progressObserver.getUpdates().size() > 0 );
-    }
-}
+                    assertEquals("Every element should have been encountered", 0, allEntities.size());
+                    assertFalse("Progress observer should not have failed", progressObserver.getFailed());
+                    assertTrue("Progress observer should have update messages", progressObserver.getUpdates().size() > 0);
+                }
+            }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c3869371/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigrationIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigrationIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigrationIT.java
index 45f41aa..2d6d0d9 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigrationIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigrationIT.java
@@ -23,7 +23,10 @@ package org.apache.usergrid.corepersistence.migration;
 import java.util.HashSet;
 import java.util.Set;
 
+import org.apache.usergrid.persistence.core.migration.data.DataMigration;
+import org.apache.usergrid.persistence.core.rx.AllEntitiesInSystemObservable;
 import org.apache.usergrid.persistence.core.scope.ApplicationEntityGroup;
+import org.apache.usergrid.persistence.graph.serialization.impl.EdgeMetadataSerializationProxyImpl;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -56,7 +59,7 @@ import org.junit.Ignore;
 public class GraphShardVersionMigrationIT extends AbstractCoreIT {
 
     private Injector injector;
-    private GraphShardVersionMigration graphShardVersionMigration;
+    private DataMigration graphShardVersionMigration;
     private ManagerCache managerCache;
     private DataMigrationManager dataMigrationManager;
     private MigrationInfoSerialization migrationInfoSerialization;
@@ -66,18 +69,18 @@ public class GraphShardVersionMigrationIT extends AbstractCoreIT {
      * Rule to do the resets we need
      */
     @Rule
-    public MigrationTestRule migrationTestRule = new MigrationTestRule( app, CpSetup.getInjector() ,GraphShardVersionMigration.class  );
-
+    public MigrationTestRule migrationTestRule = new MigrationTestRule( app, CpSetup.getInjector() ,EdgeMetadataSerializationProxyImpl.class  );
+    private AllEntitiesInSystemObservable allEntitiesInSystemObservable;
 
 
     @Before
     public void setup() {
         injector = CpSetup.getInjector();
-        graphShardVersionMigration = injector.getInstance( GraphShardVersionMigration.class );
+        graphShardVersionMigration = injector.getInstance( EdgeMetadataSerializationProxyImpl.class );
         managerCache = injector.getInstance( ManagerCache.class );
         dataMigrationManager = injector.getInstance( DataMigrationManager.class );
         migrationInfoSerialization = injector.getInstance( MigrationInfoSerialization.class );
-
+        allEntitiesInSystemObservable = injector.getInstance(AllEntitiesInSystemObservable.class);
     }
 
 
@@ -115,7 +118,7 @@ public class GraphShardVersionMigrationIT extends AbstractCoreIT {
 
         //read everything in previous version format and put it into our types.
 
-        AllEntitiesInSystemObservableImpl.getAllEntitiesInSystem(managerCache, 1000)
+        allEntitiesInSystemObservable.getAllEntitiesInSystem( 1000)
                                      .doOnNext( new Action1<ApplicationEntityGroup>() {
                                          @Override
                                          public void call(
@@ -155,7 +158,18 @@ public class GraphShardVersionMigrationIT extends AbstractCoreIT {
 
 
         //perform the migration
-        graphShardVersionMigration.migrate( progressObserver );
+        allEntitiesInSystemObservable.getAllEntitiesInSystem( 1000)
+            .doOnNext( new Action1<ApplicationEntityGroup>() {
+                @Override
+                public void call(
+                    final ApplicationEntityGroup entity) {
+                    try {
+                        graphShardVersionMigration.migrate(entity, progressObserver).toBlocking().last();
+                    }catch (Throwable e){
+                        throw new RuntimeException(e);
+                    }
+                }
+            }).toBlocking().last();
 
         assertEquals( "Newly saved entities encounterd", 0, allEntities.size() );
         assertFalse( "Progress observer should not have failed", progressObserver.getFailed() );
@@ -172,7 +186,7 @@ public class GraphShardVersionMigrationIT extends AbstractCoreIT {
 
 
         //now visit all nodes in the system and remove their types from the multi maps, it should be empty at the end
-        AllEntitiesInSystemObservableImpl.getAllEntitiesInSystem(managerCache, 1000)
+        allEntitiesInSystemObservable.getAllEntitiesInSystem( 1000)
                                      .doOnNext( new Action1<ApplicationEntityGroup>() {
                                                     @Override
                                                     public void call(

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c3869371/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/AllEntitiesInSystemObservableIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/AllEntitiesInSystemObservableIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/AllEntitiesInSystemObservableIT.java
index b246e04..30c9ac3 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/AllEntitiesInSystemObservableIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/AllEntitiesInSystemObservableIT.java
@@ -24,7 +24,10 @@ import java.util.HashSet;
 import java.util.Set;
 
 import org.apache.usergrid.corepersistence.rx.impl.AllEntitiesInSystemObservableImpl;
+import org.apache.usergrid.persistence.core.rx.AllEntitiesInSystemObservable;
+import org.apache.usergrid.persistence.core.rx.ApplicationObservable;
 import org.apache.usergrid.persistence.core.scope.ApplicationEntityGroup;
+import org.apache.usergrid.persistence.graph.serialization.TargetIdObservable;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -57,6 +60,9 @@ public class AllEntitiesInSystemObservableIT extends AbstractCoreIT {
     @Test
     public void testEntities() throws Exception {
 
+        AllEntitiesInSystemObservable allEntitiesInSystemObservableImpl = CpSetup.getInjector().getInstance(AllEntitiesInSystemObservable.class);
+        TargetIdObservable targetIdObservable = CpSetup.getInjector().getInstance(TargetIdObservable.class);
+
         final EntityManager em = app.getEntityManager();
 
         final String type1 = "type1thing";
@@ -94,7 +100,7 @@ public class AllEntitiesInSystemObservableIT extends AbstractCoreIT {
 
         final GraphManager gm = managerCache.getGraphManager( scope );
 
-        AllEntitiesInSystemObservableImpl.getAllEntitiesInSystem(managerCache, 1000).doOnNext( new Action1<ApplicationEntityGroup>() {
+        allEntitiesInSystemObservableImpl.getAllEntitiesInSystem( 1000).doOnNext( new Action1<ApplicationEntityGroup>() {
             @Override
             public void call( final ApplicationEntityGroup entity ) {
 
@@ -127,7 +133,7 @@ public class AllEntitiesInSystemObservableIT extends AbstractCoreIT {
 
         //test connections
 
-        TargetIdObservable.getTargetNodes( gm, source ).doOnNext( new Action1<Id>() {
+        targetIdObservable.getTargetNodes( gm, source ).doOnNext( new Action1<Id>() {
             @Override
             public void call( final Id target ) {
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c3869371/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/ApplicationObservableTestIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/ApplicationObservableTestIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/ApplicationObservableTestIT.java
index daeca87..7c902ea 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/ApplicationObservableTestIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/ApplicationObservableTestIT.java
@@ -24,7 +24,7 @@ import java.util.HashSet;
 import java.util.Set;
 import java.util.UUID;
 
-import org.apache.usergrid.corepersistence.rx.impl.ApplicationObservable;
+import org.apache.usergrid.persistence.core.rx.ApplicationObservable;
 import org.junit.Test;
 
 import org.apache.usergrid.AbstractCoreIT;
@@ -37,7 +37,6 @@ import org.apache.usergrid.persistence.model.entity.Id;
 import rx.Observable;
 import rx.functions.Action1;
 
-import static junit.framework.Assert.assertNotNull;
 import static org.junit.Assert.assertEquals;
 
 
@@ -51,6 +50,7 @@ public class ApplicationObservableTestIT extends AbstractCoreIT {
 
         final Application createdApplication = app.getEntityManager().getApplication();
 
+        ApplicationObservable applicationObservable = CpSetup.getInjector().getInstance(ApplicationObservable.class);
 
         //now our get all apps we expect.  There may be more, but we don't care about those.
         final Set<UUID> applicationIds = new HashSet<UUID>() {{
@@ -65,7 +65,7 @@ public class ApplicationObservableTestIT extends AbstractCoreIT {
         //clean up our wiring
         ManagerCache managerCache = CpSetup.getInjector().getInstance( ManagerCache.class );
 
-        Observable<Id> appObservable = ApplicationObservable.getAllApplicationIds(managerCache);
+        Observable<Id> appObservable = applicationObservable.getAllApplicationIds();
 
         appObservable.doOnNext( new Action1<Id>() {
             @Override

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c3869371/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservableIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservableIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservableIT.java
index 2aa7fbc..2564fe5 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservableIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservableIT.java
@@ -25,6 +25,7 @@ import java.util.HashSet;
 import java.util.Set;
 import java.util.UUID;
 
+import org.apache.usergrid.persistence.graph.serialization.EdgesToTargetObservable;
 import org.junit.Test;
 
 import org.apache.usergrid.AbstractCoreIT;
@@ -60,6 +61,7 @@ public class EdgesFromSourceObservableIT extends AbstractCoreIT {
     @Test
     public void testEntities() throws Exception {
 
+        EdgesToTargetObservable edgesToTargetObservable = CpSetup.getInjector().getInstance(EdgesToTargetObservable.class);
         final EntityManager em = app.getEntityManager();
         final Application createdApplication = em.getApplication();
 
@@ -96,7 +98,7 @@ public class EdgesFromSourceObservableIT extends AbstractCoreIT {
 
         final GraphManager gm = managerCache.getGraphManager( scope );
 
-        EdgesToTargetObservable.getEdgesToTarget( gm, target ).doOnNext( new Action1<Edge>() {
+        edgesToTargetObservable.getEdgesToTarget( gm, target ).doOnNext( new Action1<Edge>() {
             @Override
             public void call( final Edge edge ) {
                 final String edgeType = edge.getType();

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c3869371/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservableIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservableIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservableIT.java
index ef0d953..5d25f62 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservableIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservableIT.java
@@ -25,6 +25,7 @@ import java.util.HashSet;
 import java.util.Set;
 import java.util.UUID;
 
+import org.apache.usergrid.persistence.graph.serialization.EdgesFromSourceObservable;
 import org.junit.Test;
 
 import org.apache.usergrid.AbstractCoreIT;
@@ -59,6 +60,7 @@ public class EdgesToTargetObservableIT extends AbstractCoreIT {
     @Test
     public void testEntities() throws Exception {
 
+        EdgesFromSourceObservable edgesFromSourceObservable=  CpSetup.getInjector().getInstance(EdgesFromSourceObservable.class);
         final EntityManager em = app.getEntityManager();
 
         final String type1 = "type1things";
@@ -92,7 +94,7 @@ public class EdgesToTargetObservableIT extends AbstractCoreIT {
 
         final GraphManager gm = managerCache.getGraphManager( scope );
 
-        EdgesFromSourceObservable.edgesFromSource( gm, applicationId ).doOnNext( new Action1<Edge>() {
+        edgesFromSourceObservable.edgesFromSource( gm, applicationId ).doOnNext( new Action1<Edge>() {
             @Override
             public void call( final Edge edge ) {
                 final String edgeType = edge.getType();
@@ -124,7 +126,7 @@ public class EdgesToTargetObservableIT extends AbstractCoreIT {
 
         //test connections
 
-        EdgesFromSourceObservable.edgesFromSource( gm, source).doOnNext( new Action1<Edge>() {
+        edgesFromSourceObservable.edgesFromSource( gm, source).doOnNext( new Action1<Edge>() {
             @Override
             public void call( final Edge edge ) {
                 final String edgeType = edge.getType();

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c3869371/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/TargetIdObservableTestIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/TargetIdObservableTestIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/TargetIdObservableTestIT.java
index cde8866..e5b0319 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/TargetIdObservableTestIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/TargetIdObservableTestIT.java
@@ -25,6 +25,7 @@ import java.util.HashSet;
 import java.util.Set;
 import java.util.UUID;
 
+import org.apache.usergrid.persistence.graph.serialization.TargetIdObservable;
 import org.junit.Test;
 
 import org.apache.usergrid.AbstractCoreIT;
@@ -59,6 +60,8 @@ public class TargetIdObservableTestIT extends AbstractCoreIT {
     @Test
     public void testEntities() throws Exception {
 
+        TargetIdObservable targetIdObservable = CpSetup.getInjector().getInstance(TargetIdObservable.class);
+
         final EntityManager em = app.getEntityManager();
 
 
@@ -93,7 +96,7 @@ public class TargetIdObservableTestIT extends AbstractCoreIT {
 
         final GraphManager gm = managerCache.getGraphManager( scope );
 
-        TargetIdObservable.getTargetNodes( gm, applicationId ).doOnNext( new Action1<Id>() {
+        targetIdObservable.getTargetNodes( gm, applicationId ).doOnNext( new Action1<Id>() {
             @Override
             public void call( final Id target ) {
 
@@ -116,7 +119,7 @@ public class TargetIdObservableTestIT extends AbstractCoreIT {
 
         //test connections
 
-        TargetIdObservable.getTargetNodes( gm, source ).doOnNext( new Action1<Id>() {
+        targetIdObservable.getTargetNodes( gm, source ).doOnNext( new Action1<Id>() {
             @Override
             public void call( final Id target ) {
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c3869371/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/MvccEntityMigrationStrategy.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/MvccEntityMigrationStrategy.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/MvccEntityMigrationStrategy.java
index 08082e7..0a3cd3a 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/MvccEntityMigrationStrategy.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/MvccEntityMigrationStrategy.java
@@ -19,13 +19,12 @@
  */
 package org.apache.usergrid.persistence.collection.mvcc;
 
-import org.apache.usergrid.persistence.collection.MvccEntity;
 import org.apache.usergrid.persistence.collection.serialization.impl.MvccEntitySerializationStrategy;
 import org.apache.usergrid.persistence.core.migration.schema.MigrationStrategy;
 
 /**
  * Classy class class.
  */
-public interface MvccEntityMigrationStrategy extends MigrationStrategy<MvccEntitySerializationStrategy,MvccEntity> {
+public interface MvccEntityMigrationStrategy extends MigrationStrategy<MvccEntitySerializationStrategy> {
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c3869371/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxy.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxy.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxy.java
index 940fc3f..bb192cd 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxy.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxy.java
@@ -45,7 +45,7 @@ import java.util.concurrent.atomic.AtomicLong;
  * migration data goes to both sources and is read from the old source. After the ugprade completes,
  * it will be available from the new source
  */
-public  abstract class MvccEntitySerializationStrategyProxy implements MvccEntitySerializationStrategy, MvccEntityMigrationStrategy {
+public abstract class MvccEntitySerializationStrategyProxy implements MvccEntitySerializationStrategy, MvccEntityMigrationStrategy {
 
 
     private final DataMigrationManager dataMigrationManager;
@@ -146,7 +146,7 @@ public  abstract class MvccEntitySerializationStrategyProxy implements MvccEntit
      * Return true if we're on an old version
      */
     private boolean isOldVersion() {
-        return dataMigrationManager.getCurrentVersion() < getMigrationVersion();
+        return dataMigrationManager.getCurrentVersion() < getVersion();
     }
 
 
@@ -156,7 +156,7 @@ public  abstract class MvccEntitySerializationStrategyProxy implements MvccEntit
     }
 
     @Override
-    public Observable<Long> executeMigration(final Observable<MvccEntity> entityObservable,final ApplicationEntityGroup applicationEntityGroup, final DataMigration.ProgressObserver observer, final Func1<Id,? extends ApplicationScope> getCollectionScopeFromEntityId) {
+    public Observable migrate(final ApplicationEntityGroup applicationEntityGroup, final DataMigration.ProgressObserver observer) {
         final AtomicLong atomicLong = new AtomicLong();
         final MutationBatch totalBatch = keyspace.prepareMutationBatch();
 
@@ -172,11 +172,12 @@ public  abstract class MvccEntitySerializationStrategyProxy implements MvccEntit
                 @Override
                 public Id call(Id entityId) {
 
-                    ApplicationScope applicationScope = getCollectionScopeFromEntityId.call(entityId);
+                    ApplicationScope applicationScope = applicationEntityGroup.applicationScope;
 
                     if (!(applicationScope instanceof CollectionScope)) {
                         throw new IllegalArgumentException("getCollectionScopeFromEntityId must return a collection scope");
                     }
+
                     CollectionScope currentScope = (CollectionScope) applicationScope;
                     MigrationStrategy.MigrationRelationship<MvccEntitySerializationStrategy> migration = getMigration();
                     //for each element in the history in the previous version,
@@ -214,10 +215,10 @@ public  abstract class MvccEntitySerializationStrategyProxy implements MvccEntit
         try {
             batch.execute();
 
-            po.update( getMigrationVersion(), "Finished copying " + count + " entities to the new format" );
+            po.update( getVersion(), "Finished copying " + count + " entities to the new format" );
         }
         catch ( ConnectionException e ) {
-            po.failed( getMigrationVersion(), "Failed to execute mutation in cassandra" );
+            po.failed( getVersion(), "Failed to execute mutation in cassandra" );
             throw new DataMigrationException( "Unable to migrate batches ", e );
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c3869371/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxyV1Impl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxyV1Impl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxyV1Impl.java
index 6bb6e11..7be7f70 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxyV1Impl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxyV1Impl.java
@@ -59,7 +59,7 @@ public class MvccEntitySerializationStrategyProxyV1Impl extends MvccEntitySerial
     }
 
     @Override
-    public int getMigrationVersion() {
+    public int getVersion() {
         return V2Impl.MIGRATION_VERSION;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c3869371/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxyV2Impl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxyV2Impl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxyV2Impl.java
index 4ec2d2a..6b24ac7 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxyV2Impl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxyV2Impl.java
@@ -26,7 +26,7 @@ public class MvccEntitySerializationStrategyProxyV2Impl extends MvccEntitySerial
     }
 
     @Override
-    public int getMigrationVersion() {
+    public int getVersion() {
         return V3Impl.MIGRATION_VERSION;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c3869371/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/SerializationModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/SerializationModule.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/SerializationModule.java
index ab0b489..cf55eca 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/SerializationModule.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/SerializationModule.java
@@ -22,6 +22,7 @@ import org.apache.usergrid.persistence.collection.mvcc.MvccEntityMigrationStrate
 import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
 import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
 import org.apache.usergrid.persistence.core.guice.*;
+import org.apache.usergrid.persistence.core.migration.data.DataMigration;
 import org.apache.usergrid.persistence.core.migration.schema.Migration;
 
 import com.google.inject.AbstractModule;
@@ -53,6 +54,10 @@ public class SerializationModule extends AbstractModule {
         bind(MvccEntitySerializationStrategy.class ).annotatedWith( ProxyImpl.class )
                                                      .to(MvccEntitySerializationStrategyProxyV2Impl.class);
 
+        Multibinder<DataMigration> dataMigrationMultibinder =
+            Multibinder.newSetBinder( binder(), DataMigration.class );
+        dataMigrationMultibinder.addBinding().to( MvccEntitySerializationStrategyProxyV2Impl.class );
+
         bind( MvccEntityMigrationStrategy.class ).to(MvccEntitySerializationStrategyProxyV2Impl.class);
 
         bind( MvccLogEntrySerializationStrategy.class ).to( MvccLogEntrySerializationStrategyImpl.class );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c3869371/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/entity/ApplicationEntityGroup.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/entity/ApplicationEntityGroup.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/entity/ApplicationEntityGroup.java
deleted file mode 100644
index fd3bba6..0000000
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/entity/ApplicationEntityGroup.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  *  contributor license agreements.  The ASF licenses this file to You
- *  * under the Apache License, Version 2.0 (the "License"); you may not
- *  * use this file except in compliance with the License.
- *  * You may obtain a copy of the License at
- *  *
- *  *     http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.  For additional information regarding
- *  * copyright in this work, please see the NOTICE file in the top level
- *  * directory of this distribution.
- *
- */
-package org.apache.usergrid.persistence.core.entity;
-
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import java.util.List;
-
-/**
- * Get the entity data.  Immutable bean for fast access
- */
-public final class ApplicationEntityGroup {
-    public final ApplicationScope applicationScope;
-    public final List<Id> entityIds;
-
-
-    public ApplicationEntityGroup(final ApplicationScope applicationScope, final List<Id> entityIds) {
-        this.applicationScope = applicationScope;
-        this.entityIds = entityIds;
-    }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c3869371/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/DataMigration.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/DataMigration.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/DataMigration.java
index 775df5d..f945375 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/DataMigration.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/DataMigration.java
@@ -19,6 +19,9 @@
 package org.apache.usergrid.persistence.core.migration.data;
 
 
+import org.apache.usergrid.persistence.core.scope.ApplicationEntityGroup;
+import rx.Observable;
+
 /**
  * An interface for updating data.  Has 2 basic functions. First it will perform the migration and update the status
  * object.
@@ -45,7 +48,7 @@ public interface DataMigration {
      * @param observer
      * @throws Throwable
      */
-    public void migrate(final ProgressObserver observer) throws Throwable;
+    public Observable migrate(final ApplicationEntityGroup applicationEntityGroup,final ProgressObserver observer) throws Throwable;
 
     /**
      * Get the version of this migration.  It must be unique.

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c3869371/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/DataMigrationManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/DataMigrationManagerImpl.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/DataMigrationManagerImpl.java
index a9719b7..8ad3295 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/DataMigrationManagerImpl.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/DataMigrationManagerImpl.java
@@ -27,6 +27,8 @@ import java.util.TreeMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.usergrid.persistence.core.rx.AllEntitiesInSystemObservable;
+import org.apache.usergrid.persistence.core.scope.ApplicationEntityGroup;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -38,6 +40,8 @@ import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
+import rx.Observable;
+import rx.functions.Func1;
 
 
 @Singleton
@@ -48,13 +52,14 @@ public class DataMigrationManagerImpl implements DataMigrationManager {
     private final TreeMap<Integer, DataMigration> migrationTreeMap = new TreeMap<>();
 
     private final MigrationInfoSerialization migrationInfoSerialization;
+    private final AllEntitiesInSystemObservable allEntitiesInSystemObservable;
 
     /**
      * Cache to cache versions temporarily
      */
     private final LoadingCache<String, Integer> versionCache = CacheBuilder.newBuilder()
             //cache the local value for 1 minute
-            .expireAfterWrite( 1, TimeUnit.MINUTES ).build( new CacheLoader<String, Integer>() {
+            .expireAfterWrite(1, TimeUnit.MINUTES).build( new CacheLoader<String, Integer>() {
                 @Override
                 public Integer load( final String key ) throws Exception {
                     return migrationInfoSerialization.getVersion();
@@ -64,14 +69,17 @@ public class DataMigrationManagerImpl implements DataMigrationManager {
 
     @Inject
     public DataMigrationManagerImpl( final MigrationInfoSerialization migrationInfoSerialization,
-                                     final Set<DataMigration> migrations ) {
+                                     final Set<DataMigration> migrations,
+                                     final AllEntitiesInSystemObservable allEntitiesInSystemObservable
+    ) {
 
-        Preconditions.checkNotNull( migrationInfoSerialization, 
+        Preconditions.checkNotNull( migrationInfoSerialization,
                 "migrationInfoSerialization must not be null" );
         Preconditions.checkNotNull( migrations, "migrations must not be null" );
+        Preconditions.checkNotNull( allEntitiesInSystemObservable, "allentitiesobservable must not be null" );
 
         this.migrationInfoSerialization = migrationInfoSerialization;
-
+        this.allEntitiesInSystemObservable = allEntitiesInSystemObservable;
 
         for ( DataMigration migration : migrations ) {
 
@@ -89,8 +97,8 @@ public class DataMigrationManagerImpl implements DataMigrationManager {
                 final Class<? extends DataMigration> currentClass = migration.getClass();
 
 
-                throw new DataMigrationException( String.format( 
-                        "Data migrations must be unique.  Both classes %s and %s have version %d", 
+                throw new DataMigrationException( String.format(
+                        "Data migrations must be unique.  Both classes %s and %s have version %d",
                         existingClass, currentClass, version ) );
             }
 
@@ -114,49 +122,56 @@ public class DataMigrationManagerImpl implements DataMigrationManager {
                 migrationTreeMap.lastKey() );
 
         //we have our migrations to run, execute them
-        final NavigableMap<Integer, DataMigration> migrationsToRun = 
+        final NavigableMap<Integer, DataMigration> migrationsToRun =
                 migrationTreeMap.tailMap( currentVersion, false );
 
-        CassandraProgressObserver observer = new CassandraProgressObserver();
+        final CassandraProgressObserver observer = new CassandraProgressObserver();
 
+        allEntitiesInSystemObservable.getAllEntitiesInSystem(1000)
+            .map(
+                new Func1<ApplicationEntityGroup, Long>() {
+                    @Override
+                    public Long call(
+                        final ApplicationEntityGroup applicationEntityGroup) {
+                        for (DataMigration migration : migrationsToRun.values()) {
 
-        for ( DataMigration migration : migrationsToRun.values() ) {
+                            migrationInfoSerialization.setStatusCode(StatusCode.RUNNING.status);
 
-            migrationInfoSerialization.setStatusCode( StatusCode.RUNNING.status );
+                            final int migrationVersion = migration.getVersion();
 
-            final int migrationVersion = migration.getVersion();
+                            LOG.info("Running migration version {}", migrationVersion);
 
-            LOG.info( "Running migration version {}", migrationVersion );
+                            observer.update(migrationVersion, "Starting migration");
 
-            observer.update( migrationVersion, "Starting migration" );
 
+                            //perform this migration, if it fails, short circuit
+                            try {
+                                migration.migrate(applicationEntityGroup,observer);
+                            } catch (Throwable throwable) {
+                                observer.failed(migrationVersion, "Exception thrown during migration", throwable);
 
-            //perform this migration, if it fails, short circuit
-            try {
-                migration.migrate( observer );
-            }
-            catch ( Throwable throwable ) {
-                observer.failed( migrationVersion, "Exception thrown during migration", throwable );
+                                LOG.error("Unable to migrate to version {}.", migrationVersion, throwable);
 
-                LOG.error( "Unable to migrate to version {}.", migrationVersion, throwable );
+                                return 0L;
+                            }
 
-                return;
-            }
+                            //we had an unhandled exception or the migration failed, short circuit
+                            if (observer.failed) {
+                                return 0L;
+                            }
 
-            //we had an unhandled exception or the migration failed, short circuit
-            if ( observer.failed ) {
-                return;
-            }
+                            //set the version
+                            migrationInfoSerialization.setVersion(migrationVersion);
 
-            //set the version
-            migrationInfoSerialization.setVersion( migrationVersion );
-
-            versionCache.invalidateAll();
-
-            //update the observer for progress so other nodes can see it
-            observer.update( migrationVersion, "Completed successfully" );
-        }
+                            versionCache.invalidateAll();
 
+                            //update the observer for progress so other nodes can see it
+                            observer.update(migrationVersion, "Completed successfully");
+                        }
+                        return 0L;
+                    }
+                }).toBlocking().lastOrDefault(null);
+        ;
         migrationInfoSerialization.setStatusCode( StatusCode.COMPLETE.status );
     }
 
@@ -188,7 +203,7 @@ public class DataMigrationManagerImpl implements DataMigrationManager {
     public void resetToVersion( final int version ) {
         final int highestAllowed = migrationTreeMap.lastKey();
 
-        Preconditions.checkArgument( version <= highestAllowed, 
+        Preconditions.checkArgument( version <= highestAllowed,
                 "You cannot set a version higher than the max of " + highestAllowed);
         Preconditions.checkArgument( version >= 0, "You must specify a version of 0 or greater" );
 
@@ -225,7 +240,7 @@ public class DataMigrationManagerImpl implements DataMigrationManager {
         @Override
         public void failed( final int migrationVersion, final String reason ) {
 
-            final String storedMessage = String.format( 
+            final String storedMessage = String.format(
                     "Failed to migrate, reason is appended.  Error '%s'", reason );
 
 
@@ -245,13 +260,13 @@ public class DataMigrationManagerImpl implements DataMigrationManager {
             throwable.printStackTrace( new PrintWriter( stackTrace ) );
 
 
-            final String storedMessage = String.format( 
+            final String storedMessage = String.format(
                 "Failed to migrate, reason is appended.  Error '%s' %s", reason, stackTrace.toString() );
 
             update( migrationVersion, storedMessage );
 
 
-            LOG.error( "Unable to migrate version {} due to reason {}.", 
+            LOG.error( "Unable to migrate version {} due to reason {}.",
                     migrationVersion, reason, throwable );
 
             failed = true;
@@ -262,7 +277,7 @@ public class DataMigrationManagerImpl implements DataMigrationManager {
 
         @Override
         public void update( final int migrationVersion, final String message ) {
-            final String formattedOutput = String.format( 
+            final String formattedOutput = String.format(
                     "Migration version %d.  %s", migrationVersion, message );
 
             //Print this to the info log

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c3869371/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/schema/MigrationStrategy.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/schema/MigrationStrategy.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/schema/MigrationStrategy.java
index cb2f2eb..758f54e 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/schema/MigrationStrategy.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/schema/MigrationStrategy.java
@@ -19,7 +19,7 @@
  */
 package org.apache.usergrid.persistence.core.migration.schema;
 
-import org.apache.usergrid.persistence.core.entity.ApplicationEntityGroup;
+import org.apache.usergrid.persistence.core.scope.ApplicationEntityGroup;
 import org.apache.usergrid.persistence.core.migration.data.DataMigration;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.model.entity.Id;
@@ -30,18 +30,13 @@ import rx.functions.Func1;
 /**
  * Interface to encapsulate directional migrations
  */
-public interface MigrationStrategy<T,E> {
+public interface MigrationStrategy<T> extends DataMigration {
     /**
      * Returns the migration pattern to use
      * @return
      */
     public MigrationRelationship<T> getMigration();
 
-    public int getMigrationVersion();
-
-    Observable<Long> executeMigration(final Observable<E> sourceObservable, final ApplicationEntityGroup applicationEntityGroup, final DataMigration.ProgressObserver observer, final Func1<Id,? extends ApplicationScope> getScopeFromEntityId);
-
-
     public class MigrationRelationship<T>  {
         private final T from;
         private final T to;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c3869371/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/AllEntitiesInSystemObservable.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/AllEntitiesInSystemObservable.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/AllEntitiesInSystemObservable.java
index 2205f72..1ec017e 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/AllEntitiesInSystemObservable.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/AllEntitiesInSystemObservable.java
@@ -19,7 +19,7 @@
  */
 package org.apache.usergrid.persistence.core.rx;
 
-import org.apache.usergrid.persistence.core.entity.ApplicationEntityGroup;
+import org.apache.usergrid.persistence.core.scope.ApplicationEntityGroup;
 import rx.Observable;
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c3869371/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/scope/ApplicationEntityGroup.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/scope/ApplicationEntityGroup.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/scope/ApplicationEntityGroup.java
new file mode 100644
index 0000000..a767f4a
--- /dev/null
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/scope/ApplicationEntityGroup.java
@@ -0,0 +1,40 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  *  contributor license agreements.  The ASF licenses this file to You
+ *  * under the Apache License, Version 2.0 (the "License"); you may not
+ *  * use this file except in compliance with the License.
+ *  * You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.  For additional information regarding
+ *  * copyright in this work, please see the NOTICE file in the top level
+ *  * directory of this distribution.
+ *
+ */
+package org.apache.usergrid.persistence.core.scope;
+
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import java.util.List;
+
+/**
+ * Get the entity data.  Immutable bean for fast access
+ */
+public final class ApplicationEntityGroup {
+    public final ApplicationScope applicationScope;
+    public final List<Id> entityIds;
+
+    public ApplicationEntityGroup(final ApplicationScope applicationScope, final List<Id> entityIds) {
+        this.applicationScope = applicationScope;
+        this.entityIds = entityIds;
+    }
+
+}
+