You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/03/04 18:28:39 UTC

[4/4] incubator-usergrid git commit: Pushed migration tests down to core tiers. WIP

Pushed migration tests down to core tiers.  WIP


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/802dcdec
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/802dcdec
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/802dcdec

Branch: refs/heads/USERGRID-405
Commit: 802dcdec4405f4a358cef7d253fa5695d64769cc
Parents: 7925a4b
Author: Todd Nine <tn...@apigee.com>
Authored: Tue Mar 3 17:34:21 2015 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Wed Mar 4 10:28:10 2015 -0700

----------------------------------------------------------------------
 .../AllApplicationsObservable.java              |  39 +++
 .../usergrid/corepersistence/CoreModule.java    |  80 ++++--
 .../corepersistence/CpEntityManagerFactory.java |  60 +++--
 .../events/EntityVersionDeletedHandler.java     |  17 +-
 .../migration/CoreDataVersions.java             |  44 +++
 .../migration/CoreMigrationPlugin.java          |  59 ++++
 .../migration/EntityTypeMappingMigration.java   |  90 +++----
 .../migration/MigrationModuleVersion.java       | 112 ++++++++
 .../corepersistence/migration/Versions.java     |  24 --
 .../rx/impl/AbstractGraphVisitorImpl.java       | 110 ++++++++
 .../rx/impl/AllApplicationsObservableImpl.java  | 142 ++++++++++
 .../rx/impl/AllEntitiesInSystemImpl.java        |  62 +++++
 .../impl/AllEntitiesInSystemObservableImpl.java | 127 ---------
 .../rx/impl/AllNodesInGraphImpl.java            |  55 ++++
 .../rx/impl/ApplicationObservableImpl.java      | 149 ----------
 .../migration/EntityDataMigrationIT.java        | 270 -------------------
 .../migration/EntityTypeMappingMigrationIT.java |   1 -
 .../migration/GraphShardVersionMigrationIT.java | 235 ----------------
 .../migration/MigrationTestRule.java            |   9 +-
 .../migration/TestProgressObserver.java         |  71 -----
 .../rx/AllEntitiesInSystemObservableIT.java     |  45 +---
 .../rx/ApplicationObservableTestIT.java         |  16 +-
 .../persistence/collection/MvccLogEntry.java    |   1 +
 .../collection/guice/CollectionModule.java      |  31 +--
 .../persistence/collection/guice/Write.java     |  17 --
 .../collection/guice/WriteUpdate.java           |  17 --
 .../EntityCollectionManagerFactoryImpl.java     |   9 +-
 .../impl/EntityCollectionManagerImpl.java       |  21 +-
 .../collection/mvcc/stage/write/WriteStart.java |   6 +-
 .../migration/MvccEntityDataMigrationImpl.java  |  50 ----
 .../mvcc/stage/write/WriteStartTest.java        |   6 +-
 .../MvccEntityDataMigrationV1ToV3ImplTest.java  | 179 ++++++++++++
 .../core/guice/DataMigrationResetRule.java      |  88 ++++++
 .../migration/data/TestProgressObserver.java    |  71 +++++
 .../persistence/core/util/IdGenerator.java      |  51 ++++
 .../persistence/graph/guice/GraphModule.java    |   7 +-
 .../impl/EdgeDataMigrationImpl.java             | 152 -----------
 .../impl/migration/EdgeDataMigrationImpl.java   | 149 ++++++++++
 .../impl/migration/GraphMigrationPlugin.java    |   7 +-
 .../serialization/impl/migration/GraphNode.java |  39 +++
 .../persistence/graph/GraphManagerIT.java       |  21 +-
 .../persistence/graph/GraphManagerLoadTest.java |  13 +-
 .../graph/GraphManagerShardConsistencyIT.java   |  10 +-
 .../graph/GraphManagerShardingIT.java           |  13 +-
 .../graph/GraphManagerStressTest.java           |  11 +-
 .../graph/guice/TestGraphModule.java            |   5 +-
 .../graph/impl/EdgeDeleteListenerTest.java      |  11 +-
 .../graph/impl/NodeDeleteListenerTest.java      |   9 +-
 .../graph/impl/stage/EdgeDeleteRepairTest.java  |   7 +-
 .../graph/impl/stage/EdgeMetaRepairTest.java    |  29 +-
 .../EdgeMetadataSerializationTest.java          |  39 +--
 .../EdgeSerializationChopTest.java              |   7 +-
 .../serialization/EdgeSerializationTest.java    |  21 +-
 .../serialization/NodeSerializationTest.java    |  15 +-
 .../migration/EdgeDataMigrationImplTest.java    | 177 ++++++++++++
 .../impl/shard/EdgeShardSerializationTest.java  |   7 +-
 .../impl/shard/NodeShardAllocationTest.java     |  27 +-
 .../impl/shard/NodeShardCacheTest.java          |   7 +-
 .../impl/shard/ShardGroupCompactionTest.java    |   7 +-
 .../shard/count/NodeShardApproximationTest.java |   9 +-
 .../NodeShardCounterSerializationTest.java      |   6 +-
 .../shard/impl/ShardEntryGroupIteratorTest.java |  15 +-
 ...rceDirectedEdgeDescendingComparatorTest.java |  23 +-
 ...getDirectedEdgeDescendingComparatorTest.java |  23 +-
 .../graph/test/util/EdgeTestUtils.java          |  29 +-
 65 files changed, 1767 insertions(+), 1492 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/802dcdec/stack/core/src/main/java/org/apache/usergrid/corepersistence/AllApplicationsObservable.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/AllApplicationsObservable.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/AllApplicationsObservable.java
