You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/03/03 02:43:15 UTC

[1/2] incubator-usergrid git commit: Migrations work. Need to finish tests.

Repository: incubator-usergrid
Updated Branches:
  refs/heads/USERGRID-405 e96af464b -> fa69be86c


Migrations work.  Need to finish tests.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/a55c784d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/a55c784d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/a55c784d

Branch: refs/heads/USERGRID-405
Commit: a55c784dac5beb624950f6b09746667e4431fbd8
Parents: e96af46
Author: Todd Nine <tn...@apigee.com>
Authored: Mon Mar 2 16:11:11 2015 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Mon Mar 2 17:19:27 2015 -0700

----------------------------------------------------------------------
 .../collection/guice/CollectionModule.java      |  12 +-
 .../impl/CollectionDataVersions.java            |   4 +-
 .../MvccEntitySerializationStrategyV1Impl.java  |   2 +-
 .../MvccEntitySerializationStrategyV2Impl.java  |   2 +-
 .../MvccEntitySerializationStrategyV3Impl.java  |   2 +-
 .../serialization/impl/SerializationModule.java |  78 ++--
 .../migration/CollectionMigrationPlugin.java    |  78 +++-
 .../migration/MvccEntityDataMigrationImpl.java  | 446 ++++++++++++++-----
 .../collection/guice/TestCollectionModule.java  |  17 +-
 .../persistence/core/guice/CommonModule.java    |   2 +
 .../data/MigrationInfoSerializationImpl.java    |   6 +-
 .../migration/data/newimpls/DataMigration2.java |  17 +-
 .../data/newimpls/MigrationPlugin.java          |   6 +-
 .../newimpls/TestMigrationDataProvider.java     |  61 +++
 14 files changed, 563 insertions(+), 170 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a55c784d/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
index af5c4cf..7d78177 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
@@ -55,7 +55,7 @@ import com.google.inject.multibindings.Multibinder;
  *
  * @author tnine
  */
-public class CollectionModule extends AbstractModule {
+public abstract class CollectionModule extends AbstractModule {
 
 
     @Override
@@ -86,6 +86,8 @@ public class CollectionModule extends AbstractModule {
         bind( UniqueValueSerializationStrategy.class ).to( UniqueValueSerializationStrategyImpl.class );
         bind( ChangeLogGenerator.class).to( ChangeLogGeneratorImpl.class);
 
+        configureMigrationProvider();
+
     }
 
     @Provides
@@ -120,6 +122,14 @@ public class CollectionModule extends AbstractModule {
     }
 
 
+    /**
+     * Gives callers the ability to configure an instance of
+     *
+     * MigrationDataProvider<EntityIdScope> for providing data migrations
+     */
+    public abstract void configureMigrationProvider();
+
+
 
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a55c784d/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/CollectionDataVersions.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/CollectionDataVersions.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/CollectionDataVersions.java
index ee84b1e..6e8be45 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/CollectionDataVersions.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/CollectionDataVersions.java
@@ -28,9 +28,9 @@ package org.apache.usergrid.persistence.collection.serialization.impl;
  * Versions of data as they exist across our system
  */
 public enum CollectionDataVersions{
+    ZERO(0),
     ONE(1),
-    TWO(2),
-    THREE(3);
+    TWO(2);
 
     private final int version;
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a55c784d/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV1Impl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV1Impl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV1Impl.java
index a959508..1dab673 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV1Impl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV1Impl.java
@@ -90,7 +90,7 @@ public class MvccEntitySerializationStrategyV1Impl extends MvccEntitySerializati
 
     @Override
     public int getImplementationVersion() {
-        return CollectionDataVersions.ONE.getVersion();
+        return CollectionDataVersions.ZERO.getVersion();
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a55c784d/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV2Impl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV2Impl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV2Impl.java
index 04a1fea..1f65fcb 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV2Impl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV2Impl.java
@@ -93,7 +93,7 @@ public class MvccEntitySerializationStrategyV2Impl extends MvccEntitySerializati
 
     @Override
     public int getImplementationVersion() {
-        return CollectionDataVersions.TWO.getVersion();
+        return CollectionDataVersions.ONE.getVersion();
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a55c784d/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV3Impl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV3Impl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV3Impl.java
index 4e73119..7b8aac1 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV3Impl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV3Impl.java
@@ -383,7 +383,7 @@ public class MvccEntitySerializationStrategyV3Impl implements MvccEntitySerializ
 
     @Override
     public int getImplementationVersion() {
-        return CollectionDataVersions.THREE.getVersion();
+        return CollectionDataVersions.TWO.getVersion();
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a55c784d/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/SerializationModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/SerializationModule.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/SerializationModule.java
index fe804f3..f32f4f9 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/SerializationModule.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/SerializationModule.java
@@ -34,7 +34,7 @@ import org.apache.usergrid.persistence.core.migration.schema.Migration;
 import com.google.inject.AbstractModule;
 import com.google.inject.Inject;
 import com.google.inject.Key;
-import com.google.inject.Provides;
+import com.google.inject.Provider;
 import com.google.inject.Singleton;
 import com.google.inject.TypeLiteral;
 import com.google.inject.multibindings.Multibinder;
@@ -54,26 +54,32 @@ public class SerializationModule extends AbstractModule {
         //We've migrated this one, so we need to set up the previous, current, and proxy
 
 
-        bind(MvccEntitySerializationStrategy.class).annotatedWith( ProxyImpl.class ).to( MvccEntitySerializationStrategyProxyImpl.class );
-
-
+        bind( MvccEntitySerializationStrategy.class ).annotatedWith( ProxyImpl.class )
+                                                     .to( MvccEntitySerializationStrategyProxyImpl.class );
 
 
         //bind all 3 implementations
-        bind(MvccEntitySerializationStrategyV1Impl.class);
-        bind(MvccEntitySerializationStrategyV2Impl.class);
-        bind(MvccEntitySerializationStrategyV3Impl.class);
+        bind( MvccEntitySerializationStrategyV1Impl.class );
+        bind( MvccEntitySerializationStrategyV2Impl.class );
+        bind( MvccEntitySerializationStrategyV3Impl.class );
+
 
+        bind( new TypeLiteral<VersionedMigrationSet<MvccEntitySerializationStrategy>>() {} )
+                .toProvider( MvccEntitySerializationStrategyProvider.class );
 
 
         //migrations
         //we want to make sure our generics are retained, so we use a typeliteral
-        Multibinder<DataMigration2<EntityIdScope>> dataMigrationMultibinder =  Multibinder.newSetBinder( binder(), new TypeLiteral<DataMigration2<EntityIdScope>>(){});
+        Multibinder<DataMigration2<EntityIdScope>> dataMigrationMultibinder =
+                Multibinder.newSetBinder( binder(), new TypeLiteral<DataMigration2<EntityIdScope>>() {} );
+
+
         dataMigrationMultibinder.addBinding().to( MvccEntityDataMigrationImpl.class );
 
 
         //wire up the collection migration plugin
-        Multibinder.newSetBinder( binder(), MigrationPlugin.class).addBinding().to( CollectionMigrationPlugin.class );
+        Multibinder.newSetBinder( binder(), MigrationPlugin.class ).addBinding().to( CollectionMigrationPlugin.class );
+
 
 
 
@@ -90,39 +96,53 @@ public class SerializationModule extends AbstractModule {
 
 
         //bind our settings as an eager singleton so it's checked on startup
-        bind(SettingsValidation.class).asEagerSingleton();
+        bind( SettingsValidation.class ).asEagerSingleton();
     }
 
 
-    /**
-     * Configure via explicit declaration the migration path we can follow
-     * @param v1
-     * @param v2
-     * @param v3
-     * @return
-     */
     @Singleton
-    @Inject
-    @Provides
-    public VersionedMigrationSet<MvccEntitySerializationStrategy> getVersions(final MvccEntitySerializationStrategyV1Impl v1, final MvccEntitySerializationStrategyV2Impl v2, final MvccEntitySerializationStrategyV3Impl v3){
+    public static final class MvccEntitySerializationStrategyProvider
+            implements Provider<VersionedMigrationSet<MvccEntitySerializationStrategy>> {
+
+
+        private final MvccEntitySerializationStrategyV1Impl v1;
+        private final MvccEntitySerializationStrategyV2Impl v2;
+        private final MvccEntitySerializationStrategyV3Impl v3;
+
+
+        @Inject
+        public MvccEntitySerializationStrategyProvider( final MvccEntitySerializationStrategyV1Impl v1,
+                                                         final MvccEntitySerializationStrategyV2Impl v2,
+                                                         final MvccEntitySerializationStrategyV3Impl v3 ) {
+            this.v1 = v1;
+            this.v2 = v2;
+            this.v3 = v3;
+        }
+
 
+        @Override
+        public VersionedMigrationSet<MvccEntitySerializationStrategy> get() {
 
-        //we must perform a migration from v1 to v3 in order to maintain consistency
-        MigrationRelationship<MvccEntitySerializationStrategy> v1Tov3 = new MigrationRelationship<>( v1, v3 );
+            //we must perform a migration from v1 to v3 in order to maintain consistency
+            MigrationRelationship<MvccEntitySerializationStrategy> v1Tov3 = new MigrationRelationship<>( v1, v3 );
 
-        //we must migrate from 2 to 3, this is a bridge that must happen to maintain data consistency
+            //we must migrate from 2 to 3, this is a bridge that must happen to maintain data consistency
 
-        MigrationRelationship<MvccEntitySerializationStrategy> v2Tov3 = new MigrationRelationship<>( v2, v3 );
+            MigrationRelationship<MvccEntitySerializationStrategy> v2Tov3 = new MigrationRelationship<>( v2, v3 );
 
 
-        //note that we MUST migrate to v3 before our next migration, if v4 and v5 is implemented we will need a v3->v5 and a v4->v5 set
-        MigrationRelationship<MvccEntitySerializationStrategy> current = new MigrationRelationship<MvccEntitySerializationStrategy>( v3, v3 );
+            //note that we MUST migrate to v3 before our next migration, if v4 and v5 are implemented we will need a
+            // v3->v5 and a v4->v5 set
+            MigrationRelationship<MvccEntitySerializationStrategy> current =
+                    new MigrationRelationship<MvccEntitySerializationStrategy>( v3, v3 );
 
 
-        //now create our set of versions
-        VersionedMigrationSet<MvccEntitySerializationStrategy> set = new VersionedMigrationSet<>( v1Tov3, v2Tov3, current );
+            //now create our set of versions
+            VersionedMigrationSet<MvccEntitySerializationStrategy> set =
+                    new VersionedMigrationSet<>( v1Tov3, v2Tov3, current );
 
-        return set;
 
+            return set;
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a55c784d/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/CollectionMigrationPlugin.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/CollectionMigrationPlugin.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/CollectionMigrationPlugin.java
index 5d52a9f..d0663c2 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/CollectionMigrationPlugin.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/CollectionMigrationPlugin.java
@@ -26,6 +26,11 @@ package org.apache.usergrid.persistence.collection.serialization.impl.migration;
 
 import java.util.Set;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.persistence.core.migration.data.DataMigrationException;
+import org.apache.usergrid.persistence.core.migration.data.MigrationInfoSerialization;
 import org.apache.usergrid.persistence.core.migration.data.newimpls.DataMigration2;
 import org.apache.usergrid.persistence.core.migration.data.newimpls.MigrationDataProvider;
 import org.apache.usergrid.persistence.core.migration.data.newimpls.MigrationPlugin;
@@ -42,17 +47,22 @@ import com.google.inject.Singleton;
 public class CollectionMigrationPlugin implements MigrationPlugin {
 
 
-    public static final String PLUGIN_NAME =  "collections-entity-data";
+    private static final Logger LOG = LoggerFactory.getLogger( CollectionMigrationPlugin.class );
+
+    public static final String PLUGIN_NAME = "collections-entity-data";
 
-    private final DataMigration2<EntityIdScope> entityDataMigration;
+    private final Set<DataMigration2<EntityIdScope>> entityDataMigrations;
     private final MigrationDataProvider<EntityIdScope> entityIdScopeDataMigrationProvider;
+    private final MigrationInfoSerialization migrationInfoSerialization;
 
 
     @Inject
-    public CollectionMigrationPlugin( final DataMigration2<EntityIdScope> entityDataMigration,
-                                      final MigrationDataProvider<EntityIdScope> entityIdScopeDataMigrationProvider ) {
-        this.entityDataMigration = entityDataMigration;
+    public CollectionMigrationPlugin( final Set<DataMigration2<EntityIdScope>> entityDataMigrations,
+                                      final MigrationDataProvider<EntityIdScope> entityIdScopeDataMigrationProvider,
+                                      final MigrationInfoSerialization migrationInfoSerialization ) {
+        this.entityDataMigrations = entityDataMigrations;
         this.entityIdScopeDataMigrationProvider = entityIdScopeDataMigrationProvider;
+        this.migrationInfoSerialization = migrationInfoSerialization;
     }
 
 
@@ -64,12 +74,66 @@ public class CollectionMigrationPlugin implements MigrationPlugin {
 
     @Override
     public void run( final ProgressObserver observer ) {
-       entityDataMigration.migrate( entityIdScopeDataMigrationProvider, observer );
+
+        //run until complete
+        while(runMigration( observer )){
+         LOG.info( "Migration complete, checking for next run" );
+        }
+
     }
 
 
     @Override
     public int getMaxVersion() {
-        return 0;
+
+        int max = 0;
+
+        for(DataMigration2<EntityIdScope> entityMigration: entityDataMigrations){
+            max = Math.max( max, entityMigration.getMaxVersion() );
+        }
+
+        return max;
+    }
+
+
+    /**
+     * Try to run the migration
+     *
+     * @return True if we ran a migration
+     */
+    private boolean runMigration( final ProgressObserver po ) {
+        DataMigration2<EntityIdScope> migrationToExecute = null;
+
+
+        final int version = migrationInfoSerialization.getVersion( PLUGIN_NAME );
+
+        for ( DataMigration2<EntityIdScope> entityMigration : entityDataMigrations ) {
+            if ( entityMigration.supports( version ) ) {
+                if ( migrationToExecute != null ) {
+                    throw new DataMigrationException(
+                            "Two migrations attempted to migration the same version, this is not allowed.  Class '"
+                                    + migrationToExecute.getClass().getName() + "' and class '" + entityMigration
+                                    .getClass().getName()
+                                    + "' both support this version. This means something is wired incorrectly" );
+                }
+
+                migrationToExecute = entityMigration;
+            }
+        }
+
+        if(migrationToExecute == null){
+            LOG.info( "No migrations found to execute" );
+            return false;
+        }
+
+        //run the migration
+        final int newSystemVersion = migrationToExecute.migrate( version, entityIdScopeDataMigrationProvider, po );
+
+        migrationInfoSerialization.setVersion( PLUGIN_NAME, newSystemVersion );
+
+        //signal we've run a migration and return
+        return true;
+
+
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a55c784d/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java
index 5250b5c..48e0195 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java
@@ -26,10 +26,14 @@ import java.util.UUID;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.collection.EntityVersionCleanupFactory;
 import org.apache.usergrid.persistence.collection.MvccEntity;
+import org.apache.usergrid.persistence.collection.impl.EntityVersionCleanupTask;
 import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
 import org.apache.usergrid.persistence.collection.serialization.UniqueValue;
 import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
+import org.apache.usergrid.persistence.collection.serialization.impl.MvccEntitySerializationStrategyImpl;
+import org.apache.usergrid.persistence.collection.serialization.impl.MvccEntitySerializationStrategyV3Impl;
 import org.apache.usergrid.persistence.collection.serialization.impl.UniqueValueImpl;
 import org.apache.usergrid.persistence.collection.util.EntityUtils;
 import org.apache.usergrid.persistence.core.migration.data.DataMigrationException;
@@ -45,51 +49,68 @@ import org.apache.usergrid.persistence.model.field.Field;
 import org.apache.usergrid.persistence.model.util.UUIDGenerator;
 
 import com.google.inject.Inject;
+import com.google.inject.Singleton;
 import com.netflix.astyanax.Keyspace;
 import com.netflix.astyanax.MutationBatch;
 import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 
 import rx.Observable;
 import rx.Subscriber;
+import rx.functions.Action0;
 import rx.functions.Action1;
 import rx.functions.Func1;
 import rx.functions.Func2;
+import rx.observables.GroupedObservable;
 import rx.schedulers.Schedulers;
 
 
 /**
  * Data migration strategy for entities
  */
+@Singleton
 public class MvccEntityDataMigrationImpl implements DataMigration2<EntityIdScope> {
 
     private final Keyspace keyspace;
     private final VersionedMigrationSet<MvccEntitySerializationStrategy> allVersions;
     private final UniqueValueSerializationStrategy uniqueValueSerializationStrategy;
     private final MigrationInfoSerialization migrationInfoSerialization;
+    private final EntityVersionCleanupFactory entityVersionCleanupFactory;
+    private final MvccEntitySerializationStrategyV3Impl mvccEntitySerializationStrategyV3;
 
 
     @Inject
     public MvccEntityDataMigrationImpl( final Keyspace keyspace,
                                         final VersionedMigrationSet<MvccEntitySerializationStrategy> allVersions,
                                         final UniqueValueSerializationStrategy uniqueValueSerializationStrategy,
-                                        final MigrationInfoSerialization migrationInfoSerialization ) {
+                                        final MigrationInfoSerialization migrationInfoSerialization,
+                                        final EntityVersionCleanupFactory entityVersionCleanupFactory,
+                                        final MvccEntitySerializationStrategyV3Impl mvccEntitySerializationStrategyV3 ) {
 
         this.keyspace = keyspace;
         this.allVersions = allVersions;
         this.uniqueValueSerializationStrategy = uniqueValueSerializationStrategy;
         this.migrationInfoSerialization = migrationInfoSerialization;
+        this.entityVersionCleanupFactory = entityVersionCleanupFactory;
+        this.mvccEntitySerializationStrategyV3 = mvccEntitySerializationStrategyV3;
     }
 
 
     @Override
-    public int getVersion() {
-        //get the max implementation version, since that's what we're going to
-        return allVersions.getMaxVersion( migrationInfoSerialization.getVersion( CollectionMigrationPlugin.PLUGIN_NAME ) );
+    public boolean supports( final int currentVersion ) {
+        //we can only migrate up to v3 with this implementation.  Beyond that, we should use a different migration
+        return currentVersion < mvccEntitySerializationStrategyV3.getImplementationVersion();
     }
 
 
     @Override
-    public void migrate( final MigrationDataProvider<EntityIdScope> migrationDataProvider,   final ProgressObserver observer ) {
+    public int getMaxVersion() {
+        return mvccEntitySerializationStrategyV3.getImplementationVersion();
+    }
+
+
+    @Override
+    public int migrate( final int currentVersion, final MigrationDataProvider<EntityIdScope> migrationDataProvider,
+                         final ProgressObserver observer ) {
 
         final AtomicLong atomicLong = new AtomicLong();
 
@@ -97,137 +118,326 @@ public class MvccEntityDataMigrationImpl implements DataMigration2<EntityIdScope
 
         final UUID startTime = UUIDGenerator.newTimeUUID();
 
-        final MigrationRelationship<MvccEntitySerializationStrategy>
-                migration = allVersions.getMigrationRelationship( getCurrentSystemVersion() );
-
-        final long migrated = migrationDataProvider.getData().subscribeOn( Schedulers.io() )
-                   .parallel( new Func1<Observable<EntityIdScope>, Observable<Long>>() {
-
-
-                       //process the ids in parallel
-                       @Override
-                       public Observable<Long> call( final Observable<EntityIdScope> entityIdScopeObservable ) {
-
-
-                           return entityIdScopeObservable
-                                   .flatMap( new Func1<EntityIdScope, Observable<EntityToSaveMessage>>() {
-
-
-                                       @Override
-                                       public Observable<EntityToSaveMessage> call(
-                                               final EntityIdScope entityIdScope ) {
-
-                                           //load the entity
-                                           final CollectionScope currentScope = entityIdScope.getCollectionScope();
-
-
-
-                                           //for each element in our history, we need to copy it to v2.  Note that this migration
-                                           //won't support anything beyond V2
-
-                                           final Iterator<MvccEntity> allVersions = migration.from
-                                                   .loadAscendingHistory( currentScope, entityIdScope.getId(),
-                                                           startTime, 100 );
-
-                                           //emit all the entities
-                                           return Observable.create( new Observable.OnSubscribe<EntityToSaveMessage>() {
-                                               @Override
-                                               public void call(
-                                                       final Subscriber<? super EntityToSaveMessage> subscriber ) {
-
-                                                   while ( allVersions.hasNext() ) {
-                                                       final EntityToSaveMessage message =
-                                                               new EntityToSaveMessage( currentScope,
-                                                                       allVersions.next() );
-                                                       subscriber.onNext( message );
-                                                   }
-
-                                                   subscriber.onCompleted();
-                                               }
-                                           } );
-                                       }
-                                   } ).buffer( 100 ).doOnNext( new Action1<List<EntityToSaveMessage>>() {
-                                       @Override
-                                       public void call( final List<EntityToSaveMessage> messages ) {
-                                           atomicLong.addAndGet( messages.size() );
-
-                                           final MutationBatch totalBatch = keyspace.prepareMutationBatch();
+        final MigrationRelationship<MvccEntitySerializationStrategy> migration =
+                allVersions.getMigrationRelationship( currentVersion );
 
 
-                                           for ( EntityToSaveMessage message : messages ) {
-
-                                               final MutationBatch entityRewrite =
-                                                       migration.to.write( message.scope, message.entity );
-
-                                               //add to the total batch
-                                               totalBatch.mergeShallow( entityRewrite );
-
-                                               //write the unique values
-
-                                               if ( message.entity.getEntity().isPresent() ) {
-
-                                                   final Entity entity = message.entity.getEntity().get();
-
-                                                   final Id entityId = entity.getId();
-
-                                                   final UUID version = message.entity.getVersion();
-
-                                                   // re-write the unique values but this time with no TTL
-                                                   for ( Field field : EntityUtils
-                                                           .getUniqueFields( message.entity.getEntity().get() ) ) {
-
-                                                       UniqueValue written =
-                                                               new UniqueValueImpl( field, entityId, version );
-
-                                                       MutationBatch mb =
-                                                               uniqueValueSerializationStrategy.write( message.scope, written );
-
-
-                                                       // merge into our existing mutation batch
-                                                       totalBatch.mergeShallow( mb );
-                                                   }
-
-
-                                               }
-                                           }
-
-
-                                           executeBatch( totalBatch, observer, atomicLong );
-                                       }
-                                   } )
-                                           //count the results
-                                   .reduce( 0l, new Func2<Long, List<EntityToSaveMessage>, Long>() {
-                                       @Override
-                                       public Long call( final Long aLong, final List<EntityToSaveMessage> ids ) {
-                                           return aLong + ids.size();
-                                       }
-                                   } );
-                       }
-                   } ).toBlocking().last();
+        final long migrated = migrationDataProvider.getData().subscribeOn( Schedulers.io() )
+                                                   .parallel( new Func1<Observable<EntityIdScope>, Observable<Long>>() {
+
+
+                  //process the ids in parallel
+                  @Override
+                  public Observable<Long> call(
+                          final Observable<EntityIdScope>
+                                  entityIdScopeObservable ) {
+
+
+                      return entityIdScopeObservable.flatMap(
+                              new Func1<EntityIdScope,
+                                      Observable<EntityToSaveMessage>>() {
+
+
+                                  @Override
+                                  public
+                                  Observable<EntityToSaveMessage> call(
+                                          final EntityIdScope
+                                                  entityIdScope ) {
+
+                                      //load the entity
+                                      final CollectionScope
+                                              currentScope =
+                                              entityIdScope
+                                                      .getCollectionScope();
+
+
+                                      //for each element in our
+                                      // history, we need to copy it
+                                      // to the target version
+                                      // (migration.to).
+                                      // Note that this migration
+                                      //won't support anything beyond V3
+
+                                      final Iterator<MvccEntity>
+                                              allVersions =
+                                              migration.from
+                                                      .loadAscendingHistory(
+                                                              currentScope,
+                                                              entityIdScope
+                                                                      .getId(),
+                                                              startTime,
+                                                              100 );
+
+                                      //emit all the entity versions
+                                      return Observable.create(
+                                              new Observable
+                                                      .OnSubscribe<EntityToSaveMessage>() {
+                                                  @Override
+                                                  public void call(
+                                                          final
+                                                          Subscriber<? super
+                                                                  EntityToSaveMessage> subscriber ) {
+
+                                                      while ( allVersions
+                                                              .hasNext() ) {
+                                                          final
+                                                          EntityToSaveMessage
+                                                                  message =
+                                                                  new EntityToSaveMessage(
+                                                                          currentScope,
+                                                                          allVersions
+                                                                                  .next() );
+                                                          subscriber.onNext( message );
+                                                      }
+
+                                                      subscriber.onCompleted();
+                                                  }
+                                              } );
+                                  }
+                              } )
+
+
+                              //group them by entity id so we can get
+                              // the max for cleanup
+                              .groupBy(
+                                      new Func1<EntityToSaveMessage,
+                                              Id>() {
+                                          @Override
+                                          public Id call(
+                                                  final
+                                                  EntityToSaveMessage
+                                                          entityToSaveMessage ) {
+                                              return entityToSaveMessage.entity
+                                                      .getId();
+                                          }
+                                      } )
+                              //buffer up 10 groups so we can put them all in a single mutation
+                              .buffer( 10 ).doOnNext(
+                                      new Action1<List<GroupedObservable<Id, EntityToSaveMessage>>>() {
+
+
+                                          @Override
+                                          public void call(
+                                                  final
+                                                  List<GroupedObservable<Id, EntityToSaveMessage>> groupedObservables ) {
+
+                                              atomicLong.addAndGet(
+                                                      groupedObservables
+                                                              .size() );
+
+                                              final MutationBatch
+                                                      totalBatch =
+                                                      keyspace.prepareMutationBatch();
+
+
+                                              //run each of the
+                                              // groups and add
+                                              // it to the batch
+                                              Observable
+                                                      .from( groupedObservables )
+                                                      //emit the group as an observable
+                                                      .flatMap(
+                                                              new Func1<GroupedObservable<Id, EntityToSaveMessage>, Observable<EntityToSaveMessage>>() {
+
+
+                                                                  @Override
+                                                                  public Observable<EntityToSaveMessage> call(
+                                                                          final GroupedObservable<Id, EntityToSaveMessage> idEntityToSaveMessageGroupedObservable ) {
+                                                                      return idEntityToSaveMessageGroupedObservable
+                                                                              .asObservable();
+                                                                  }
+                                                              } )
+
+                                                      //merge and add the batch
+                                                      .doOnNext(
+                                                              new Action1<EntityToSaveMessage>() {
+                                                                  @Override
+                                                                  public void call(
+                                                                          final EntityToSaveMessage message ) {
+
+                                                                      final MutationBatch
+                                                                              entityRewrite =
+                                                                              migration.to.write( message.scope,
+                                                                                              message.entity );
+
+                                                                      //add to
+                                                                      // the
+                                                                      // total
+                                                                      // batch
+                                                                      totalBatch.mergeShallow( entityRewrite );
+
+                                                                      //write
+                                                                      // the
+                                                                      // unique values
+
+                                                                      if ( !message.entity
+                                                                              .getEntity()
+                                                                              .isPresent() ) {
+                                                                          return;
+                                                                      }
+
+                                                                          final Entity
+                                                                                  entity =
+                                                                                  message.entity
+                                                                                          .getEntity()
+                                                                                          .get();
+
+                                                                          final Id
+                                                                                  entityId =
+                                                                                  entity.getId();
+
+                                                                          final UUID
+                                                                                  version =
+                                                                                  message.entity
+                                                                                          .getVersion();
+
+                                                                          // re-write the unique values
+                                                                          // but this
+                                                                          // time with
+                                                                          // no TTL so that cleanup can clean up older values
+                                                                          for ( Field field : EntityUtils
+                                                                                  .getUniqueFields(
+                                                                                          message.entity
+                                                                                                  .getEntity()
+                                                                                                  .get() ) ) {
+
+                                                                              UniqueValue
+                                                                                      written =
+                                                                                      new UniqueValueImpl(
+                                                                                              field,
+                                                                                              entityId,
+                                                                                              version );
+
+                                                                              MutationBatch
+                                                                                      mb =
+                                                                                      uniqueValueSerializationStrategy
+                                                                                              .write( message.scope,
+                                                                                                      written );
+
+
+                                                                              // merge into our
+                                                                              // existing mutation
+                                                                              // batch
+                                                                              totalBatch
+                                                                                      .mergeShallow(
+                                                                                              mb );
+                                                                          }
+                                                                  }
+                                                              } )
+                                                      //once we've streamed everything, flush it
+                                                      .doOnCompleted(
+
+                                                              new Action0() {
+                                                                  @Override
+                                                                  public void call() {
+
+                                                                      executeBatch(migration.to.getImplementationVersion(), totalBatch, observer, atomicLong );
+                                                                  }
+                                                              } )
+                                                      .toBlocking()
+                                                      .last();
+                                          }
+                                      } ).doOnNext(
+                                      new Action1<List<GroupedObservable<Id, EntityToSaveMessage>>>() {
+                                          @Override
+                                          public void call(
+                                                  final List<GroupedObservable<Id, EntityToSaveMessage>> groupedObservables ) {
+
+                                              for ( final GroupedObservable<Id, EntityToSaveMessage> group : groupedObservables ) {
+
+                                                  //get the highest
+                                                  // entity and run a
+                                                  // cleanup task on it
+                                                  final EntityToSaveMessage
+                                                          maxEntity =
+                                                          group.toBlocking()
+                                                               .last();
+
+                                                  final EntityVersionCleanupTask
+                                                          task =
+                                                          entityVersionCleanupFactory
+                                                                  .getTask(
+                                                                          maxEntity.scope,
+                                                                          maxEntity.entity
+                                                                                  .getId(),
+                                                                          maxEntity.entity
+                                                                                  .getVersion() );
+
+                                                  /**
+                                                   * Just run the
+                                                   * call in this
+                                                   * process; we're
+                                                   * already
+                                                   * running in parallel.
+                                                   * This forces a
+                                                   * repair of the
+                                                   * unique properties
+                                                   * and will bring us
+                                                   * to a consistent
+                                                   * state after the migration.
+                                                   */
+
+                                                  try {
+                                                      task.call();
+                                                  }
+                                                  catch ( Exception e ) {
+                                                      throw new RuntimeException(
+                                                              "Unable to run cleanup task",
+                                                              e );
+                                                  }
+                                              }
+                                          }
+                                      } ).
+                                      reduce( 0l,
+                                              new Func2<Long, List<GroupedObservable<Id, EntityToSaveMessage>>, Long>() {
+
+                                                  @Override
+                                                  public Long call(
+                                                          final Long aLong,
+                                                          final List<GroupedObservable<Id, EntityToSaveMessage>> groupedObservables ) {
+
+                                                      long newCount =
+                                                              aLong;
+
+                                                      for ( GroupedObservable<Id, EntityToSaveMessage> group : groupedObservables ) {
+                                                          newCount +=
+                                                                  group.longCount()
+                                                                       .toBlocking()
+                                                                       .last();
+                                                      }
+
+                                                      return newCount;
+                                                  }
+                                              }
+
+
+                                            );
+                  }}).toBlocking().last();
 
         //now we update the progress observer
 
-        observer.update( getVersion(), "Finished for this step.  Migrated " + migrated + "entities total. ");
+        observer.update( migration.to.getImplementationVersion(), "Finished for this step.  Migrated " + migrated + "entities total. " );
+
+        return migration.to.getImplementationVersion();
     }
 
 
-    protected void executeBatch( final MutationBatch batch, final ProgressObserver po, final AtomicLong count ) {
+    protected void executeBatch( final int targetVersion, final MutationBatch batch, final ProgressObserver po, final AtomicLong count ) {
         try {
             batch.execute();
 
-            po.update( getVersion(), "Finished copying " + count + " entities to the new format" );
+            po.update(targetVersion, "Finished copying " + count + " entities to the new format" );
         }
         catch ( ConnectionException e ) {
-            po.failed( getVersion(), "Failed to execute mutation in cassandra" );
+            po.failed( targetVersion, "Failed to execute mutation in cassandra" );
             throw new DataMigrationException( "Unable to migrate batches ", e );
         }
     }
 
-    private int getCurrentSystemVersion(){
-       return migrationInfoSerialization.getVersion( CollectionMigrationPlugin.PLUGIN_NAME );
-    }
 
-    private static final class EntityToSaveMessage{
+
+
+    private static final class EntityToSaveMessage {
         private final CollectionScope scope;
         private final MvccEntity entity;
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a55c784d/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/guice/TestCollectionModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/guice/TestCollectionModule.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/guice/TestCollectionModule.java
index 7e1f32b..b42ad94 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/guice/TestCollectionModule.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/guice/TestCollectionModule.java
@@ -20,8 +20,15 @@
 package org.apache.usergrid.persistence.collection.guice;
 
 
+import java.util.Collections;
+
+import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
 import org.apache.usergrid.persistence.core.guice.CommonModule;
 import org.apache.usergrid.persistence.core.guice.TestModule;
+import org.apache.usergrid.persistence.core.migration.data.newimpls.MigrationDataProvider;
+import org.apache.usergrid.persistence.core.migration.data.newimpls.TestMigrationDataProvider;
+
+import com.google.inject.TypeLiteral;
 
 
 public class TestCollectionModule extends TestModule {
@@ -30,7 +37,15 @@ public class TestCollectionModule extends TestModule {
     protected void configure() {
 
         install( new CommonModule() );
-        install( new CollectionModule() );
+        install( new CollectionModule() {
+            @Override
+            public void configureMigrationProvider() {
+                //configure our migration data provider
+
+                TestMigrationDataProvider<EntityIdScope> migrationDataProvider = new TestMigrationDataProvider<>();
+                bind(new TypeLiteral< MigrationDataProvider<EntityIdScope>>(){}).toInstance( migrationDataProvider );
+            }
+        } );
 
         /**
          * Test modules

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a55c784d/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java
index 61b6b04..97c0479 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java
@@ -76,6 +76,8 @@ public class CommonModule extends AbstractModule {
         bind( DataMigrationManager.class ).to( DataMigrationManagerImpl.class );
 
 
+        bind (MigrationInfoCache.class).to( MigrationInfoCacheImpl.class );
+
 
         //do multibindings for migrations
         //create the empty multibinder so other plugins can use it

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a55c784d/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/MigrationInfoSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/MigrationInfoSerializationImpl.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/MigrationInfoSerializationImpl.java
index f976568..4a349fd 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/MigrationInfoSerializationImpl.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/MigrationInfoSerializationImpl.java
@@ -146,7 +146,7 @@ public class MigrationInfoSerializationImpl implements MigrationInfoSerializatio
         }
         //swallow, it doesn't exist
         catch ( NotFoundException nfe ) {
-            return -1;
+            return 0;
         }
         catch ( ConnectionException e ) {
             throw new DataMigrationException( "Unable to retrieve status", e );
@@ -180,7 +180,7 @@ public class MigrationInfoSerializationImpl implements MigrationInfoSerializatio
         }
         //swallow, it doesn't exist
         catch ( NotFoundException nfe ) {
-            return -1;
+            return 0;
         }
         catch ( ConnectionException e ) {
             throw new DataMigrationException( "Unable to retrieve status", e );
@@ -196,7 +196,7 @@ public class MigrationInfoSerializationImpl implements MigrationInfoSerializatio
         }
         //swallow, it doesn't exist
         catch ( NotFoundException nfe ) {
-            return -1;
+            return 0;
         }
         catch ( ConnectionException e ) {
             throw new DataMigrationException( "Unable to retrieve status", e );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a55c784d/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/newimpls/DataMigration2.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/newimpls/DataMigration2.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/newimpls/DataMigration2.java
index 5830335..303c11c 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/newimpls/DataMigration2.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/newimpls/DataMigration2.java
@@ -33,15 +33,26 @@ public interface DataMigration2<T> {
 
     /**
      * Perform the migration, returning an observable with a single emitted value
+     * @param currentVersion the current version of the system
      * @param migrationDataProvider
+     * @param observer The observer to receive updates of the progress
+     *
+     * @return The version that the system is now running
      */
-    public void migrate(MigrationDataProvider<T> migrationDataProvider, ProgressObserver observer);
+    public int migrate(final int currentVersion, MigrationDataProvider<T> migrationDataProvider, ProgressObserver observer);
 
     /**
-     * Get the version of this migration. It should be unique within the scope of the plugin
+     * Check if this version supports migration from the current system version.  If this returns false,
+     * migrate will not be invoked
      * @return
      */
-    public int getVersion();
+    public boolean supports(final int currentVersion);
+
+    /**
+     * Get the max version this migration can migrate to
+     * @return
+     */
+    public int getMaxVersion();
 
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a55c784d/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/newimpls/MigrationPlugin.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/newimpls/MigrationPlugin.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/newimpls/MigrationPlugin.java
index 880cfd1..50dc91b 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/newimpls/MigrationPlugin.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/newimpls/MigrationPlugin.java
@@ -42,10 +42,10 @@ public interface MigrationPlugin {
      */
     public void run(ProgressObserver observer);
 
+
     /**
-     * Get the maximum migration version this plugin implements
+     * Get the max version this plugin supports
      * @return
      */
-    public int getMaxVersion();
-
+    int getMaxVersion();
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a55c784d/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/migration/data/newimpls/TestMigrationDataProvider.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/migration/data/newimpls/TestMigrationDataProvider.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/migration/data/newimpls/TestMigrationDataProvider.java
new file mode 100644
index 0000000..9e99c6f
--- /dev/null
+++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/migration/data/newimpls/TestMigrationDataProvider.java
@@ -0,0 +1,61 @@
+/*
+ *
+ *  *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied.  See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *  *
+ *
+ */
+
+package org.apache.usergrid.persistence.core.migration.data.newimpls;
+
+
+import java.util.Collection;
+
+import rx.Observable;
+
+
+/**
+ * A simple test class that will emit the provided test data when subscribed
+ * @param <T>
+ */
+public class TestMigrationDataProvider<T> implements MigrationDataProvider<T> {
+
+
+
+    //default to nothing so that we don't return null
+    private Observable<T> observable = Observable.empty();
+
+
+    public TestMigrationDataProvider(  ) {}
+
+
+    @Override
+    public Observable<T> getData() {
+       return observable;
+    }
+
+
+    /**
+     * Set the observable that {@link #getData()} will return
+     *
+     * @param observable
+     */
+    public void setObservable( final Observable<T> observable ) {
+        this.observable = observable;
+    }
+}


[2/2] incubator-usergrid git commit: Updated graph migration

Posted by to...@apache.org.
Updated graph migration


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/fa69be86
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/fa69be86
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/fa69be86

Branch: refs/heads/USERGRID-405
Commit: fa69be86c48e916a7a368ca3bef6101bc0e81671
Parents: a55c784
Author: Todd Nine <tn...@apigee.com>
Authored: Mon Mar 2 18:43:13 2015 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Mon Mar 2 18:43:13 2015 -0700

----------------------------------------------------------------------
 .../impl/CollectionDataVersions.java            |   6 +-
 ...vccEntitySerializationStrategyProxyImpl.java |   2 +-
 .../MvccEntitySerializationStrategyV1Impl.java  |   3 +-
 .../MvccEntitySerializationStrategyV2Impl.java  |   2 +-
 .../MvccEntitySerializationStrategyV3Impl.java  |   6 +-
 .../serialization/impl/SerializationModule.java |  61 +++----
 .../migration/CollectionMigrationPlugin.java    |  83 +---------
 .../migration/MvccEntityDataMigrationImpl.java  |   3 -
 .../data/newimpls/AbstractMigrationPlugin.java  | 125 +++++++++++++++
 .../persistence/graph/guice/GraphModule.java    |  82 ++++++++--
 .../EdgeMetadataSerialization.java              |   3 +-
 .../serialization/EdgeMigrationStrategy.java    |  33 ----
 .../impl/EdgeDataMigrationImpl.java             |  52 +++---
 .../EdgeMetadataSerializationProxyImpl.java     | 158 +++++++++++--------
 .../impl/EdgeMetadataSerializationV1Impl.java   |   6 +
 .../impl/EdgeMetadataSerializationV2Impl.java   |   6 +
 .../serialization/impl/GraphDataVersions.java   |  43 +++++
 .../impl/migration/GraphMigrationPlugin.java    |  63 ++++++++
 18 files changed, 464 insertions(+), 273 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fa69be86/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/CollectionDataVersions.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/CollectionDataVersions.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/CollectionDataVersions.java
index 6e8be45..76d35d7 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/CollectionDataVersions.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/CollectionDataVersions.java
@@ -28,9 +28,9 @@ package org.apache.usergrid.persistence.collection.serialization.impl;
  * Versions of data as they exist across our system
  */
 public enum CollectionDataVersions{
-    ZERO(0),
-    ONE(1),
-    TWO(2);
+    INITIAL(0),
+    BUFFER_SHORT_FIX(1),
+    LOG_REMOVAL(2);
 
     private final int version;
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fa69be86/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxyImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxyImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxyImpl.java
index 02dfd9d..19bfc9e 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxyImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxyImpl.java
@@ -187,7 +187,7 @@ public class MvccEntitySerializationStrategyProxyImpl implements MvccEntitySeria
 
     @Override
     public int getImplementationVersion() {
-        return 0;
+        throw new UnsupportedOperationException("Not supported in the proxy");
     }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fa69be86/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV1Impl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV1Impl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV1Impl.java
index 1dab673..6f8525c 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV1Impl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV1Impl.java
@@ -26,7 +26,6 @@ import java.util.UUID;
 import org.apache.usergrid.persistence.collection.MvccEntity;
 import org.apache.usergrid.persistence.collection.exception.DataCorruptionException;
 import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
-import org.apache.usergrid.persistence.core.astyanax.CassandraConfig;
 import org.apache.usergrid.persistence.core.astyanax.CassandraFig;
 import org.apache.usergrid.persistence.core.astyanax.IdRowCompositeSerializer;
 import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamily;
@@ -90,7 +89,7 @@ public class MvccEntitySerializationStrategyV1Impl extends MvccEntitySerializati
 
     @Override
     public int getImplementationVersion() {
-        return CollectionDataVersions.ZERO.getVersion();
+        return CollectionDataVersions.INITIAL.getVersion();
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fa69be86/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV2Impl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV2Impl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV2Impl.java
index 1f65fcb..ed97e80 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV2Impl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV2Impl.java
@@ -93,7 +93,7 @@ public class MvccEntitySerializationStrategyV2Impl extends MvccEntitySerializati
 
     @Override
     public int getImplementationVersion() {
-        return CollectionDataVersions.ONE.getVersion();
+        return CollectionDataVersions.BUFFER_SHORT_FIX.getVersion();
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fa69be86/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV3Impl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV3Impl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV3Impl.java
index 7b8aac1..ba58ee2 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV3Impl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV3Impl.java
@@ -14,8 +14,6 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.db.marshal.BooleanType;
 import org.apache.cassandra.db.marshal.BytesType;
-import org.apache.cassandra.db.marshal.ReversedType;
-import org.apache.cassandra.db.marshal.UUIDType;
 
 import org.apache.usergrid.persistence.collection.CollectionScope;
 import org.apache.usergrid.persistence.collection.EntitySet;
@@ -44,8 +42,6 @@ import org.apache.usergrid.persistence.model.util.UUIDGenerator;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.dataformat.smile.SmileFactory;
-import com.fasterxml.uuid.UUIDComparator;
-import com.fasterxml.uuid.impl.UUIDUtil;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.inject.Inject;
@@ -383,7 +379,7 @@ public class MvccEntitySerializationStrategyV3Impl implements MvccEntitySerializ
 
     @Override
     public int getImplementationVersion() {
-        return CollectionDataVersions.TWO.getVersion();
+        return CollectionDataVersions.LOG_REMOVAL.getVersion();
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fa69be86/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/SerializationModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/SerializationModule.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/SerializationModule.java
index f32f4f9..241a274 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/SerializationModule.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/SerializationModule.java
@@ -35,6 +35,7 @@ import com.google.inject.AbstractModule;
 import com.google.inject.Inject;
 import com.google.inject.Key;
 import com.google.inject.Provider;
+import com.google.inject.Provides;
 import com.google.inject.Singleton;
 import com.google.inject.TypeLiteral;
 import com.google.inject.multibindings.Multibinder;
@@ -64,10 +65,6 @@ public class SerializationModule extends AbstractModule {
         bind( MvccEntitySerializationStrategyV3Impl.class );
 
 
-        bind( new TypeLiteral<VersionedMigrationSet<MvccEntitySerializationStrategy>>() {} )
-                .toProvider( MvccEntitySerializationStrategyProvider.class );
-
-
         //migrations
         //we want to make sure our generics are retained, so we use a typeliteral
         Multibinder<DataMigration2<EntityIdScope>> dataMigrationMultibinder =
@@ -99,50 +96,36 @@ public class SerializationModule extends AbstractModule {
         bind( SettingsValidation.class ).asEagerSingleton();
     }
 
+    /**
+      * Configure via explicit declaration the migration path we can follow
+      * @param v1
+      * @param v2
+      * @param v3
+      * @return
+      */
+     @Singleton
+     @Inject
+     @Provides
+     public VersionedMigrationSet<MvccEntitySerializationStrategy> getVersions(final MvccEntitySerializationStrategyV1Impl v1, final MvccEntitySerializationStrategyV2Impl v2, final MvccEntitySerializationStrategyV3Impl v3){
 
-    @Singleton
-    public static final class MvccEntitySerializationStrategyProvider
-            implements Provider<VersionedMigrationSet<MvccEntitySerializationStrategy>> {
-
-
-        private final MvccEntitySerializationStrategyV1Impl v1;
-        private final MvccEntitySerializationStrategyV2Impl v2;
-        private final MvccEntitySerializationStrategyV3Impl v3;
 
+         //we must perform a migration from v1 to v3 in order to maintain consistency
+         MigrationRelationship<MvccEntitySerializationStrategy> v1Tov3 = new MigrationRelationship<>( v1, v3 );
 
-        @Inject
-        public MvccEntitySerializationStrategyProvider( final MvccEntitySerializationStrategyV1Impl v1,
-                                                         final MvccEntitySerializationStrategyV2Impl v2,
-                                                         final MvccEntitySerializationStrategyV3Impl v3 ) {
-            this.v1 = v1;
-            this.v2 = v2;
-            this.v3 = v3;
-        }
+         //we must migrate from 2 to 3, this is a bridge that must happen to maintain data consistency
 
+         MigrationRelationship<MvccEntitySerializationStrategy> v2Tov3 = new MigrationRelationship<>( v2, v3 );
 
-        @Override
-        public VersionedMigrationSet<MvccEntitySerializationStrategy> get() {
 
-            //we must perform a migration from v1 to v3 in order to maintain consistency
-            MigrationRelationship<MvccEntitySerializationStrategy> v1Tov3 = new MigrationRelationship<>( v1, v3 );
+         //note that we MUST migrate to v3 before our next migration; if v4 and v5 are implemented we will need a v3->v5 and a v4->v5 set
+         MigrationRelationship<MvccEntitySerializationStrategy> current = new MigrationRelationship<MvccEntitySerializationStrategy>( v3, v3 );
 
-            //we must migrate from 2 to 3, this is a bridge that must happen to maintain data consistency
 
-            MigrationRelationship<MvccEntitySerializationStrategy> v2Tov3 = new MigrationRelationship<>( v2, v3 );
+         //now create our set of versions
+         VersionedMigrationSet<MvccEntitySerializationStrategy> set = new VersionedMigrationSet<>( v1Tov3, v2Tov3, current );
 
+         return set;
 
-            //note that we MUST migrate to v3 before our next migration, if v4 and v5 is implemented we will need a
-            // v3->v5 and a v4->v5 set
-            MigrationRelationship<MvccEntitySerializationStrategy> current =
-                    new MigrationRelationship<MvccEntitySerializationStrategy>( v3, v3 );
+     }
 
-
-            //now create our set of versions
-            VersionedMigrationSet<MvccEntitySerializationStrategy> set =
-                    new VersionedMigrationSet<>( v1Tov3, v2Tov3, current );
-
-
-            return set;
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fa69be86/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/CollectionMigrationPlugin.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/CollectionMigrationPlugin.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/CollectionMigrationPlugin.java
index d0663c2..0cf25b2 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/CollectionMigrationPlugin.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/CollectionMigrationPlugin.java
@@ -26,15 +26,10 @@ package org.apache.usergrid.persistence.collection.serialization.impl.migration;
 
 import java.util.Set;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.persistence.core.migration.data.DataMigrationException;
 import org.apache.usergrid.persistence.core.migration.data.MigrationInfoSerialization;
+import org.apache.usergrid.persistence.core.migration.data.newimpls.AbstractMigrationPlugin;
 import org.apache.usergrid.persistence.core.migration.data.newimpls.DataMigration2;
 import org.apache.usergrid.persistence.core.migration.data.newimpls.MigrationDataProvider;
-import org.apache.usergrid.persistence.core.migration.data.newimpls.MigrationPlugin;
-import org.apache.usergrid.persistence.core.migration.data.newimpls.ProgressObserver;
 
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
@@ -44,25 +39,17 @@ import com.google.inject.Singleton;
  * Migration plugin for the collection module
  */
 @Singleton
-public class CollectionMigrationPlugin implements MigrationPlugin {
-
-
-    private static final Logger LOG = LoggerFactory.getLogger( CollectionMigrationPlugin.class );
+public class CollectionMigrationPlugin extends AbstractMigrationPlugin<EntityIdScope> {
 
     public static final String PLUGIN_NAME = "collections-entity-data";
 
-    private final Set<DataMigration2<EntityIdScope>> entityDataMigrations;
-    private final MigrationDataProvider<EntityIdScope> entityIdScopeDataMigrationProvider;
-    private final MigrationInfoSerialization migrationInfoSerialization;
 
 
     @Inject
     public CollectionMigrationPlugin( final Set<DataMigration2<EntityIdScope>> entityDataMigrations,
                                       final MigrationDataProvider<EntityIdScope> entityIdScopeDataMigrationProvider,
                                       final MigrationInfoSerialization migrationInfoSerialization ) {
-        this.entityDataMigrations = entityDataMigrations;
-        this.entityIdScopeDataMigrationProvider = entityIdScopeDataMigrationProvider;
-        this.migrationInfoSerialization = migrationInfoSerialization;
+        super( entityDataMigrations, entityIdScopeDataMigrationProvider, migrationInfoSerialization );
     }
 
 
@@ -72,68 +59,4 @@ public class CollectionMigrationPlugin implements MigrationPlugin {
     }
 
 
-    @Override
-    public void run( final ProgressObserver observer ) {
-
-        //run until complete
-        while(runMigration( observer )){
-         LOG.info( "Migration complete, checking for next run" );
-        }
-
-    }
-
-
-    @Override
-    public int getMaxVersion() {
-
-        int max = 0;
-
-        for(DataMigration2<EntityIdScope> entityMigration: entityDataMigrations){
-            max = Math.max( max, entityMigration.getMaxVersion() );
-        }
-
-        return max;
-    }
-
-
-    /**
-     * Try to run the migration
-     *
-     * @return True if we ran a migration
-     */
-    private boolean runMigration( final ProgressObserver po ) {
-        DataMigration2<EntityIdScope> migrationToExecute = null;
-
-
-        final int version = migrationInfoSerialization.getVersion( PLUGIN_NAME );
-
-        for ( DataMigration2<EntityIdScope> entityMigration : entityDataMigrations ) {
-            if ( entityMigration.supports( version ) ) {
-                if ( migrationToExecute != null ) {
-                    throw new DataMigrationException(
-                            "Two migrations attempted to migration the same version, this is not allowed.  Class '"
-                                    + migrationToExecute.getClass().getName() + "' and class '" + entityMigration
-                                    .getClass().getName()
-                                    + "' both support this version. This means something is wired incorrectly" );
-                }
-
-                migrationToExecute = entityMigration;
-            }
-        }
-
-        if(migrationToExecute == null){
-            LOG.info( "No migrations found to execute" );
-            return false;
-        }
-
-        //run the migration
-        final int newSystemVersion = migrationToExecute.migrate( version, entityIdScopeDataMigrationProvider, po );
-
-        migrationInfoSerialization.setVersion( PLUGIN_NAME, newSystemVersion );
-
-        //signal we've run a migration and return
-        return true;
-
-
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fa69be86/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java
index 48e0195..108a4d8 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java
@@ -73,7 +73,6 @@ public class MvccEntityDataMigrationImpl implements DataMigration2<EntityIdScope
     private final Keyspace keyspace;
     private final VersionedMigrationSet<MvccEntitySerializationStrategy> allVersions;
     private final UniqueValueSerializationStrategy uniqueValueSerializationStrategy;
-    private final MigrationInfoSerialization migrationInfoSerialization;
     private final EntityVersionCleanupFactory entityVersionCleanupFactory;
     private final MvccEntitySerializationStrategyV3Impl mvccEntitySerializationStrategyV3;
 
@@ -82,14 +81,12 @@ public class MvccEntityDataMigrationImpl implements DataMigration2<EntityIdScope
     public MvccEntityDataMigrationImpl( final Keyspace keyspace,
                                         final VersionedMigrationSet<MvccEntitySerializationStrategy> allVersions,
                                         final UniqueValueSerializationStrategy uniqueValueSerializationStrategy,
-                                        final MigrationInfoSerialization migrationInfoSerialization,
                                         final EntityVersionCleanupFactory entityVersionCleanupFactory,
                                         final MvccEntitySerializationStrategyV3Impl mvccEntitySerializationStrategyV3 ) {
 
         this.keyspace = keyspace;
         this.allVersions = allVersions;
         this.uniqueValueSerializationStrategy = uniqueValueSerializationStrategy;
-        this.migrationInfoSerialization = migrationInfoSerialization;
         this.entityVersionCleanupFactory = entityVersionCleanupFactory;
         this.mvccEntitySerializationStrategyV3 = mvccEntitySerializationStrategyV3;
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fa69be86/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/newimpls/AbstractMigrationPlugin.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/newimpls/AbstractMigrationPlugin.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/newimpls/AbstractMigrationPlugin.java
new file mode 100644
index 0000000..7b3aa00
--- /dev/null
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/newimpls/AbstractMigrationPlugin.java
@@ -0,0 +1,125 @@
+/*
+ *
+ *  *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied.  See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *  *
+ *
+ */
+
+package org.apache.usergrid.persistence.core.migration.data.newimpls;
+
+
+import java.util.Set;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.persistence.core.migration.data.DataMigrationException;
+import org.apache.usergrid.persistence.core.migration.data.MigrationInfoSerialization;
+
+
+/**
+ * Standard implementation logic for plugins to extend
+ * @param <T>
+ */
+public abstract class AbstractMigrationPlugin<T> implements MigrationPlugin {
+
+
+
+    private static final Logger LOG = LoggerFactory.getLogger( AbstractMigrationPlugin.class );
+
+
+    private final Set<DataMigration2<T>> entityDataMigrations;
+    private final MigrationDataProvider<T> entityIdScopeDataMigrationProvider;
+    private final MigrationInfoSerialization migrationInfoSerialization;
+
+
+    protected AbstractMigrationPlugin( final Set<DataMigration2<T>> entityDataMigrations,
+                                       final MigrationDataProvider<T> entityIdScopeDataMigrationProvider,
+                                       final MigrationInfoSerialization migrationInfoSerialization ) {
+        this.entityDataMigrations = entityDataMigrations;
+        this.entityIdScopeDataMigrationProvider = entityIdScopeDataMigrationProvider;
+        this.migrationInfoSerialization = migrationInfoSerialization;
+    }
+
+
+    @Override
+    public void run( final ProgressObserver observer ) {
+
+        //run until complete
+        while(runMigration( observer )){
+         LOG.info( "Migration complete, checking for next run" );
+        }
+
+    }
+
+
+    @Override
+    public int getMaxVersion() {
+
+        int max = 0;
+
+        for(DataMigration2<T> entityMigration: entityDataMigrations){
+            max = Math.max( max, entityMigration.getMaxVersion() );
+        }
+
+        return max;
+    }
+
+
+    /**
+     * Try to run the migration
+     *
+     * @return True if we ran a migration
+     */
+    private boolean runMigration( final ProgressObserver po ) {
+        DataMigration2<T> migrationToExecute = null;
+
+
+        final int version = migrationInfoSerialization.getVersion( getName() );
+
+        for ( DataMigration2<T> entityMigration : entityDataMigrations ) {
+            if ( entityMigration.supports( version ) ) {
+                if ( migrationToExecute != null ) {
+                    throw new DataMigrationException(
+                            "Two migrations attempted to migration the same version, this is not allowed.  Class '"
+                                    + migrationToExecute.getClass().getName() + "' and class '" + entityMigration
+                                    .getClass().getName()
+                                    + "' both support this version. This means something is wired incorrectly" );
+                }
+
+                migrationToExecute = entityMigration;
+            }
+        }
+
+        if(migrationToExecute == null){
+            LOG.info( "No migrations found to execute" );
+            return false;
+        }
+
+        //run the migration
+        final int newSystemVersion = migrationToExecute.migrate( version, entityIdScopeDataMigrationProvider, po );
+
+        migrationInfoSerialization.setVersion( getName(), newSystemVersion );
+
+        //signal we've run a migration and return
+        return true;
+
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fa69be86/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
index d513b05..84fbaac 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
@@ -19,18 +19,17 @@
 package org.apache.usergrid.persistence.graph.guice;
 
 
-import org.apache.usergrid.persistence.core.guice.V1Impl;
-import org.apache.usergrid.persistence.core.guice.V2Impl;
-import org.apache.usergrid.persistence.core.migration.data.ApplicationDataMigration;
-import org.apache.usergrid.persistence.core.migration.data.DataMigration;
-import org.apache.usergrid.persistence.graph.serialization.*;
-import org.apache.usergrid.persistence.graph.serialization.impl.*;
 import org.safehaus.guicyfig.GuicyFigModule;
 
 import org.apache.usergrid.persistence.core.consistency.TimeService;
 import org.apache.usergrid.persistence.core.consistency.TimeServiceImpl;
 import org.apache.usergrid.persistence.core.guice.ProxyImpl;
+import org.apache.usergrid.persistence.core.migration.data.newimpls.DataMigration2;
+import org.apache.usergrid.persistence.core.migration.data.newimpls.MigrationPlugin;
+import org.apache.usergrid.persistence.core.migration.data.newimpls.MigrationRelationship;
+import org.apache.usergrid.persistence.core.migration.data.newimpls.VersionedMigrationSet;
 import org.apache.usergrid.persistence.core.migration.schema.Migration;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.core.task.NamedTaskExecutorImpl;
 import org.apache.usergrid.persistence.core.task.TaskExecutor;
 import org.apache.usergrid.persistence.graph.GraphFig;
@@ -43,6 +42,21 @@ import org.apache.usergrid.persistence.graph.impl.stage.EdgeMetaRepair;
 import org.apache.usergrid.persistence.graph.impl.stage.EdgeMetaRepairImpl;
 import org.apache.usergrid.persistence.graph.impl.stage.NodeDeleteListener;
 import org.apache.usergrid.persistence.graph.impl.stage.NodeDeleteListenerImpl;
+import org.apache.usergrid.persistence.graph.serialization.EdgeMetadataSerialization;
+import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
+import org.apache.usergrid.persistence.graph.serialization.EdgesObservable;
+import org.apache.usergrid.persistence.graph.serialization.NodeSerialization;
+import org.apache.usergrid.persistence.graph.serialization.TargetIdObservable;
+import org.apache.usergrid.persistence.graph.serialization.impl.EdgeDataMigrationImpl;
+import org.apache.usergrid.persistence.graph.serialization.impl.EdgeMetadataSerializationProxyImpl;
+import org.apache.usergrid.persistence.graph.serialization.impl.EdgeMetadataSerializationV1Impl;
+import org.apache.usergrid.persistence.graph.serialization.impl.EdgeMetadataSerializationV2Impl;
+import org.apache.usergrid.persistence.graph.serialization.impl.EdgeSerializationImpl;
+import org.apache.usergrid.persistence.graph.serialization.impl.EdgesObservableImpl;
+import org.apache.usergrid.persistence.graph.serialization.impl.GraphManagerFactoryImpl;
+import org.apache.usergrid.persistence.graph.serialization.impl.NodeSerializationImpl;
+import org.apache.usergrid.persistence.graph.serialization.impl.TargetIdObservableImpl;
+import org.apache.usergrid.persistence.graph.serialization.impl.migration.GraphMigrationPlugin;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeColumnFamilies;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeShardSerialization;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeShardStrategy;
@@ -67,6 +81,7 @@ import com.google.inject.Inject;
 import com.google.inject.Key;
 import com.google.inject.Provides;
 import com.google.inject.Singleton;
+import com.google.inject.TypeLiteral;
 import com.google.inject.multibindings.Multibinder;
 
 
@@ -85,8 +100,6 @@ public class GraphModule extends AbstractModule {
 
         bind( GraphManagerFactory.class ).to(GraphManagerFactoryImpl.class);
 
-        //bind(GraphManager.class).to(GraphManagerImpl.class );
-
         bind(EdgesObservable.class).to(EdgesObservableImpl.class);
 
         bind(TargetIdObservable.class).to(TargetIdObservableImpl.class);
@@ -95,8 +108,6 @@ public class GraphModule extends AbstractModule {
 
         bind(EdgeMetadataSerialization.class).to(EdgeMetadataSerializationProxyImpl.class);
 
-        bind(EdgeMigrationStrategy.class).to(EdgeMetadataSerializationProxyImpl.class);
-
         /**
          * bindings for shard allocations
          */
@@ -117,10 +128,19 @@ public class GraphModule extends AbstractModule {
         bind( EdgeMetaRepair.class ).to( EdgeMetaRepairImpl.class );
         bind( EdgeDeleteRepair.class ).to( EdgeDeleteRepairImpl.class );
 
-        Multibinder<DataMigration> dataMigrationMultibinder =
-            Multibinder.newSetBinder( binder(), DataMigration.class );
+
+        //wire up the edge migration
+        Multibinder<DataMigration2<ApplicationScope>> dataMigrationMultibinder =
+                Multibinder.newSetBinder( binder(), new TypeLiteral<DataMigration2<ApplicationScope>>() {} );
+
+
         dataMigrationMultibinder.addBinding().to( EdgeDataMigrationImpl.class );
 
+
+        //wire up the graph migration plugin
+        Multibinder.newSetBinder( binder(), MigrationPlugin.class ).addBinding().to( GraphMigrationPlugin.class );
+
+
         /**
          * Add our listeners
          */
@@ -158,18 +178,17 @@ public class GraphModule extends AbstractModule {
         migrationBinding.addBinding().to( Key.get( NodeShardCounterSerialization.class ) );
 
         //Get the old version and the new one
-        migrationBinding.addBinding().to( Key.get( EdgeMetadataSerialization.class, V1Impl.class) );
-        migrationBinding.addBinding().to( Key.get( EdgeMetadataSerialization.class, V2Impl.class  ) );
+        migrationBinding.addBinding().to( Key.get( EdgeMetadataSerializationV1Impl.class) );
+        migrationBinding.addBinding().to( Key.get( EdgeMetadataSerializationV2Impl.class ) );
 
 
         /**
          * Migrations of our edge meta serialization
          */
 
-        bind(EdgeMetadataSerialization.class).annotatedWith( V1Impl.class ).to( EdgeMetadataSerializationV1Impl.class  );
-        bind(EdgeMetadataSerialization.class).annotatedWith( V2Impl.class ).to( EdgeMetadataSerializationV2Impl.class  );
+        bind( EdgeMetadataSerializationV1Impl.class );
+        bind( EdgeMetadataSerializationV2Impl.class );
         bind(EdgeMetadataSerialization.class).annotatedWith( ProxyImpl.class ).to( EdgeMetadataSerializationProxyImpl.class  );
-        bind(EdgeMigrationStrategy.class).annotatedWith(ProxyImpl.class).to( EdgeMetadataSerializationProxyImpl.class  );
 
     }
 
@@ -184,6 +203,35 @@ public class GraphModule extends AbstractModule {
     }
 
 
+
+    /**
+      * Configure via explicit declaration the migration path we can follow
+      * @param v1 the V1 edge metadata serialization, used as the source of the v1 to v2 relationship
+      * @param v2 the V2 edge metadata serialization, used as the migration target and as the current (v2, v2) tuple
+      * @return the versioned set built from the v1 to v2 relationship and the current (v2, v2) relationship
+      */
+     @Singleton
+     @Inject
+     @Provides
+     public VersionedMigrationSet<EdgeMetadataSerialization> getVersions(final EdgeMetadataSerializationV1Impl v1, final EdgeMetadataSerializationV2Impl v2){
+
+
+         //migrate from v1 to v2
+         MigrationRelationship<EdgeMetadataSerialization> v1Tov2 = new MigrationRelationship<>( v1, v2);
+
+
+         //keep our current tuple, v2, v2
+         MigrationRelationship<EdgeMetadataSerialization> current = new MigrationRelationship<EdgeMetadataSerialization>( v2, v2 );
+
+
+         //now create our set of versions
+         VersionedMigrationSet<EdgeMetadataSerialization> set = new VersionedMigrationSet<>( v1Tov2, current );
+
+         return set;
+
+     }
+
+
 }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fa69be86/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgeMetadataSerialization.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgeMetadataSerialization.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgeMetadataSerialization.java
index 38916ac..a843b3c 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgeMetadataSerialization.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgeMetadataSerialization.java
@@ -22,6 +22,7 @@ package org.apache.usergrid.persistence.graph.serialization;
 
 import java.util.Iterator;
 
+import org.apache.usergrid.persistence.core.migration.data.newimpls.VersionedData;
 import org.apache.usergrid.persistence.core.migration.schema.Migration;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.graph.Edge;
@@ -35,7 +36,7 @@ import com.netflix.astyanax.MutationBatch;
 /**
  * Simple interface for serializing an edge meta data
  */
-public interface EdgeMetadataSerialization extends Migration {
+public interface EdgeMetadataSerialization extends Migration, VersionedData {
 
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fa69be86/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgeMigrationStrategy.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgeMigrationStrategy.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgeMigrationStrategy.java
deleted file mode 100644
index 41fe7ad..0000000
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgeMigrationStrategy.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  *  contributor license agreements.  The ASF licenses this file to You
- *  * under the Apache License, Version 2.0 (the "License"); you may not
- *  * use this file except in compliance with the License.
- *  * You may obtain a copy of the License at
- *  *
- *  *     http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.  For additional information regarding
- *  * copyright in this work, please see the NOTICE file in the top level
- *  * directory of this distribution.
- *
- */
-package org.apache.usergrid.persistence.graph.serialization;
-
-import org.apache.usergrid.persistence.core.migration.schema.MigrationStrategy;
-import org.apache.usergrid.persistence.graph.Edge;
-
-import java.util.List;
-
-/**
- * Encapsulates version migration for graph
- */
-public interface EdgeMigrationStrategy extends MigrationStrategy<EdgeMetadataSerialization> {
-    public static final int MIGRATION_VERSION = 2;
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fa69be86/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeDataMigrationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeDataMigrationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeDataMigrationImpl.java
index 785e341..862c796 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeDataMigrationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeDataMigrationImpl.java
@@ -25,11 +25,14 @@ import com.netflix.astyanax.MutationBatch;
 import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 import org.apache.usergrid.persistence.core.migration.data.newimpls.DataMigration2;
 import org.apache.usergrid.persistence.core.migration.data.newimpls.MigrationDataProvider;
+import org.apache.usergrid.persistence.core.migration.data.newimpls.MigrationRelationship;
+import org.apache.usergrid.persistence.core.migration.data.newimpls.ProgressObserver;
+import org.apache.usergrid.persistence.core.migration.data.newimpls.VersionedMigrationSet;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.graph.Edge;
 import org.apache.usergrid.persistence.graph.GraphManager;
 import org.apache.usergrid.persistence.graph.GraphManagerFactory;
-import org.apache.usergrid.persistence.graph.serialization.EdgeMigrationStrategy;
+import org.apache.usergrid.persistence.graph.serialization.EdgeMetadataSerialization;
 import org.apache.usergrid.persistence.graph.serialization.EdgesObservable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -41,9 +44,8 @@ import java.util.List;
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
- * Encapsulates data mi
+ * Encapsulates the migration of edge meta data
  */
-
 public class EdgeDataMigrationImpl implements DataMigration2<ApplicationScope> {
 
     private static final Logger logger = LoggerFactory.getLogger(EdgeDataMigrationImpl.class);
@@ -51,30 +53,35 @@ public class EdgeDataMigrationImpl implements DataMigration2<ApplicationScope> {
     private final Keyspace keyspace;
     private final GraphManagerFactory graphManagerFactory;
     private final EdgesObservable edgesFromSourceObservable;
-    private final EdgeMigrationStrategy edgeMigrationStrategy;
+    private final VersionedMigrationSet<EdgeMetadataSerialization> allVersions;
+    private final EdgeMetadataSerializationV2Impl edgeMetadataSerializationV2;
 
     @Inject
-    public EdgeDataMigrationImpl(final Keyspace keyspace,
-                                 final GraphManagerFactory graphManagerFactory,
-                                 final EdgesObservable edgesFromSourceObservable,
-                                 final EdgeMigrationStrategy edgeMigrationStrategy
-    ) {
+    public EdgeDataMigrationImpl( final Keyspace keyspace, final GraphManagerFactory graphManagerFactory,
+                                  final EdgesObservable edgesFromSourceObservable,
+
+                                  final VersionedMigrationSet<EdgeMetadataSerialization> allVersions,
+                                  final EdgeMetadataSerializationV2Impl edgeMetadataSerializationV2 ) {
 
         this.keyspace = keyspace;
         this.graphManagerFactory = graphManagerFactory;
         this.edgesFromSourceObservable = edgesFromSourceObservable;
-        this.edgeMigrationStrategy = edgeMigrationStrategy;
+        this.allVersions = allVersions;
+        this.edgeMetadataSerializationV2 = edgeMetadataSerializationV2;
     }
 
 
 
 
-
     @Override
-    public void migrate( final MigrationDataProvider<ApplicationScope> migrationDataProvider,
-                         final ProgressObserver observer ) {
+       public int migrate( final int currentVersion, final MigrationDataProvider<ApplicationScope> migrationDataProvider,
+                           final ProgressObserver observer ) {
+
         final AtomicLong counter = new AtomicLong();
 
+        final MigrationRelationship<EdgeMetadataSerialization>
+                migration = allVersions.getMigrationRelationship( currentVersion );
+
                migrationDataProvider.getData().flatMap(new Func1<ApplicationScope, Observable<?>>() {
                   @Override
                   public Observable call(final ApplicationScope applicationScope) {
@@ -99,8 +106,7 @@ public class EdgeDataMigrationImpl implements DataMigration2<ApplicationScope> {
                                                     for ( Edge edge : edges ) {
                                                         logger.info( "Migrating meta for edge {}", edge );
                                                         final MutationBatch edgeBatch =
-                                                                edgeMigrationStrategy.getMigration().to()
-                                                                                     .writeEdge( applicationScope, edge );
+                                                                migration.to.writeEdge( applicationScope, edge );
                                                         batch.mergeShallow( edgeBatch );
                                                     }
 
@@ -114,7 +120,7 @@ public class EdgeDataMigrationImpl implements DataMigration2<ApplicationScope> {
                                                     //update the observer so the admin can see it
                                                     final long newCount = counter.addAndGet( edges.size() );
 
-                                                    observer.update( getVersion(),
+                                                    observer.update( migration.to.getImplementationVersion(),
                                                             String.format( "Currently running.  Rewritten %d edge types",
                                                                     newCount ) );
                                                 }
@@ -124,12 +130,22 @@ public class EdgeDataMigrationImpl implements DataMigration2<ApplicationScope> {
                   }
               });
 
+        return migration.to.getImplementationVersion();
+
     }
 
 
+
+
     @Override
-    public int getVersion() {
-        return edgeMigrationStrategy.getVersion();
+    public boolean supports( final int currentVersion ) {
+        return currentVersion <= edgeMetadataSerializationV2.getImplementationVersion();
     }
 
+
+    @Override
+    public int getMaxVersion() {
+        //we only support up to v2 ATM
+        return edgeMetadataSerializationV2.getImplementationVersion();
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fa69be86/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationProxyImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationProxyImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationProxyImpl.java
index acba53f..a5bde21 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationProxyImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationProxyImpl.java
@@ -27,13 +27,15 @@ import java.util.Collections;
 import java.util.Iterator;
 
 import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamilyDefinition;
-import org.apache.usergrid.persistence.core.guice.V1Impl;
-import org.apache.usergrid.persistence.core.guice.V2Impl;
 import org.apache.usergrid.persistence.core.migration.data.DataMigrationManager;
+import org.apache.usergrid.persistence.core.migration.data.MigrationInfoCache;
+import org.apache.usergrid.persistence.core.migration.data.newimpls.MigrationRelationship;
+import org.apache.usergrid.persistence.core.migration.data.newimpls.VersionedMigrationSet;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.graph.*;
 import org.apache.usergrid.persistence.graph.serialization.EdgeMetadataSerialization;
 import org.apache.usergrid.persistence.graph.serialization.EdgeMigrationStrategy;
+import org.apache.usergrid.persistence.graph.serialization.impl.migration.GraphMigrationPlugin;
 import org.apache.usergrid.persistence.model.entity.Id;
 
 import com.google.inject.Inject;
@@ -49,90 +51,93 @@ public class EdgeMetadataSerializationProxyImpl implements EdgeMetadataSerializa
 
     private static final Logger logger = LoggerFactory.getLogger(EdgeMetadataSerializationProxyImpl.class);
 
-    private final DataMigrationManager dataMigrationManager;
     private final Keyspace keyspace;
-    private final EdgeMetadataSerialization previous;
-    private final EdgeMetadataSerialization current;
+    private final VersionedMigrationSet<EdgeMetadataSerialization> versions;
+    private final MigrationInfoCache migrationInfoCache;
 
 
     /**
      * Handles routing data to the right implementation, based on the current system migration version
      */
     @Inject
-    public EdgeMetadataSerializationProxyImpl(final DataMigrationManager dataMigrationManager, final Keyspace keyspace,
-                                              @V1Impl final EdgeMetadataSerialization previous,
-                                              @V2Impl final EdgeMetadataSerialization current) {
-        this.dataMigrationManager = dataMigrationManager;
+    public EdgeMetadataSerializationProxyImpl( final Keyspace keyspace,
+                                               final VersionedMigrationSet<EdgeMetadataSerialization> versions,
+                                               final MigrationInfoCache migrationInfoCache ) {
         this.keyspace = keyspace;
-        this.previous = previous;
-        this.current = current;
+        this.versions = versions;
+        this.migrationInfoCache = migrationInfoCache;
     }
 
 
     @Override
     public MutationBatch writeEdge( final ApplicationScope scope, final Edge edge ) {
 
+        final MigrationRelationship<EdgeMetadataSerialization> migration = getMigrationRelationShip();
 
-        if ( isOldVersion() ) {
+
+        if ( migration.needsMigration() ) {
             final MutationBatch aggregateBatch = keyspace.prepareMutationBatch();
 
-            aggregateBatch.mergeShallow( previous.writeEdge( scope, edge ) );
-            aggregateBatch.mergeShallow( current.writeEdge( scope, edge ) );
+            aggregateBatch.mergeShallow( migration.from.writeEdge( scope, edge ) );
+            aggregateBatch.mergeShallow( migration.to.writeEdge( scope, edge ) );
 
             return aggregateBatch;
         }
 
-        return current.writeEdge( scope, edge );
+        return migration.to.writeEdge( scope, edge );
     }
 
 
     @Override
     public MutationBatch removeEdgeTypeFromSource( final ApplicationScope scope, final Edge edge ) {
+        final MigrationRelationship<EdgeMetadataSerialization> migration = getMigrationRelationShip();
 
-        if ( isOldVersion() ) {
+        if ( migration.needsMigration() ) {
             final MutationBatch aggregateBatch = keyspace.prepareMutationBatch();
 
-            aggregateBatch.mergeShallow( previous.removeEdgeTypeFromSource( scope, edge ) );
-            aggregateBatch.mergeShallow( current.removeEdgeTypeFromSource( scope, edge ) );
+            aggregateBatch.mergeShallow( migration.from.removeEdgeTypeFromSource( scope, edge ) );
+            aggregateBatch.mergeShallow( migration.to.removeEdgeTypeFromSource( scope, edge ) );
 
             return aggregateBatch;
         }
 
-        return current.removeEdgeTypeFromSource( scope, edge );
+        return migration.to.removeEdgeTypeFromSource( scope, edge );
     }
 
 
     @Override
     public MutationBatch removeEdgeTypeFromSource( final ApplicationScope scope, final Id sourceNode, final String type,
                                                    final long timestamp ) {
+        final MigrationRelationship<EdgeMetadataSerialization> migration = getMigrationRelationShip();
 
-
-        if ( isOldVersion() ) {
+        if ( migration.needsMigration() ) {
             final MutationBatch aggregateBatch = keyspace.prepareMutationBatch();
 
-            aggregateBatch.mergeShallow( previous.removeEdgeTypeFromSource( scope, sourceNode, type, timestamp ) );
-            aggregateBatch.mergeShallow( current.removeEdgeTypeFromSource( scope, sourceNode, type, timestamp ) );
+            aggregateBatch.mergeShallow( migration.from.removeEdgeTypeFromSource( scope, sourceNode, type, timestamp ) );
+            aggregateBatch.mergeShallow( migration.to.removeEdgeTypeFromSource( scope, sourceNode, type, timestamp ) );
 
             return aggregateBatch;
         }
 
-        return current.removeEdgeTypeFromSource( scope, sourceNode, type, timestamp );
+        return migration.to.removeEdgeTypeFromSource( scope, sourceNode, type, timestamp );
     }
 
 
     @Override
     public MutationBatch removeIdTypeFromSource( final ApplicationScope scope, final Edge edge ) {
 
-        if ( isOldVersion() ) {
+        final MigrationRelationship<EdgeMetadataSerialization> migration = getMigrationRelationShip();
+
+        if ( migration.needsMigration() ) {
             final MutationBatch aggregateBatch = keyspace.prepareMutationBatch();
 
-            aggregateBatch.mergeShallow( previous.removeIdTypeFromSource( scope, edge ) );
-            aggregateBatch.mergeShallow( current.removeIdTypeFromSource( scope, edge ) );
+            aggregateBatch.mergeShallow( migration.from.removeIdTypeFromSource( scope, edge ) );
+            aggregateBatch.mergeShallow( migration.to.removeIdTypeFromSource( scope, edge ) );
 
             return aggregateBatch;
         }
 
-        return current.removeIdTypeFromSource( scope, edge );
+        return migration.to.removeIdTypeFromSource( scope, edge );
     }
 
 
@@ -140,34 +145,37 @@ public class EdgeMetadataSerializationProxyImpl implements EdgeMetadataSerializa
     public MutationBatch removeIdTypeFromSource( final ApplicationScope scope, final Id sourceNode, final String type,
                                                  final String idType, final long timestamp ) {
 
-        if ( isOldVersion() ) {
+        final MigrationRelationship<EdgeMetadataSerialization> migration = getMigrationRelationShip();
+
+        if ( migration.needsMigration() ) {
             final MutationBatch aggregateBatch = keyspace.prepareMutationBatch();
 
             aggregateBatch
-                    .mergeShallow( previous.removeIdTypeFromSource( scope, sourceNode, type, idType, timestamp ) );
-            aggregateBatch.mergeShallow( current.removeIdTypeFromSource( scope, sourceNode, type, idType, timestamp ) );
+                    .mergeShallow( migration.from.removeIdTypeFromSource( scope, sourceNode, type, idType, timestamp ) );
+            aggregateBatch.mergeShallow( migration.to.removeIdTypeFromSource( scope, sourceNode, type, idType, timestamp ) );
 
             return aggregateBatch;
         }
 
-        return current.removeIdTypeFromSource( scope, sourceNode, type, idType, timestamp );
+        return migration.to.removeIdTypeFromSource( scope, sourceNode, type, idType, timestamp );
     }
 
 
     @Override
     public MutationBatch removeEdgeTypeToTarget( final ApplicationScope scope, final Edge edge ) {
 
+        final MigrationRelationship<EdgeMetadataSerialization> migration = getMigrationRelationShip();
 
-        if ( isOldVersion() ) {
+        if ( migration.needsMigration() ) {
             final MutationBatch aggregateBatch = keyspace.prepareMutationBatch();
 
-            aggregateBatch.mergeShallow( previous.removeEdgeTypeToTarget( scope, edge ) );
-            aggregateBatch.mergeShallow( current.removeEdgeTypeToTarget( scope, edge ) );
+            aggregateBatch.mergeShallow( migration.from.removeEdgeTypeToTarget( scope, edge ) );
+            aggregateBatch.mergeShallow( migration.to.removeEdgeTypeToTarget( scope, edge ) );
 
             return aggregateBatch;
         }
 
-        return current.removeEdgeTypeToTarget( scope, edge );
+        return migration.to.removeEdgeTypeToTarget( scope, edge );
     }
 
 
@@ -175,90 +183,102 @@ public class EdgeMetadataSerializationProxyImpl implements EdgeMetadataSerializa
     public MutationBatch removeEdgeTypeToTarget( final ApplicationScope scope, final Id targetNode, final String type,
                                                  final long timestamp ) {
 
-        if ( isOldVersion() ) {
+        final MigrationRelationship<EdgeMetadataSerialization> migration = getMigrationRelationShip();
+
+        if ( migration.needsMigration() ) {
             final MutationBatch aggregateBatch = keyspace.prepareMutationBatch();
 
-            aggregateBatch.mergeShallow( previous.removeEdgeTypeToTarget( scope, targetNode, type, timestamp ) );
-            aggregateBatch.mergeShallow( current.removeEdgeTypeToTarget( scope, targetNode, type, timestamp ) );
+            aggregateBatch.mergeShallow( migration.from.removeEdgeTypeToTarget( scope, targetNode, type, timestamp ) );
+            aggregateBatch.mergeShallow( migration.to.removeEdgeTypeToTarget( scope, targetNode, type, timestamp ) );
 
             return aggregateBatch;
         }
 
-        return current.removeEdgeTypeToTarget( scope, targetNode, type, timestamp );
+        return migration.to.removeEdgeTypeToTarget( scope, targetNode, type, timestamp );
     }
 
 
     @Override
     public MutationBatch removeIdTypeToTarget( final ApplicationScope scope, final Edge edge ) {
 
-        if ( isOldVersion() ) {
+        final MigrationRelationship<EdgeMetadataSerialization> migration = getMigrationRelationShip();
+
+        if ( migration.needsMigration() ) {
             final MutationBatch aggregateBatch = keyspace.prepareMutationBatch();
 
-            aggregateBatch.mergeShallow( previous.removeIdTypeToTarget( scope, edge ) );
-            aggregateBatch.mergeShallow( current.removeIdTypeToTarget( scope, edge ) );
+            aggregateBatch.mergeShallow( migration.from.removeIdTypeToTarget( scope, edge ) );
+            aggregateBatch.mergeShallow( migration.to.removeIdTypeToTarget( scope, edge ) );
 
             return aggregateBatch;
         }
 
-        return current.removeIdTypeToTarget( scope, edge );
+        return migration.to.removeIdTypeToTarget( scope, edge );
     }
 
 
     @Override
     public MutationBatch removeIdTypeToTarget( final ApplicationScope scope, final Id targetNode, final String type,
                                                final String idType, final long timestamp ) {
+        final MigrationRelationship<EdgeMetadataSerialization> migration = getMigrationRelationShip();
 
-
-        if ( isOldVersion() ) {
+        if ( migration.needsMigration() ) {
             final MutationBatch aggregateBatch = keyspace.prepareMutationBatch();
 
-            aggregateBatch.mergeShallow( previous.removeIdTypeToTarget( scope, targetNode, type, idType, timestamp ) );
-            aggregateBatch.mergeShallow( current.removeIdTypeToTarget( scope, targetNode, type, idType, timestamp ) );
+            aggregateBatch.mergeShallow( migration.from.removeIdTypeToTarget( scope, targetNode, type, idType, timestamp ) );
+            aggregateBatch.mergeShallow( migration.to.removeIdTypeToTarget( scope, targetNode, type, idType, timestamp ) );
 
             return aggregateBatch;
         }
 
-        return current.removeIdTypeToTarget( scope, targetNode, type, idType, timestamp );
+        return migration.to.removeIdTypeToTarget( scope, targetNode, type, idType, timestamp );
     }
 
 
     @Override
     public Iterator<String> getEdgeTypesFromSource( final ApplicationScope scope, final SearchEdgeType search ) {
-        if ( isOldVersion() ) {
-            return previous.getEdgeTypesFromSource( scope, search );
+        final MigrationRelationship<EdgeMetadataSerialization> migration = getMigrationRelationShip();
+
+        if ( migration.needsMigration() ) {
+            return migration.from.getEdgeTypesFromSource( scope, search );
         }
 
-        return current.getEdgeTypesFromSource( scope, search );
+        return migration.to.getEdgeTypesFromSource( scope, search );
     }
 
 
     @Override
     public Iterator<String> getIdTypesFromSource( final ApplicationScope scope, final SearchIdType search ) {
-        if ( isOldVersion() ) {
-            return previous.getIdTypesFromSource( scope, search );
+        final MigrationRelationship<EdgeMetadataSerialization> migration = getMigrationRelationShip();
+
+        if ( migration.needsMigration() ) {
+            return migration.from.getIdTypesFromSource( scope, search );
         }
 
-        return current.getIdTypesFromSource( scope, search );
+        return migration.to.getIdTypesFromSource( scope, search );
     }
 
 
     @Override
     public Iterator<String> getEdgeTypesToTarget( final ApplicationScope scope, final SearchEdgeType search ) {
-        if ( isOldVersion() ) {
-            return previous.getEdgeTypesToTarget( scope, search );
+        final MigrationRelationship<EdgeMetadataSerialization> migration = getMigrationRelationShip();
+
+        if ( migration.needsMigration() ) {
+            return migration.from.getEdgeTypesToTarget( scope, search );
         }
 
-        return current.getEdgeTypesToTarget( scope, search );
+        return migration.to.getEdgeTypesToTarget( scope, search );
     }
 
 
     @Override
     public Iterator<String> getIdTypesToTarget( final ApplicationScope scope, final SearchIdType search ) {
-        if ( isOldVersion() ) {
-            return previous.getIdTypesToTarget( scope, search );
+        final MigrationRelationship<EdgeMetadataSerialization> migration = getMigrationRelationShip();
+
+        if ( migration.needsMigration() ) {
+            return migration.from.getIdTypesToTarget( scope, search );
         }
 
-        return current.getIdTypesToTarget( scope, search );
+        return migration.to.getIdTypesToTarget( scope, search );
     }
 
 
@@ -268,21 +288,19 @@ public class EdgeMetadataSerializationProxyImpl implements EdgeMetadataSerializa
     }
 
 
+
+
     /**
      * Return true if we're on an old version
      */
-    private boolean isOldVersion() {
-        return dataMigrationManager.getCurrentVersion() < getVersion();
+    private MigrationRelationship<EdgeMetadataSerialization> getMigrationRelationShip() {
+        return this.versions.getMigrationRelationship(
+                migrationInfoCache.getVersion( GraphMigrationPlugin.PLUGIN_NAME ) );
     }
 
-    @Override
-    public MigrationRelationship<EdgeMetadataSerialization> getMigration() {
-        return new MigrationRelationship<>(previous,current);
-    }
 
     @Override
-    public int getVersion() {
-        return MIGRATION_VERSION;
+    public int getImplementationVersion() {
+        throw new UnsupportedOperationException( "Proxies do not have an implementation version" );
     }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fa69be86/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationV1Impl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationV1Impl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationV1Impl.java
index c634684..5fd270f 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationV1Impl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationV1Impl.java
@@ -387,6 +387,12 @@ public class EdgeMetadataSerializationV1Impl implements EdgeMetadataSerializatio
     }
 
 
+    @Override
+    public int getImplementationVersion() {
+        return GraphDataVersions.INITIAL.getVersion();
+    }
+
+
     /**
      * Inner class to serialize and edgeIdTypeKey
      */

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fa69be86/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationV2Impl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationV2Impl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationV2Impl.java
index c9a279a..9e59dbf 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationV2Impl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationV2Impl.java
@@ -536,6 +536,12 @@ public class EdgeMetadataSerializationV2Impl implements EdgeMetadataSerializatio
     }
 
 
+    @Override
+    public int getImplementationVersion() {
+        return GraphDataVersions.META_SHARDING.getVersion();
+    }
+
+
     /**
      * Inner class to serialize and edgeIdTypeKey
      */

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fa69be86/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/GraphDataVersions.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/GraphDataVersions.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/GraphDataVersions.java
new file mode 100644
index 0000000..60d36bd
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/GraphDataVersions.java
@@ -0,0 +1,43 @@
+/*
+ *
+ *  *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied.  See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *  *
+ *
+ */
+
+package org.apache.usergrid.persistence.graph.serialization.impl;
+
+
+/**
+ * Versions of graph data as they exist across our system
+ */
+public enum GraphDataVersions {
+    INITIAL(0),
+    META_SHARDING(1);
+
+    private final int version;
+
+
+    private GraphDataVersions( final int version ) {this.version = version;}
+
+
+    public int getVersion() {
+        return version;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fa69be86/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/GraphMigrationPlugin.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/GraphMigrationPlugin.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/GraphMigrationPlugin.java
new file mode 100644
index 0000000..0d3405b
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/GraphMigrationPlugin.java
@@ -0,0 +1,63 @@
+/*
+ *
+ *  *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied.  See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *  *
+ *
+ */
+
+package org.apache.usergrid.persistence.graph.serialization.impl.migration;
+
+
+import java.util.Set;
+
+import org.apache.usergrid.persistence.core.migration.data.MigrationInfoSerialization;
+import org.apache.usergrid.persistence.core.migration.data.newimpls.AbstractMigrationPlugin;
+import org.apache.usergrid.persistence.core.migration.data.newimpls.DataMigration2;
+import org.apache.usergrid.persistence.core.migration.data.newimpls.MigrationDataProvider;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+
+/**
+ * Migration plugin for the graph module
+ */
+@Singleton
+public class GraphMigrationPlugin extends AbstractMigrationPlugin<ApplicationScope> {
+
+    public static final String PLUGIN_NAME = "graph-data";
+
+
+
+    @Inject
+    public GraphMigrationPlugin( final Set<DataMigration2<ApplicationScope>> entityDataMigrations,
+                                      final MigrationDataProvider<ApplicationScope> entityIdScopeDataMigrationProvider,
+                                      final MigrationInfoSerialization migrationInfoSerialization ) {
+        super( entityDataMigrations, entityIdScopeDataMigrationProvider, migrationInfoSerialization );
+    }
+
+
+    @Override
+    public String getName() {
+        return PLUGIN_NAME;
+    }
+
+
+}