new file mode 100644
index 0000000..1187cf2
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/AllApplicationsObservable.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence;
+
+
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.entities.Application;
+
+import rx.Observable;
+
+
+/**
+ * Interface for generating an observable of all ApplicationScope
+ */
+public interface AllApplicationsObservable {
+
+    /**
+     * Return all applications in our system
+     * @return an observable that emits the ApplicationScope of each application in the system
+     */
+    public Observable<ApplicationScope> getAllApplications();
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/802dcdec/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
index d7d7f2f..9f01feb 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
@@ -15,33 +15,38 @@
  */
 package org.apache.usergrid.corepersistence;
 
-import com.google.inject.AbstractModule;
-import com.google.inject.Provider;
-import com.google.inject.multibindings.Multibinder;
 
-import org.apache.usergrid.corepersistence.migration.EntityTypeMappingMigration;
+import org.springframework.context.ApplicationContext;
+
 import org.apache.usergrid.corepersistence.events.EntityDeletedHandler;
 import org.apache.usergrid.corepersistence.events.EntityVersionCreatedHandler;
 import org.apache.usergrid.corepersistence.events.EntityVersionDeletedHandler;
-import org.apache.usergrid.corepersistence.rx.impl.AllEntitiesInSystemObservableImpl;
-import org.apache.usergrid.corepersistence.rx.impl.ApplicationObservableImpl;
+import org.apache.usergrid.corepersistence.migration.CoreMigrationPlugin;
+import org.apache.usergrid.corepersistence.migration.EntityTypeMappingMigration;
+import org.apache.usergrid.corepersistence.rx.impl.AllEntitiesInSystemImpl;
+import org.apache.usergrid.corepersistence.rx.impl.AllNodesInGraphImpl;
+import org.apache.usergrid.corepersistence.rx.impl.AllApplicationsObservableImpl;
 import org.apache.usergrid.persistence.EntityManagerFactory;
 import org.apache.usergrid.persistence.collection.event.EntityDeleted;
 import org.apache.usergrid.persistence.collection.event.EntityVersionCreated;
 import org.apache.usergrid.persistence.collection.event.EntityVersionDeleted;
 import org.apache.usergrid.persistence.collection.guice.CollectionModule;
+import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
+import org.apache.usergrid.persistence.collection.serialization.impl.migration.MvccEntityDataMigrationImpl;
 import org.apache.usergrid.persistence.core.guice.CommonModule;
-import org.apache.usergrid.persistence.core.migration.data.CollectionDataMigration;
-import org.apache.usergrid.persistence.core.migration.data.DataMigration;
-import org.apache.usergrid.persistence.core.rx.AllEntitiesInSystemObservable;
-import org.apache.usergrid.persistence.core.rx.ApplicationObservable;
+import org.apache.usergrid.persistence.core.migration.data.newimpls.DataMigration2;
+import org.apache.usergrid.persistence.core.migration.data.newimpls.MigrationDataProvider;
+import org.apache.usergrid.persistence.core.migration.data.newimpls.MigrationPlugin;
 import org.apache.usergrid.persistence.graph.guice.GraphModule;
+import org.apache.usergrid.persistence.graph.serialization.impl.migration.GraphNode;
 import org.apache.usergrid.persistence.index.guice.IndexModule;
 import org.apache.usergrid.persistence.map.guice.MapModule;
 import org.apache.usergrid.persistence.queue.guice.QueueModule;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.context.ApplicationContext;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Provider;
+import com.google.inject.TypeLiteral;
+import com.google.inject.multibindings.Multibinder;
 
 
 /**
@@ -70,19 +75,34 @@ public class CoreModule  extends AbstractModule {
         bind(EntityManagerFactory.class).toProvider( lazyEntityManagerFactoryProvider );
 
         install( new CommonModule());
-        install(new CollectionModule());
-        install(new GraphModule());
+        install( new CollectionModule() {
+            /**
+             * configure our migration data provider for all entities in the system
+             */
+            @Override
+           public void configureMigrationProvider() {
+
+                bind(new TypeLiteral< MigrationDataProvider<EntityIdScope>>(){}).to(
+                    AllEntitiesInSystemImpl.class );
+           }
+        } );
+        install( new GraphModule() {
+
+            /**
+             * Override the observable that needs to be used for migration
+             */
+            @Override
+            public void configureMigrationProvider() {
+                bind( new TypeLiteral<MigrationDataProvider<GraphNode>>() {} ).to(
+                    AllNodesInGraphImpl.class );
+            }
+        } );
         install(new IndexModule());
         install(new MapModule());
         install(new QueueModule());
 
         bind(ManagerCache.class).to( CpManagerCache.class );
-        bind(AllEntitiesInSystemObservable.class).to( AllEntitiesInSystemObservableImpl.class );
-        bind(ApplicationObservable.class).to( ApplicationObservableImpl.class );
 
-        Multibinder<DataMigration> dataMigrationMultibinder =
-                Multibinder.newSetBinder( binder(), DataMigration.class );
-        dataMigrationMultibinder.addBinding().to( EntityTypeMappingMigration.class );
 
         Multibinder<EntityDeleted> entityBinder =
             Multibinder.newSetBinder(binder(), EntityDeleted.class);
@@ -97,6 +117,26 @@ public class CoreModule  extends AbstractModule {
         versionCreatedMultibinder.addBinding().to(EntityVersionCreatedHandler.class);
 
 
+        /**
+         * Create the data migrations that run within our core migration plugin
+         *
+         */
+        Multibinder<DataMigration2<EntityIdScope>> dataMigrationMultibinder =
+                    Multibinder.newSetBinder( binder(), new TypeLiteral<DataMigration2<EntityIdScope>>() {} );
+
+
+        dataMigrationMultibinder.addBinding().to( MvccEntityDataMigrationImpl.class );
+        dataMigrationMultibinder.addBinding().to( EntityTypeMappingMigration.class );
+
+
+        //wire up the core migration plugin
+        Multibinder.newSetBinder( binder(), MigrationPlugin.class ).addBinding().to( CoreMigrationPlugin.class );
+
+        bind(AllApplicationsObservable.class).to(AllApplicationsObservableImpl.class);
+
+
+
+
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/802dcdec/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
index 61f32c5..9d0b03a 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
@@ -15,12 +15,6 @@
  */
 package org.apache.usergrid.corepersistence;
 
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.google.inject.Injector;
-import com.yammer.metrics.annotation.Metered;
-import static java.lang.String.CASE_INSENSITIVE_ORDER;
 
 import java.util.Arrays;
 import java.util.HashMap;
@@ -33,6 +27,12 @@ import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.BeansException;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationContextAware;
+
 import org.apache.commons.lang.StringUtils;
 
 import org.apache.usergrid.corepersistence.util.CpNamingUtils;
@@ -43,16 +43,15 @@ import org.apache.usergrid.persistence.EntityManager;
 import org.apache.usergrid.persistence.EntityManagerFactory;
 import org.apache.usergrid.persistence.EntityRef;
 import org.apache.usergrid.persistence.Results;
-import static org.apache.usergrid.persistence.Schema.PROPERTY_NAME;
-import static org.apache.usergrid.persistence.Schema.TYPE_APPLICATION;
 import org.apache.usergrid.persistence.cassandra.CassandraService;
 import org.apache.usergrid.persistence.cassandra.CounterUtils;
 import org.apache.usergrid.persistence.cassandra.Setup;
 import org.apache.usergrid.persistence.collection.CollectionScope;
 import org.apache.usergrid.persistence.collection.EntityCollectionManager;
 import org.apache.usergrid.persistence.collection.impl.CollectionScopeImpl;
+import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
 import org.apache.usergrid.persistence.core.migration.data.DataMigrationManager;
-import org.apache.usergrid.persistence.core.rx.AllEntitiesInSystemObservable;
+import org.apache.usergrid.persistence.core.migration.data.newimpls.MigrationDataProvider;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
 import org.apache.usergrid.persistence.core.util.Health;
@@ -68,15 +67,24 @@ import org.apache.usergrid.persistence.index.EntityIndex;
 import org.apache.usergrid.persistence.index.query.Query;
 import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.entity.SimpleId;
-import org.apache.usergrid.utils.UUIDUtils;
 import org.apache.usergrid.persistence.model.util.UUIDGenerator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.BeansException;
-import org.springframework.context.ApplicationContext;
-import org.springframework.context.ApplicationContextAware;
+import org.apache.usergrid.utils.UUIDUtils;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.inject.Injector;
+import com.google.inject.Key;
+import com.google.inject.TypeLiteral;
+import com.yammer.metrics.annotation.Metered;
+
 import rx.Observable;
 
+import static java.lang.String.CASE_INSENSITIVE_ORDER;
+
+import static org.apache.usergrid.persistence.Schema.PROPERTY_NAME;
+import static org.apache.usergrid.persistence.Schema.TYPE_APPLICATION;
+
 
 /**
  * Implement good-old Usergrid EntityManagerFactory with the new-fangled Core Persistence API.
@@ -107,7 +115,6 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
 
     private ManagerCache managerCache;
 
-    private AllEntitiesInSystemObservable allEntitiesInSystemObservable;
 
     private DataMigrationManager dataMigrationManager;
 
@@ -171,10 +178,8 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
         return managerCache;
     }
 
-    private AllEntitiesInSystemObservable getAllEntitiesObservable(){
-        if(allEntitiesInSystemObservable==null)
-            allEntitiesInSystemObservable = injector.getInstance(AllEntitiesInSystemObservable.class);
-        return allEntitiesInSystemObservable;
+    private Observable<EntityIdScope> getAllEntitiesObservable(){
+      return injector.getInstance( Key.get(new TypeLiteral< MigrationDataProvider<EntityIdScope>>(){})).getData();
     }
 
 
@@ -601,7 +606,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
     public long performEntityCount() {
         //TODO, this really needs to be a task that writes this data somewhere since this will get
         //progressively slower as the system expands
-        return (Long) getAllEntitiesObservable().getAllEntitiesInSystem(1000).longCount().toBlocking().last();
+        return (Long) getAllEntitiesObservable().longCount().toBlocking().last();
     }
 
 
@@ -720,20 +725,25 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
 
     @Override
     public String getMigrateDataStatus() {
-        return dataMigrationManager.getLastStatus();
+    //TODO  USERGRID-405      return dataMigrationManager.getLastStatus();
+        return null;
     }
 
 
     @Override
     public int getMigrateDataVersion() {
-        return dataMigrationManager.getCurrentVersion();
+        //TODO USERGRID-405
+        //return dataMigrationManager.getCurrentVersion();
+        return 0;
     }
 
 
     @Override
     public void setMigrationVersion( final int version ) {
-        dataMigrationManager.resetToVersion( version );
-        dataMigrationManager.invalidate();
+        //TODO USERGRID-405
+//
+//        dataMigrationManager.resetToVersion( version );
+//        dataMigrationManager.invalidate();
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/802dcdec/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java
index 127ae46..65dacee 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java
@@ -24,6 +24,7 @@ import static org.apache.usergrid.corepersistence.CoreModule.EVENTS_DISABLED;
 import org.apache.usergrid.persistence.EntityManagerFactory;
 import org.apache.usergrid.persistence.collection.CollectionScope;
 import org.apache.usergrid.persistence.collection.MvccEntity;
+import org.apache.usergrid.persistence.collection.MvccLogEntry;
 import org.apache.usergrid.persistence.collection.event.EntityVersionDeleted;
 import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
 import org.apache.usergrid.persistence.index.EntityIndex;
@@ -53,9 +54,11 @@ public class EntityVersionDeletedHandler implements EntityVersionDeleted {
     private EntityManagerFactory emf;
 
 
+
     @Override
-    public void versionDeleted(
-            final CollectionScope scope, final Id entityId, final List<MvccEntity> entityVersions) {
+    public void versionDeleted( final CollectionScope scope, final Id entityId,
+                                final List<MvccLogEntry> entityVersions ) {
+
 
         // This check is for testing purposes and for a test that to be able to dynamically turn
         // off and on delete previous versions so that it can test clean-up on read.
@@ -84,13 +87,12 @@ public class EntityVersionDeletedHandler implements EntityVersionDeleted {
                 scope.getName()
         );
 
-        rx.Observable.from(entityVersions)
-            .subscribeOn(Schedulers.io())
+        rx.Observable.from( entityVersions )
             .buffer(serializationFig.getBufferSize())
-            .map(new Func1<List<MvccEntity>, List<MvccEntity>>() {
+            .map(new Func1<List<MvccLogEntry>, List<MvccLogEntry>>() {
                 @Override
-                public List<MvccEntity> call(List<MvccEntity> entityList) {
-                    for (MvccEntity entity : entityList) {
+                public List<MvccLogEntry> call(List<MvccLogEntry> entityList) {
+                    for (MvccLogEntry entity : entityList) {
                         eibatch.deindex(indexScope, entityId, entity.getVersion());
                     }
                     eibatch.execute();
@@ -99,4 +101,5 @@ public class EntityVersionDeletedHandler implements EntityVersionDeleted {
             }).toBlocking().last();
     }
 
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/802dcdec/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/CoreDataVersions.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/CoreDataVersions.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/CoreDataVersions.java
new file mode 100644
index 0000000..58ba6a3
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/CoreDataVersions.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.migration;
+
+
+/**
+ * Versions of data as they exist across our system
+ */
+public enum CoreDataVersions {
+     //even though this didn't really come first in time, we need to run this first in order to bring our system
+    //up to date so that our new migration module can proceed.
+
+    INITIAL(0),
+    MIGRATION_VERSION_FIX(1),
+    ID_MAP_FIX(2);
+
+
+    private final int version;
+
+
+    private CoreDataVersions( final int version ) {this.version = version;}
+
+
+    public int getVersion() {
+        return version;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/802dcdec/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/CoreMigrationPlugin.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/CoreMigrationPlugin.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/CoreMigrationPlugin.java
new file mode 100644
index 0000000..b95900f
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/CoreMigrationPlugin.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.migration;
+
+
+import java.util.Set;
+
+import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
+import org.apache.usergrid.persistence.core.migration.data.MigrationInfoSerialization;
+import org.apache.usergrid.persistence.core.migration.data.newimpls.AbstractMigrationPlugin;
+import org.apache.usergrid.persistence.core.migration.data.newimpls.DataMigration2;
+import org.apache.usergrid.persistence.core.migration.data.newimpls.MigrationDataProvider;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+
+/**
+ * Migration plugin for the core module
+ */
+@Singleton
+public class CoreMigrationPlugin extends AbstractMigrationPlugin<EntityIdScope> {
+
+    public static final String PLUGIN_NAME = "core-data";
+
+
+
+    @Inject
+    public CoreMigrationPlugin( final Set<DataMigration2<EntityIdScope>> entityDataMigrations,
+                                final MigrationDataProvider<EntityIdScope> entityIdScopeDataMigrationProvider,
+                                final MigrationInfoSerialization migrationInfoSerialization ) {
+        super( entityDataMigrations, entityIdScopeDataMigrationProvider, migrationInfoSerialization );
+    }
+
+
+    @Override
+    public String getName() {
+        return PLUGIN_NAME;
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/802dcdec/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigration.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigration.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigration.java
index 4dbc373..80d7ebe 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigration.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigration.java
@@ -23,88 +23,88 @@ package org.apache.usergrid.corepersistence.migration;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicLong;
 
-
 import org.apache.usergrid.corepersistence.ManagerCache;
 import org.apache.usergrid.corepersistence.util.CpNamingUtils;
-import org.apache.usergrid.persistence.core.migration.data.CollectionDataMigration;
-import org.apache.usergrid.persistence.core.scope.ApplicationEntityGroup;
-import org.apache.usergrid.persistence.core.migration.data.DataMigration;
-import org.apache.usergrid.persistence.core.rx.AllEntitiesInSystemObservable;
-import org.apache.usergrid.persistence.core.scope.EntityIdScope;
+import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
+import org.apache.usergrid.persistence.core.migration.data.newimpls.DataMigration2;
+import org.apache.usergrid.persistence.core.migration.data.newimpls.MigrationDataProvider;
+import org.apache.usergrid.persistence.core.migration.data.newimpls.ProgressObserver;
 import org.apache.usergrid.persistence.map.MapManager;
 import org.apache.usergrid.persistence.map.MapScope;
-import org.apache.usergrid.persistence.model.entity.Id;
 
 import com.google.inject.Inject;
 
 import rx.Observable;
-import rx.Scheduler;
 import rx.functions.Action1;
 import rx.functions.Func1;
-import rx.schedulers.Schedulers;
 
 
 /**
  * Migration to ensure that our entity id is written into our map data
  */
-public class EntityTypeMappingMigration implements CollectionDataMigration {
+public class EntityTypeMappingMigration implements DataMigration2<EntityIdScope> {
 
     private final ManagerCache managerCache;
-    private final AllEntitiesInSystemObservable allEntitiesInSystemObservable;
+    private final MigrationDataProvider<EntityIdScope> allEntitiesInSystemObservable;
 
 
     @Inject
-    public EntityTypeMappingMigration( final ManagerCache managerCache, final AllEntitiesInSystemObservable allEntitiesInSystemObservable) {
-       this.managerCache = managerCache;
+    public EntityTypeMappingMigration( final ManagerCache managerCache,
+                                       final MigrationDataProvider<EntityIdScope> allEntitiesInSystemObservable ) {
+        this.managerCache = managerCache;
         this.allEntitiesInSystemObservable = allEntitiesInSystemObservable;
     }
 
 
     @Override
-    public Observable migrate(final Observable<ApplicationEntityGroup> applicationEntityGroupObservable, final ProgressObserver observer) throws Throwable {
+    public int migrate( final int currentVersion, final MigrationDataProvider<EntityIdScope> migrationDataProvider,
+                        final ProgressObserver observer ) {
 
         final AtomicLong atomicLong = new AtomicLong();
 
-        return applicationEntityGroupObservable.flatMap(new Func1<ApplicationEntityGroup, Observable<Long>>() {
-            @Override
-            public Observable call(final ApplicationEntityGroup applicationEntityGroup) {
-                final MapScope ms = CpNamingUtils.getEntityTypeMapScope(applicationEntityGroup.applicationScope.getApplication());
-
-                final MapManager mapManager = managerCache.getMapManager(ms);
-                return Observable.from(applicationEntityGroup.entityIds)
-                    .subscribeOn(Schedulers.io())
-                    .map(new Func1<EntityIdScope, Long>() {
-                        @Override
-                        public Long call(EntityIdScope idScope) {
-                            final UUID entityUuid = idScope.getId().getUuid();
-                            final String entityType = idScope.getId().getType();
-
-                            mapManager.putString(entityUuid.toString(), entityType);
-
-                            if (atomicLong.incrementAndGet() % 100 == 0) {
-                                updateStatus(atomicLong, observer);
-                            }
-                            return atomicLong.get();
-                        }
-                    });
-            }
-        });
-    }
 
+        return allEntitiesInSystemObservable.getData()
+                                            //process the entities in parallel
+         .parallel( new Func1<Observable<EntityIdScope>, Observable<EntityIdScope>>() {
+
+
+                 @Override
+                 public Observable<EntityIdScope> call( final Observable<EntityIdScope> entityIdScopeObservable ) {
 
-    private void updateStatus( final AtomicLong counter, final ProgressObserver observer ) {
+                     //for each entity observable, get the map scope and write it to the map
+                     return entityIdScopeObservable.doOnNext( new Action1<EntityIdScope>() {
+                         @Override
+                         public void call( final EntityIdScope entityIdScope ) {
+                             final MapScope ms = CpNamingUtils
+                                 .getEntityTypeMapScope( entityIdScope.getCollectionScope().getApplication() );
 
-        observer.update( getVersion(), String.format( "Updated %d entities", counter.get() ) );
+                             final MapManager mapManager = managerCache.getMapManager( ms );
+
+                             final UUID entityUuid = entityIdScope.getId().getUuid();
+                             final String entityType = entityIdScope.getId().getType();
+
+                             mapManager.putString( entityUuid.toString(), entityType );
+
+                             if ( atomicLong.incrementAndGet() % 100 == 0 ) {
+                                 observer.update( getMaxVersion(),
+                                     String.format( "Updated %d entities", atomicLong.get() ) );
+                             }
+                         }
+                     } );
+                 }
+             } ).count().toBlocking().last();
     }
 
 
     @Override
-    public int getVersion() {
-        return Versions.VERSION_1;
+    public boolean supports( final int currentVersion ) {
+        //we move from the migration version fix to the current version
+        return CoreDataVersions.MIGRATION_VERSION_FIX.getVersion() == currentVersion;
     }
 
+
     @Override
-    public MigrationType getType() {
-        return MigrationType.Entities;
+    public int getMaxVersion() {
+        return CoreDataVersions.ID_MAP_FIX.getVersion();
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/802dcdec/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/MigrationModuleVersion.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/MigrationModuleVersion.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/MigrationModuleVersion.java
new file mode 100644
index 0000000..1c124ac
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/MigrationModuleVersion.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.migration;
+
+
+import org.apache.usergrid.persistence.collection.serialization.impl.MvccEntitySerializationStrategyV2Impl;
+import org.apache.usergrid.persistence.collection.serialization.impl.migration.CollectionMigrationPlugin;
+import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
+import org.apache.usergrid.persistence.core.migration.data.MigrationInfoSerialization;
+import org.apache.usergrid.persistence.core.migration.data.newimpls.DataMigration2;
+import org.apache.usergrid.persistence.core.migration.data.newimpls.MigrationDataProvider;
+import org.apache.usergrid.persistence.core.migration.data.newimpls.ProgressObserver;
+import org.apache.usergrid.persistence.graph.serialization.impl.EdgeMetadataSerializationV2Impl;
+import org.apache.usergrid.persistence.graph.serialization.impl.migration.GraphMigrationPlugin;
+
+import com.google.inject.Inject;
+
+
+/**
+ * Migration to set our module versions now that we've refactored into sub modules. Keeps the EntityIdScope because
+ * it won't subscribe to the data provider.
+ */
+public class MigrationModuleVersion implements DataMigration2<EntityIdScope> {
+
+
+    /**
+     * The migration from 0 -> 1 that re-writes all the entity id's into the map module
+     */
+    private static final int ID_MIGRATION = 1;
+
+    /**
+     * The migration from 1-> 2 that shards our edge meta data
+     */
+    private static final int EDGE_SHARD_MIGRATION = 2;
+
+    /**
+     * The migration from 2-> 3 that fixed the short truncation bug
+     */
+    private static final int ENTITY_V2_MIGRATION = 3;
+
+    private final MigrationInfoSerialization migrationInfoSerialization;
+
+    private final MvccEntitySerializationStrategyV2Impl serializationStrategyV2;
+
+    private final EdgeMetadataSerializationV2Impl edgeMetadataSerializationV2;
+
+
+    @Inject
+    public MigrationModuleVersion( final MigrationInfoSerialization migrationInfoSerialization,
+                                   final MvccEntitySerializationStrategyV2Impl serializationStrategyV2,
+                                   final EdgeMetadataSerializationV2Impl edgeMetadataSerializationV2 ) {
+        this.migrationInfoSerialization = migrationInfoSerialization;
+        this.serializationStrategyV2 = serializationStrategyV2;
+        this.edgeMetadataSerializationV2 = edgeMetadataSerializationV2;
+    }
+
+
+    @Override
+    public int migrate( final int currentVersion, final MigrationDataProvider<EntityIdScope> migrationDataProvider,
+                        final ProgressObserver observer ) {
+
+        //we ignore our current version, since it will always be 0
+        final int legacyVersion = migrationInfoSerialization.getSystemVersion();
+
+        //now we store versions for each of our modules
+
+        switch ( legacyVersion ) {
+            case ID_MIGRATION:
+                //no op, we need to migrate everything in all modules
+                break;
+            //we need to set the version of the entity data, and our edge shard migration.  The fall through (no break) is deliberate
+            case ENTITY_V2_MIGRATION:
+               migrationInfoSerialization.setVersion( CollectionMigrationPlugin.PLUGIN_NAME, serializationStrategyV2.getImplementationVersion() );
+            case EDGE_SHARD_MIGRATION:
+                //set our shard migration to the migrated version
+                migrationInfoSerialization.setVersion( GraphMigrationPlugin.PLUGIN_NAME, edgeMetadataSerializationV2.getImplementationVersion() );
+                break;
+        }
+
+        return CoreDataVersions.MIGRATION_VERSION_FIX.getVersion();
+    }
+
+
+    @Override
+    public boolean supports( final int currentVersion ) {
+        //we move from the initial version to the migration version fix
+        return CoreDataVersions.INITIAL.getVersion() == currentVersion;
+    }
+
+
+    @Override
+    public int getMaxVersion() {
+        return CoreDataVersions.MIGRATION_VERSION_FIX.getVersion();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/802dcdec/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/Versions.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/Versions.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/Versions.java
index 7d05733..d921353 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/Versions.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/Versions.java
@@ -22,34 +22,10 @@
 package org.apache.usergrid.corepersistence.migration;
 
 
-import org.apache.usergrid.persistence.core.guice.V2Impl;
-import org.apache.usergrid.persistence.core.guice.V3Impl;
-import org.apache.usergrid.persistence.graph.serialization.EdgeMigrationStrategy;
-
-
 /**
  * Simple class to hold the constants of all versions
  */
 public class Versions {
 
-    /**
-     * Version 1 of our mappings
-     */
-    public static final int VERSION_1 = 1;
-
-    /**
-     * Version 2.  Edge meta changes
-     */
-    public static final int VERSION_2 = EdgeMigrationStrategy.MIGRATION_VERSION;
-
-    /**
-     * Version 3. migrate from entity serialization 1 -> 2
-     */
-    public static final int VERSION_3 = V2Impl.MIGRATION_VERSION;
-
-    /**
-     * Version 4. migrate from entity serialization 1 -> 2
-     */
-    public static final int VERSION_4 = V3Impl.MIGRATION_VERSION;
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/802dcdec/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AbstractGraphVisitorImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AbstractGraphVisitorImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AbstractGraphVisitorImpl.java
new file mode 100644
index 0000000..07b7a3d
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AbstractGraphVisitorImpl.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.rx.impl;
+
+
+import org.apache.usergrid.corepersistence.AllApplicationsObservable;
+import org.apache.usergrid.corepersistence.util.CpNamingUtils;
+import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
+import org.apache.usergrid.persistence.core.migration.data.newimpls.MigrationDataProvider;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.graph.GraphManager;
+import org.apache.usergrid.persistence.graph.GraphManagerFactory;
+import org.apache.usergrid.persistence.graph.serialization.TargetIdObservable;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.inject.Inject;
+
+import rx.Observable;
+import rx.functions.Func1;
+
+
+/**
+ * An observable that will emit every entity Id stored in our entire system across all apps.
+ * Note that this only walks each application's applicationId graph, and emits edges from the applicationId and its edges as the
+ * source node
+ */
+public abstract class AbstractGraphVisitorImpl<T> implements MigrationDataProvider<T> {
+
+    private final AllApplicationsObservable applicationObservable;
+    private final GraphManagerFactory graphManagerFactory;
+    private final TargetIdObservable targetIdObservable;
+
+    @Inject
+    public AbstractGraphVisitorImpl( AllApplicationsObservable applicationObservable,
+                                     GraphManagerFactory graphManagerFactory, TargetIdObservable targetIdObservable ) {
+
+        this.applicationObservable = applicationObservable;
+        this.graphManagerFactory = graphManagerFactory;
+        this.targetIdObservable = targetIdObservable;
+    }
+
+
+
+    @Override
+    public Observable<T> getData() {
+        return applicationObservable.getAllApplications().flatMap( new Func1<ApplicationScope, Observable<T>>() {
+            @Override
+            public Observable<T> call( final ApplicationScope applicationScope ) {
+                return getAllEntities( applicationScope );
+            }
+        } );
+
+    }
+
+
+    private Observable<T> getAllEntities(final ApplicationScope applicationScope) {
+        final GraphManager gm = graphManagerFactory.createEdgeManager(applicationScope);
+        final Id applicationId = applicationScope.getApplication();
+
+        //load all nodes that are targets of our application node.  I.E.
+        // entities that have been saved
+        final Observable<Id> entityNodes =
+            targetIdObservable.getTargetNodes(gm, applicationId);
+
+        //emit Scope + ID
+
+        //create our application node to emit since it's an entity as well
+        final Observable<Id> applicationNode = Observable.just(applicationId);
+
+        //merge both the specified application node and the entity node
+        // so they all get used
+        return Observable
+            .merge( applicationNode, entityNodes ).
+            map( new Func1<Id, T>() {
+                @Override
+                public T call( final Id id ) {
+                   return generateData(applicationScope, id);
+                }
+            } );
+    }
+
+
+    /**
+     * Generate the data for the observable stream from the scope and the node id
+     * @param applicationScope
+     * @param nodeId
+     * @return
+     */
+    protected abstract T generateData(final ApplicationScope applicationScope, final Id nodeId);
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/802dcdec/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllApplicationsObservableImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllApplicationsObservableImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllApplicationsObservableImpl.java
new file mode 100644
index 0000000..0fc5452
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllApplicationsObservableImpl.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.rx.impl;
+
+
+import java.util.Arrays;
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.corepersistence.AllApplicationsObservable;
+import org.apache.usergrid.corepersistence.util.CpNamingUtils;
+import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.collection.EntityCollectionManager;
+import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
+import org.apache.usergrid.persistence.collection.impl.CollectionScopeImpl;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.GraphManager;
+import org.apache.usergrid.persistence.graph.GraphManagerFactory;
+import org.apache.usergrid.persistence.graph.SearchByEdgeType;
+import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
+import org.apache.usergrid.persistence.model.entity.Entity;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.inject.Inject;
+
+import rx.Observable;
+import rx.functions.Func1;
+
+import static org.apache.usergrid.corepersistence.util.CpNamingUtils.getApplicationScope;
+
+
+/**
+ * An observable that will emit all applications stored in the system.
+ */
+public class AllApplicationsObservableImpl implements AllApplicationsObservable {
+
+    private static final Logger logger = LoggerFactory.getLogger( AllApplicationsObservableImpl.class );
+    private final EntityCollectionManagerFactory entityCollectionManagerFactory;
+    private final GraphManagerFactory graphManagerFactory;
+
+    @Inject
+    public AllApplicationsObservableImpl( EntityCollectionManagerFactory entityCollectionManagerFactory,
+                                          GraphManagerFactory graphManagerFactory ){
+
+        this.entityCollectionManagerFactory = entityCollectionManagerFactory;
+        this.graphManagerFactory = graphManagerFactory;
+    }
+
+
+
+    @Override
+    public Observable<ApplicationScope> getAllApplications() {
+
+        //emit our 3 hard coded applications that are used to manage the system first.
+        //this way consumers can perform whatever work they need to on the root system first
+        final Observable<ApplicationScope> systemIds = Observable.from( Arrays
+            .asList( getApplicationScope( CpNamingUtils.DEFAULT_APPLICATION_ID ),
+                getApplicationScope( CpNamingUtils.MANAGEMENT_APPLICATION_ID ),
+                getApplicationScope( CpNamingUtils.SYSTEM_APP_ID ) ) );
+
+
+        final ApplicationScope appScope = getApplicationScope( CpNamingUtils.SYSTEM_APP_ID );
+
+        final CollectionScope appInfoCollectionScope =
+                new CollectionScopeImpl( appScope.getApplication(), appScope.getApplication(),
+                        CpNamingUtils.getCollectionScopeNameFromCollectionName( CpNamingUtils.APPINFOS ) );
+
+        final EntityCollectionManager collectionManager =
+                entityCollectionManagerFactory.createCollectionManager( appInfoCollectionScope );
+
+
+        final GraphManager gm = graphManagerFactory.createEdgeManager(appScope);
+
+
+        String edgeType = CpNamingUtils.getEdgeTypeFromCollectionName( CpNamingUtils.APPINFOS );
+
+        Id rootAppId = appScope.getApplication();
+
+
+        //we have app infos.  For each of these app infos, we have to load the application itself
+        Observable<ApplicationScope> appIds = gm.loadEdgesFromSource(
+                new SimpleSearchByEdgeType( rootAppId, edgeType, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
+                        null ) ).flatMap( new Func1<Edge, Observable<ApplicationScope>>() {
+            @Override
+            public Observable<ApplicationScope> call( final Edge edge ) {
+                //get the app info and load it
+                final Id appInfo = edge.getTargetNode();
+
+                return collectionManager.load( appInfo )
+                    //filter out null entities
+                    .filter( new Func1<Entity, Boolean>() {
+                        @Override
+                        public Boolean call( final Entity entity ) {
+                            if ( entity == null ) {
+                                logger.warn( "Encountered a null application info for id {}", appInfo );
+                                return false;
+                            }
+
+                            return true;
+                        }
+                    } )
+                        //get the id from the entity
+                    .map( new Func1<Entity, ApplicationScope>() {
+
+
+                        @Override
+                        public ApplicationScope call( final Entity entity ) {
+
+                            final UUID uuid = ( UUID ) entity.getField( "applicationUuid" ).getValue();
+
+                            return getApplicationScope( uuid );
+                        }
+                    } );
+            }
+        } );
+
+        return Observable.merge( systemIds, appIds );
+    }
+
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/802dcdec/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntitiesInSystemImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntitiesInSystemImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntitiesInSystemImpl.java
new file mode 100644
index 0000000..758a8aa
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntitiesInSystemImpl.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.rx.impl;
+
+
+import org.apache.usergrid.corepersistence.AllApplicationsObservable;
+import org.apache.usergrid.corepersistence.util.CpNamingUtils;
+import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.graph.GraphManagerFactory;
+import org.apache.usergrid.persistence.graph.serialization.TargetIdObservable;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+
+/**
+ * An observable that will emit every entity Id stored in our entire system across all apps.
+ * Note that this only walks each application's applicationId graph, and emits edges from the applicationId and its edges as the
+ * source node
+ */
+@Singleton
+public class AllEntitiesInSystemImpl extends AbstractGraphVisitorImpl<EntityIdScope> {
+
+
+    @Inject
+    public AllEntitiesInSystemImpl( final AllApplicationsObservable applicationObservable,
+                                    final GraphManagerFactory graphManagerFactory,
+                                    final TargetIdObservable targetIdObservable ) {
+        super( applicationObservable, graphManagerFactory, targetIdObservable );
+    }
+
+
+    @Override
+    protected EntityIdScope generateData( final ApplicationScope applicationScope, final Id nodeId ) {
+        CollectionScope scope =
+                               CpNamingUtils.getCollectionScopeNameFromEntityType( applicationScope.getApplication(), nodeId.getType() );
+
+                           final EntityIdScope idScope = new EntityIdScope( scope, nodeId );
+
+                           return idScope;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/802dcdec/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntitiesInSystemObservableImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntitiesInSystemObservableImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntitiesInSystemObservableImpl.java
deleted file mode 100644
index 55667cb..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntitiesInSystemObservableImpl.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.rx.impl;
-
-
-import java.util.ArrayList;
-import java.util.List;
-
-import com.google.inject.Inject;
-import org.apache.usergrid.corepersistence.util.CpNamingUtils;
-import org.apache.usergrid.persistence.collection.CollectionScope;
-import org.apache.usergrid.persistence.core.rx.AllEntitiesInSystemObservable;
-import org.apache.usergrid.persistence.core.rx.ApplicationObservable;
-import org.apache.usergrid.persistence.core.scope.ApplicationEntityGroup;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
-import org.apache.usergrid.persistence.core.scope.EntityIdScope;
-import org.apache.usergrid.persistence.graph.GraphManager;
-import org.apache.usergrid.persistence.graph.GraphManagerFactory;
-import org.apache.usergrid.persistence.graph.serialization.TargetIdObservable;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import rx.Observable;
-import rx.functions.Func1;
-
-
-/**
- * An observable that will emit every entity Id stored in our entire system across all apps.
- * Note that this only walks each application applicationId graph, and emits edges from the applicationId and it's edges as the s
- * source node
- */
-public class AllEntitiesInSystemObservableImpl implements AllEntitiesInSystemObservable<CollectionScope> {
-
-    private final ApplicationObservable applicationObservable;
-    private final GraphManagerFactory graphManagerFactory;
-    private final TargetIdObservable targetIdObservable;
-
-    @Inject
-    public AllEntitiesInSystemObservableImpl(ApplicationObservable applicationObservable, GraphManagerFactory graphManagerFactory, TargetIdObservable targetIdObservable) {
-
-        this.applicationObservable = applicationObservable;
-        this.graphManagerFactory = graphManagerFactory;
-        this.targetIdObservable = targetIdObservable;
-    }
-
-    public Observable<ApplicationEntityGroup<CollectionScope>> getAllEntitiesInSystem(final int bufferSize) {
-        return getAllEntitiesInSystem(applicationObservable.getAllApplicationIds().map(new Func1<Id, ApplicationScope>() {
-            @Override
-            public ApplicationScope call(Id id) {
-                return new ApplicationScopeImpl(id);
-            }
-        }), bufferSize);
-
-    }
-
-    public Observable<ApplicationEntityGroup<CollectionScope>> getAllEntitiesInSystem(final Observable<ApplicationScope> appIdsObservable, final int bufferSize) {
-        //traverse all nodes in the graph, load all source edges from them, then re-save the meta data
-        return appIdsObservable
-            .flatMap(new Func1<ApplicationScope, Observable<ApplicationEntityGroup<CollectionScope>>>() {
-                @Override
-                public Observable<ApplicationEntityGroup<CollectionScope>> call(final ApplicationScope applicationScope) {
-                    return getAllEntities(applicationScope, bufferSize);
-                }
-            });
-    }
-
-    private Observable<ApplicationEntityGroup<CollectionScope>> getAllEntities(final ApplicationScope applicationScope, final int bufferSize) {
-        final GraphManager gm = graphManagerFactory.createEdgeManager(applicationScope);
-        final Id applicationId = applicationScope.getApplication();
-
-        //load all nodes that are targets of our application node.  I.E.
-        // entities that have been saved
-        final Observable<Id> entityNodes =
-            targetIdObservable.getTargetNodes(gm, applicationId);
-
-
-        //get scope here
-
-
-        //emit Scope + ID
-
-        //create our application node to emit since it's an entity as well
-        final Observable<Id> applicationNode = Observable.just(applicationId);
-
-        //merge both the specified application node and the entity node
-        // so they all get used
-        return Observable
-            .merge(applicationNode, entityNodes)
-            .buffer(bufferSize)
-            .map(new Func1<List<Id>, List<EntityIdScope<CollectionScope>>>() {
-                @Override
-                public List<EntityIdScope<CollectionScope>> call(List<Id> ids) {
-                    List<EntityIdScope<CollectionScope>> scopes = new ArrayList<>(ids.size());
-                    for (Id id : ids) {
-                        CollectionScope scope = CpNamingUtils.getCollectionScopeNameFromEntityType(applicationId, id.getType());
-                        EntityIdScope<CollectionScope> idScope = new EntityIdScope<>(id, scope);
-                        scopes.add(idScope);
-                    }
-                    return scopes;
-                }
-            })
-            .map(new Func1<List<EntityIdScope<CollectionScope>>, ApplicationEntityGroup<CollectionScope>>() {
-                @Override
-                public ApplicationEntityGroup<CollectionScope> call(final List<EntityIdScope<CollectionScope>> scopes) {
-                    return new ApplicationEntityGroup<>(applicationScope, scopes);
-                }
-            });
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/802dcdec/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllNodesInGraphImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllNodesInGraphImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllNodesInGraphImpl.java
new file mode 100644
index 0000000..19292b8
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllNodesInGraphImpl.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.rx.impl;
+
+
+import org.apache.usergrid.corepersistence.AllApplicationsObservable;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.graph.GraphManagerFactory;
+import org.apache.usergrid.persistence.graph.serialization.TargetIdObservable;
+import org.apache.usergrid.persistence.graph.serialization.impl.migration.GraphNode;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+
+/**
+ * An observable that will emit every entity's node Id stored in our entire system across all apps.
+ * Note that this only walks each application's applicationId graph, and emits edges from the applicationId and its edges as the
+ * source node
+ */
+@Singleton
+public class AllNodesInGraphImpl extends AbstractGraphVisitorImpl<GraphNode> {
+
+
+    @Inject
+    public AllNodesInGraphImpl( final AllApplicationsObservable applicationObservable,
+                                final GraphManagerFactory graphManagerFactory,
+                                final TargetIdObservable targetIdObservable ) {
+        super( applicationObservable, graphManagerFactory, targetIdObservable );
+    }
+
+
+    @Override
+    protected GraphNode generateData( final ApplicationScope applicationScope, final Id nodeId ) {
+        return new GraphNode( applicationScope, nodeId );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/802dcdec/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/ApplicationObservableImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/ApplicationObservableImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/ApplicationObservableImpl.java
deleted file mode 100644
index f4a1057..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/ApplicationObservableImpl.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.rx.impl;
-
-
-import java.util.Arrays;
-import java.util.UUID;
-
-import com.google.inject.Inject;
-import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
-import org.apache.usergrid.persistence.core.rx.ApplicationObservable;
-import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
-import org.apache.usergrid.persistence.graph.GraphManagerFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.corepersistence.util.CpNamingUtils;
-import org.apache.usergrid.persistence.collection.CollectionScope;
-import org.apache.usergrid.persistence.collection.EntityCollectionManager;
-import org.apache.usergrid.persistence.collection.impl.CollectionScopeImpl;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.graph.Edge;
-import org.apache.usergrid.persistence.graph.GraphManager;
-import org.apache.usergrid.persistence.graph.SearchByEdgeType;
-import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
-import org.apache.usergrid.persistence.model.entity.Entity;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import rx.Observable;
-import rx.functions.Func1;
-
-import static org.apache.usergrid.corepersistence.util.CpNamingUtils.generateApplicationId;
-import static org.apache.usergrid.corepersistence.util.CpNamingUtils.getApplicationScope;
-
-
-/**
- * An observable that will emit all application stored in the system.
- */
-public class ApplicationObservableImpl implements ApplicationObservable {
-
-    private static final Logger logger = LoggerFactory.getLogger( ApplicationObservableImpl.class );
-    private final EntityCollectionManagerFactory entityCollectionManagerFactory;
-    private final GraphManagerFactory graphManagerFactory;
-
-    @Inject
-    public ApplicationObservableImpl(EntityCollectionManagerFactory entityCollectionManagerFactory, GraphManagerFactory graphManagerFactory){
-
-        this.entityCollectionManagerFactory = entityCollectionManagerFactory;
-        this.graphManagerFactory = graphManagerFactory;
-    }
-
-
-    @Override
-    public Observable<Id> getAllApplicationIds() {
-
-        //emit our 3 hard coded applications that are used the manage the system first.
-        //this way consumers can perform whatever work they need to on the root system first
-        final Observable<Id> systemIds = Observable.from( Arrays
-                .asList( generateApplicationId( CpNamingUtils.DEFAULT_APPLICATION_ID ),
-                        generateApplicationId( CpNamingUtils.MANAGEMENT_APPLICATION_ID ),
-                        generateApplicationId( CpNamingUtils.SYSTEM_APP_ID ) ) );
-
-
-        final ApplicationScope appScope = getApplicationScope( CpNamingUtils.SYSTEM_APP_ID );
-
-        final CollectionScope appInfoCollectionScope =
-                new CollectionScopeImpl( appScope.getApplication(), appScope.getApplication(),
-                        CpNamingUtils.getCollectionScopeNameFromCollectionName( CpNamingUtils.APPINFOS ) );
-
-        final EntityCollectionManager collectionManager =
-                entityCollectionManagerFactory.createCollectionManager( appInfoCollectionScope );
-
-
-        final GraphManager gm = graphManagerFactory.createEdgeManager(appScope);
-
-
-        String edgeType = CpNamingUtils.getEdgeTypeFromCollectionName( CpNamingUtils.APPINFOS );
-
-        Id rootAppId = appScope.getApplication();
-
-
-        //we have app infos.  For each of these app infos, we have to load the application itself
-        Observable<Id> appIds = gm.loadEdgesFromSource(
-                new SimpleSearchByEdgeType( rootAppId, edgeType, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
-                        null ) ).flatMap( new Func1<Edge, Observable<Id>>() {
-            @Override
-            public Observable<Id> call( final Edge edge ) {
-                //get the app info and load it
-                final Id appInfo = edge.getTargetNode();
-
-                return collectionManager.load( appInfo )
-                        //filter out null entities
-                        .filter( new Func1<Entity, Boolean>() {
-                            @Override
-                            public Boolean call( final Entity entity ) {
-                                if ( entity == null ) {
-                                    logger.warn( "Encountered a null application info for id {}", appInfo );
-                                    return false;
-                                }
-
-                                return true;
-                            }
-                        } )
-                                //get the id from the entity
-                        .map( new Func1<org.apache.usergrid.persistence.model.entity.Entity, Id>() {
-
-
-                            @Override
-                            public Id call( final org.apache.usergrid.persistence.model.entity.Entity entity ) {
-
-                                final UUID uuid = ( UUID ) entity.getField( "applicationUuid" ).getValue();
-
-                                return CpNamingUtils.generateApplicationId( uuid );
-                            }
-                        } );
-            }
-        } );
-
-        return Observable.merge( systemIds, appIds );
-    }
-
-    @Override
-    public Observable<ApplicationScope> getAllApplicationScopes() {
-        return getAllApplicationIds().map(new Func1<Id, ApplicationScope>() {
-            @Override
-            public ApplicationScope call(Id id) {
-                ApplicationScope scope = new ApplicationScopeImpl(id);
-                return scope;
-            }
-        });
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/802dcdec/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityDataMigrationIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityDataMigrationIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityDataMigrationIT.java
deleted file mode 100644
index 4d87f8b..0000000
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityDataMigrationIT.java
+++ /dev/null
@@ -1,270 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.migration;
-
-
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Set;
-
-import org.apache.usergrid.persistence.collection.mvcc.MvccEntityMigrationStrategy;
-import org.apache.usergrid.persistence.collection.serialization.impl.MvccEntityDataMigrationImpl;
-import org.apache.usergrid.persistence.collection.serialization.impl.MvccEntitySerializationStrategyProxyV2Impl;
-import org.apache.usergrid.persistence.core.migration.data.*;
-import org.apache.usergrid.persistence.core.rx.AllEntitiesInSystemObservable;
-import org.apache.usergrid.persistence.core.rx.ApplicationObservable;
-import org.apache.usergrid.persistence.core.scope.ApplicationEntityGroup;
-import org.apache.usergrid.persistence.core.scope.EntityIdScope;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Rule;
-import org.junit.Test;
-
-import org.apache.usergrid.AbstractCoreIT;
-import org.apache.usergrid.cassandra.SpringResource;
-import org.apache.usergrid.corepersistence.EntityWriteHelper;
-import org.apache.usergrid.corepersistence.util.CpNamingUtils;
-import org.apache.usergrid.persistence.Entity;
-import org.apache.usergrid.persistence.EntityManager;
-import org.apache.usergrid.persistence.EntityManagerFactory;
-import org.apache.usergrid.persistence.SimpleEntityRef;
-import org.apache.usergrid.persistence.collection.CollectionScope;
-import org.apache.usergrid.persistence.collection.MvccEntity;
-import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
-import org.apache.usergrid.persistence.model.entity.Id;
-import org.apache.usergrid.persistence.model.util.UUIDGenerator;
-
-import com.google.inject.Injector;
-import com.google.inject.Key;
-
-import net.jcip.annotations.NotThreadSafe;
-
-import rx.functions.Action1;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-
-@NotThreadSafe
-public class EntityDataMigrationIT extends AbstractCoreIT {
-
-
-    private Injector injector;
-
-
-    private CollectionDataMigration entityDataMigration;
-    private DataMigrationManager dataMigrationManager;
-    private MigrationInfoSerialization migrationInfoSerialization;
-    private MvccEntitySerializationStrategy v1Strategy;
-    private MvccEntitySerializationStrategy v2Strategy;
-    private EntityManagerFactory emf;
-
-
-
-    /**
-     * Rule to do the resets we need
-     */
-    @Rule
-    public MigrationTestRule migrationTestRule =
-
-            new MigrationTestRule( app, SpringResource.getInstance().getBean( Injector.class ) ,MvccEntityDataMigrationImpl.class  );
-    private AllEntitiesInSystemObservable allEntitiesInSystemObservable;
-    private ApplicationObservable applicationObservable;
-
-
-    @Before
-    public void setup() {
-        emf = setup.getEmf();
-        injector = SpringResource.getInstance().getBean(Injector.class);
-        entityDataMigration = injector.getInstance( MvccEntityDataMigrationImpl.class );
-        injector =  SpringResource.getInstance().getBean( Injector.class );
-        dataMigrationManager = injector.getInstance( DataMigrationManager.class );
-        migrationInfoSerialization = injector.getInstance( MigrationInfoSerialization.class );
-        MvccEntityMigrationStrategy strategy = injector.getInstance(Key.get(MvccEntityMigrationStrategy.class));
-        allEntitiesInSystemObservable = injector.getInstance(AllEntitiesInSystemObservable.class);
-        applicationObservable = injector.getInstance(ApplicationObservable.class);
-
-        v1Strategy = strategy.getMigration().from();
-        v2Strategy = strategy.getMigration().to();
-    }
-
-
-    @Test
-    @Ignore("Awaiting fix for USERGRID-268")
-    public void testDataMigration() throws Throwable {
-
-        assertEquals( "version 3 expected", 3, entityDataMigration.getVersion() );
-        assertEquals( "Previous version expected", 2, dataMigrationManager.getCurrentVersion());
-
-
-        final EntityManager newAppEm = app.getEntityManager();
-
-        final String type1 = "type1thing";
-        final String type2 = "type2thing";
-        final int size = 10;
-
-        final Set<Id> type1Identities = EntityWriteHelper.createTypes( newAppEm, type1, size );
-        final Set<Id> type2Identities = EntityWriteHelper.createTypes( newAppEm, type2, size );
-
-
-        final Set<Id> createdEntityIds = new HashSet<>();
-        createdEntityIds.addAll( type1Identities );
-        createdEntityIds.addAll( type2Identities );
-
-
-        final TestProgressObserver progressObserver = new TestProgressObserver();
-
-
-        //load everything that appears in v1, migrate and ensure it appears in v2
-        final Set<MvccEntity> savedEntities = new HashSet<>( 10000 );
-        //set that holds all entityIds for later assertion
-        final Set<Id> entityIds = new HashSet<>(10000);
-
-
-        //read everything in previous version format and put it into our types.  Assumes we're
-        //using a test system, and it's not a huge amount of data, otherwise we'll overflow.
-
-        allEntitiesInSystemObservable.getAllEntitiesInSystem(  1000)
-            .doOnNext( new Action1<ApplicationEntityGroup<CollectionScope>>() {
-                @Override
-                public void call(
-                        final ApplicationEntityGroup<CollectionScope> entity ) {
-
-                    //add all versions from history to our comparison
-                    for ( final EntityIdScope id : entity.entityIds ) {
-
-                        CollectionScope scope = CpNamingUtils
-                                .getCollectionScopeNameFromEntityType(
-                                    entity.applicationScope.getApplication(),
-                                    id.getId().getType());
-
-                        final Iterator<MvccEntity> versions = v1Strategy
-                                .loadDescendingHistory( scope, id.getId(), UUIDGenerator.newTimeUUID(),
-                                        100 );
-
-                        while ( versions.hasNext() ) {
-
-                            final MvccEntity mvccEntity = versions.next();
-
-                            savedEntities.add( mvccEntity );
-
-                            createdEntityIds.remove( mvccEntity.getId() );
-
-                            entityIds.add( id.getId() );
-                        }
-                    }
-                }
-            } ).toBlocking().lastOrDefault( null );
-
-        assertEquals( "Newly saved entities encountered", 0, createdEntityIds.size() );
-        assertTrue( "Saved new entities", savedEntities.size() > 0 );
-
-        //perform the migration
-        rx.Observable<ApplicationEntityGroup> oRx = allEntitiesInSystemObservable.getAllEntitiesInSystem(applicationObservable.getAllApplicationScopes(), 1000);
-
-        entityDataMigration.migrate(oRx, progressObserver).toBlocking().last();
-
-        assertFalse( "Progress observer should not have failed", progressObserver.getFailed() );
-        assertTrue( "Progress observer should have update messages", progressObserver.getUpdates().size() > 0 );
-
-
-        //write the status and version, then invalidate the cache so it appears
-        migrationInfoSerialization.setStatusCode( DataMigrationManagerImpl.StatusCode.COMPLETE.status );
-        migrationInfoSerialization.setVersion( entityDataMigration.getVersion() );
-        dataMigrationManager.invalidate();
-
-        assertEquals( "New version saved, and we should get new implementation", entityDataMigration.getVersion(),
-                dataMigrationManager.getCurrentVersion() );
-
-
-        //now visit all entities in the system again, load them from v2, and ensure they're the same
-        allEntitiesInSystemObservable.getAllEntitiesInSystem( 1000)
-            .doOnNext( new Action1<ApplicationEntityGroup<CollectionScope>>() {
-                @Override
-                public void call(
-                        final ApplicationEntityGroup<CollectionScope> entity ) {
-                    //add all versions from history to our comparison
-                    for ( final EntityIdScope<CollectionScope> id : entity.entityIds ) {
-
-                        CollectionScope scope = CpNamingUtils
-                                .getCollectionScopeNameFromEntityType(
-                                    entity.applicationScope.getApplication(),
-                                    id.getId().getType());
-
-                        final Iterator<MvccEntity> versions = v2Strategy
-                                .loadDescendingHistory( scope, id.getId(),
-                                        UUIDGenerator.newTimeUUID(), 100 );
-
-                        while ( versions.hasNext() ) {
-
-                            final MvccEntity mvccEntity = versions.next();
-
-                            savedEntities.remove( mvccEntity );
-                        }
-                    }
-                }
-            }
-
-
-            ).toBlocking().lastOrDefault( null );
-
-
-        assertEquals( "All entities migrated", 0, savedEntities.size() );
-
-
-        //now visit all entities in the system again, and load them from the EM,
-        // ensure we see everything we did in the v1 traversal
-        allEntitiesInSystemObservable.getAllEntitiesInSystem( 1000)
-            .doOnNext( new Action1<ApplicationEntityGroup<CollectionScope>>() {
-                @Override
-                public void call(
-                        final ApplicationEntityGroup<CollectionScope> entity ) {
-
-                    final EntityManager em = emf.getEntityManager(
-                            entity.applicationScope.getApplication().getUuid() );
-
-                    //add all versions from history to our comparison
-                    for ( final EntityIdScope<CollectionScope> id : entity.entityIds ) {
-
-
-                        try {
-                            final Entity emEntity = em.get( SimpleEntityRef.fromId( id.getId() ) );
-
-                            if(emEntity != null){
-                                entityIds.remove( id.getId() );
-                            }
-                        }
-                        catch ( Exception e ) {
-                            throw new RuntimeException("Error loading entity", e);
-                        }
-                    }
-                }
-            }
-
-
-            ).toBlocking().lastOrDefault( null );
-
-
-        assertEquals("All entities could be loaded by the entity manager", 0, entityIds.size());
-
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/802dcdec/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigrationIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigrationIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigrationIT.java
index 421b875..99fb139 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigrationIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigrationIT.java
@@ -36,7 +36,6 @@ import org.apache.usergrid.AbstractCoreIT;
 import org.apache.usergrid.cassandra.SpringResource;
 import org.apache.usergrid.corepersistence.EntityWriteHelper;
 import org.apache.usergrid.corepersistence.ManagerCache;
-import org.apache.usergrid.corepersistence.rx.impl.AllEntitiesInSystemObservableImpl;
 import org.apache.usergrid.corepersistence.util.CpNamingUtils;
 import org.apache.usergrid.persistence.Entity;
 import org.apache.usergrid.persistence.EntityManager;