You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by gr...@apache.org on 2015/03/19 23:19:58 UTC

[08/50] [abbrv] incubator-usergrid git commit: WIP, still needs refactored and cleaned up.

WIP, still needs refactored and cleaned up.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/d3f8ee61
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/d3f8ee61
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/d3f8ee61

Branch: refs/heads/USERGRID-493
Commit: d3f8ee6184e2a85dc669e836201496748bf35001
Parents: 6652fe5
Author: Todd Nine <tn...@apigee.com>
Authored: Thu Feb 26 18:57:03 2015 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Thu Feb 26 18:57:03 2015 -0700

----------------------------------------------------------------------
 .../mvcc/MvccEntityMigrationStrategy.java       |  30 --
 .../MvccEntitySerializationStrategy.java        |   3 +-
 .../impl/MvccEntityDataMigrationImpl.java       | 179 -----------
 ...vccEntitySerializationStrategyProxyImpl.java |  18 +-
 ...cEntitySerializationStrategyProxyV1Impl.java |  62 ----
 ...cEntitySerializationStrategyProxyV2Impl.java |  43 ---
 .../serialization/impl/SerializationModule.java |  22 +-
 .../migration/CollectionMigrationPlugin.java    |  68 +++++
 .../impl/migration/EntityIdScope.java           |  49 +++
 .../migration/MvccEntityDataMigrationImpl.java  | 179 +++++++++++
 .../persistence/core/guice/CommonModule.java    |   8 +-
 .../data/ApplicationDataMigration.java          |   2 +-
 .../migration/data/CollectionDataMigration.java |   2 +-
 .../core/migration/data/DataMigration.java      |  94 ------
 .../migration/data/DataMigrationManager.java    |  15 +-
 .../data/DataMigrationManagerImpl.java          | 304 ++++++-------------
 .../data/MigrationInfoSerialization.java        |  30 +-
 .../data/MigrationInfoSerializationImpl.java    |  84 ++---
 .../migration/data/newimpls/DataMigration2.java |  26 +-
 .../data/newimpls/MigrationPlugin.java          |  12 +-
 .../data/newimpls/MigrationRelationship.java    |  41 +++
 .../data/newimpls/ProgressObserver.java         |  50 +++
 .../migration/data/newimpls/VersionedData.java  |  38 +++
 .../migration/data/newimpls/VersionedSet.java   | 113 +++++++
 .../migration/schema/MigrationStrategy.java     |  60 ----
 .../core/rx/AllEntitiesInSystemObservable.java  |   2 +-
 .../core/scope/ApplicationEntityGroup.java      |  39 ---
 .../persistence/core/scope/EntityIdScope.java   |  44 ---
 .../core/guice/MaxMigrationVersion.java         |   1 -
 .../persistence/core/guice/TestModule.java      |   3 +-
 .../data/DataMigrationManagerImplTest.java      |   2 -
 .../impl/EdgeDataMigrationImpl.java             | 111 +++----
 32 files changed, 801 insertions(+), 933 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d3f8ee61/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/MvccEntityMigrationStrategy.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/MvccEntityMigrationStrategy.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/MvccEntityMigrationStrategy.java
deleted file mode 100644
index 6fed25d..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/MvccEntityMigrationStrategy.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  *  contributor license agreements.  The ASF licenses this file to You
- *  * under the Apache License, Version 2.0 (the "License"); you may not
- *  * use this file except in compliance with the License.
- *  * You may obtain a copy of the License at
- *  *
- *  *     http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.  For additional information regarding
- *  * copyright in this work, please see the NOTICE file in the top level
- *  * directory of this distribution.
- *
- */
-package org.apache.usergrid.persistence.collection.mvcc;
-
-import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
-import org.apache.usergrid.persistence.core.migration.schema.MigrationStrategy;
-
-/**
- * Encapsulates the migration path between versions
- */
-public interface MvccEntityMigrationStrategy extends MigrationStrategy<MvccEntitySerializationStrategy> {
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d3f8ee61/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/MvccEntitySerializationStrategy.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/MvccEntitySerializationStrategy.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/MvccEntitySerializationStrategy.java
index b0b7233..de3cab3 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/MvccEntitySerializationStrategy.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/MvccEntitySerializationStrategy.java
@@ -26,6 +26,7 @@ import java.util.UUID;
 import org.apache.usergrid.persistence.collection.CollectionScope;
 import org.apache.usergrid.persistence.collection.EntitySet;
 import org.apache.usergrid.persistence.collection.MvccEntity;
+import org.apache.usergrid.persistence.core.migration.data.newimpls.VersionedData;
 import org.apache.usergrid.persistence.core.migration.schema.Migration;
 import org.apache.usergrid.persistence.model.entity.Id;
 
@@ -36,7 +37,7 @@ import com.netflix.astyanax.MutationBatch;
 /**
  * The interface that allows us to serialize an entity to disk
  */
-public interface MvccEntitySerializationStrategy extends Migration {
+public interface MvccEntitySerializationStrategy extends Migration, VersionedData {
 
     /**
      * Serialize the entity to the data store with the given collection context

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d3f8ee61/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntityDataMigrationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntityDataMigrationImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntityDataMigrationImpl.java
deleted file mode 100644
index 4dfc64b..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntityDataMigrationImpl.java
+++ /dev/null
@@ -1,179 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  *  contributor license agreements.  The ASF licenses this file to You
- *  * under the Apache License, Version 2.0 (the "License"); you may not
- *  * use this file except in compliance with the License.
- *  * You may obtain a copy of the License at
- *  *
- *  *     http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.  For additional information regarding
- *  * copyright in this work, please see the NOTICE file in the top level
- *  * directory of this distribution.
- *
- */
-package org.apache.usergrid.persistence.collection.serialization.impl;
-
-
-import java.util.Collections;
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.usergrid.persistence.collection.CollectionScope;
-import org.apache.usergrid.persistence.collection.EntitySet;
-import org.apache.usergrid.persistence.collection.MvccEntity;
-import org.apache.usergrid.persistence.collection.mvcc.MvccEntityMigrationStrategy;
-import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
-import org.apache.usergrid.persistence.core.migration.data.CollectionDataMigration;
-import org.apache.usergrid.persistence.core.migration.data.DataMigration;
-import org.apache.usergrid.persistence.core.migration.data.DataMigrationException;
-import org.apache.usergrid.persistence.core.migration.schema.MigrationStrategy;
-import org.apache.usergrid.persistence.core.scope.ApplicationEntityGroup;
-import org.apache.usergrid.persistence.core.scope.EntityIdScope;
-import org.apache.usergrid.persistence.model.util.UUIDGenerator;
-
-import com.google.inject.Inject;
-import com.netflix.astyanax.Keyspace;
-import com.netflix.astyanax.MutationBatch;
-import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
-
-import rx.Observable;
-import rx.functions.Action1;
-import rx.functions.Func1;
-import rx.functions.Func2;
-import rx.schedulers.Schedulers;
-
-
-/**
- * Data migration strategy for entities
- */
-public class MvccEntityDataMigrationImpl implements CollectionDataMigration {
-
-    private final Keyspace keyspace;
-    private final MvccEntityMigrationStrategy entityMigrationStrategy;
-
-
-    @Inject
-    public MvccEntityDataMigrationImpl( Keyspace keyspace, MvccEntityMigrationStrategy serializationStrategy ) {
-
-        this.keyspace = keyspace;
-        this.entityMigrationStrategy = serializationStrategy;
-    }
-
-
-    @Override
-    public Observable migrate( final Observable<ApplicationEntityGroup> applicationEntityGroupObservable,
-                               final DataMigration.ProgressObserver observer ) {
-        final AtomicLong atomicLong = new AtomicLong();
-
-
-        //capture the time the test starts
-        final UUID now = UUIDGenerator.newTimeUUID();
-
-        return applicationEntityGroupObservable.flatMap( new Func1<ApplicationEntityGroup, Observable<Long>>() {
-            @Override
-            public Observable<Long> call( final ApplicationEntityGroup applicationEntityGroup ) {
-                final List<EntityIdScope<CollectionScope>> entityIds = applicationEntityGroup.entityIds;
-
-                //go through each entity in the system, and load it's entire
-                // history
-                return Observable.from( entityIds ).subscribeOn( Schedulers.io() )
-                                 .parallel( new Func1<Observable<EntityIdScope<CollectionScope>>, Observable<Long>>() {
-
-
-                                     //process the ids in parallel
-                                     @Override
-                                     public Observable<Long> call(
-                                             final Observable<EntityIdScope<CollectionScope>> entityIdScopeObservable
-                                                                 ) {
-
-                                         final MutationBatch totalBatch = keyspace.prepareMutationBatch();
-
-                                         return entityIdScopeObservable.doOnNext(
-                                                 new Action1<EntityIdScope<CollectionScope>>() {
-
-                                                     //load the entity and add it to the toal mutation
-                                                     @Override
-                                                     public void call( final EntityIdScope<CollectionScope> idScope ) {
-
-                                                         //load the entity
-                                                         MigrationStrategy
-                                                                 .MigrationRelationship<MvccEntitySerializationStrategy>
-                                                                 migration = entityMigrationStrategy.getMigration();
-
-
-                                                         CollectionScope currentScope = idScope.getCollectionScope();
-                                                         //for each element in the history in the previous
-                                                         // version,
-                                                         // copy it to the CF in v2
-                                                         EntitySet allVersions = migration.from().load( currentScope,
-                                                                 Collections.singleton( idScope.getId() ), now );
-
-                                                         final MvccEntity version =
-                                                                 allVersions.getEntity( idScope.getId() );
-
-                                                         final MutationBatch versionBatch =
-                                                                 migration.to().write( currentScope, version );
-
-                                                         totalBatch.mergeShallow( versionBatch );
-                                                     }
-                                                 } )
-                                                 //every 100 flush the mutation
-                                                 .buffer( 100 ).doOnNext(
-                                                         new Action1<List<EntityIdScope<CollectionScope>>>() {
-                                                             @Override
-                                                             public void call(
-                                                                     final List<EntityIdScope<CollectionScope>> ids ) {
-                                                                 atomicLong.addAndGet( 100 );
-                                                                 executeBatch( totalBatch, observer, atomicLong );
-                                                             }
-                                                         } )
-                                                         //count the results
-                                                 .reduce( 0l,
-                                                         new Func2<Long, List<EntityIdScope<CollectionScope>>, Long>() {
-                                                             @Override
-                                                             public Long call( final Long aLong,
-                                                                               final
-                                                                               List<EntityIdScope<CollectionScope>>
-                                                                                       ids ) {
-                                                                 return aLong + ids.size();
-                                                             }
-                                                         } );
-                                     }
-                                 } );
-            }
-        } );
-    }
-
-
-    protected void executeBatch( final MutationBatch batch, final DataMigration.ProgressObserver po,
-                                 final AtomicLong count ) {
-        try {
-            batch.execute();
-
-            po.update( getVersion(), "Finished copying " + count + " entities to the new format" );
-        }
-        catch ( ConnectionException e ) {
-            po.failed( getVersion(), "Failed to execute mutation in cassandra" );
-            throw new DataMigrationException( "Unable to migrate batches ", e );
-        }
-    }
-
-
-    @Override
-    public int getVersion() {
-        return entityMigrationStrategy.getVersion();
-    }
-
-
-    @Override
-    public MigrationType getType() {
-        return MigrationType.Entities;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d3f8ee61/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxyImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxyImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxyImpl.java
index c18df29..03271a2 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxyImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxyImpl.java
@@ -35,27 +35,24 @@ import java.util.*;
 
 /**
  * Version 4 implementation of entity serialization. This will proxy writes and reads so that during
- * migration data goes to both sources and is read from the old source. After the ugprade completes,
+ * migration data goes to both sources and is read from the old source. After the upgrade completes,
  * it will be available from the new source
  */
-public abstract class MvccEntitySerializationStrategyProxyImpl implements MvccEntitySerializationStrategy, MvccEntityMigrationStrategy {
+public class MvccEntitySerializationStrategyProxyImpl implements MvccEntitySerializationStrategy {
 
 
     protected final Keyspace keyspace;
-    protected final MvccEntitySerializationStrategy previous;
-    protected final MvccEntitySerializationStrategy current;
     private final MigrationInfoSerialization migrationInfoSerialization;
+    private final
 
 
     @Inject
     public MvccEntitySerializationStrategyProxyImpl(final Keyspace keyspace,
-                                                    final MvccEntitySerializationStrategy previous,
-                                                    final MvccEntitySerializationStrategy current,
+                                                    final Set<MvccEntitySerializationStrategy> allVersions,
                                                     final MigrationInfoSerialization migrationInfoSerialization) {
 
         this.keyspace = keyspace;
-        this.previous = previous;
-        this.current = current;
+
         this.migrationInfoSerialization = migrationInfoSerialization;
     }
 
@@ -159,5 +156,10 @@ public abstract class MvccEntitySerializationStrategyProxyImpl implements MvccEn
         return Collections.emptyList();
     }
 
+
+    @Override
+    public int getImplementationVersion() {
+        return 0;
+    }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d3f8ee61/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxyV1Impl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxyV1Impl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxyV1Impl.java
deleted file mode 100644
index f2c8d33..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxyV1Impl.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.persistence.collection.serialization.impl;
-
-
-import org.apache.usergrid.persistence.collection.mvcc.MvccEntityMigrationStrategy;
-import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
-import org.apache.usergrid.persistence.core.guice.V1Impl;
-import org.apache.usergrid.persistence.core.guice.V2Impl;
-import org.apache.usergrid.persistence.core.migration.data.DataMigrationManager;
-
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-import com.netflix.astyanax.Keyspace;
-import org.apache.usergrid.persistence.core.migration.data.MigrationInfoSerialization;
-
-
-/**
- * Version 3 implementation of entity serialization. This will proxy writes and reads so that during
- * migration data goes to both sources and is read from the old source. After the ugprade completes,
- * it will be available from the new source
- */
-@Singleton
-public class MvccEntitySerializationStrategyProxyV1Impl extends MvccEntitySerializationStrategyProxyImpl implements MvccEntityMigrationStrategy {
-
-
-    @Inject
-    public MvccEntitySerializationStrategyProxyV1Impl(final Keyspace keyspace,
-                                                      @V1Impl final MvccEntitySerializationStrategy previous,
-                                                      @V2Impl final MvccEntitySerializationStrategy current,
-                                                      final MigrationInfoSerialization migrationInfoSerialization) {
-        super( keyspace, previous, current,migrationInfoSerialization);
-    }
-
-    @Override
-    public MigrationRelationship<MvccEntitySerializationStrategy> getMigration() {
-        return new MigrationRelationship<>(previous,current);
-    }
-
-    @Override
-    public int getVersion() {
-        return V2Impl.MIGRATION_VERSION;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d3f8ee61/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxyV2Impl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxyV2Impl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxyV2Impl.java
deleted file mode 100644
index d897ee4..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxyV2Impl.java
+++ /dev/null
@@ -1,43 +0,0 @@
-package org.apache.usergrid.persistence.collection.serialization.impl;
-
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-import com.netflix.astyanax.Keyspace;
-import org.apache.usergrid.persistence.collection.mvcc.MvccEntityMigrationStrategy;
-import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
-import org.apache.usergrid.persistence.core.guice.*;
-import org.apache.usergrid.persistence.core.migration.data.DataMigrationManager;
-import org.apache.usergrid.persistence.core.migration.data.MigrationInfoSerialization;
-
-
-/**
- * Version 4 implementation of entity serialization. This will proxy writes and reads so that during
- * migration data goes to both sources and is read from the old source. After the ugprade completes,
- * it will be available from the new source
- */
-@Singleton
-public class MvccEntitySerializationStrategyProxyV2Impl extends MvccEntitySerializationStrategyProxyImpl implements MvccEntityMigrationStrategy {
-
-
-    @Inject
-    public MvccEntitySerializationStrategyProxyV2Impl( final MigrationInfoSerialization migrationInfoSerialization,
-                                                     final Keyspace keyspace,
-                                                     @V1ProxyImpl final MvccEntitySerializationStrategy previous,
-                                                     @V3Impl final MvccEntitySerializationStrategy current) {
-        super(keyspace,previous,current,migrationInfoSerialization);
-    }
-
-    @Override
-    public int getVersion() {
-        return V3Impl.MIGRATION_VERSION;
-    }
-
-
-    @Override
-    public MigrationRelationship<MvccEntitySerializationStrategy> getMigration() {
-        return new MigrationRelationship<>(this.previous,this.current);
-    }
-
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d3f8ee61/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/SerializationModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/SerializationModule.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/SerializationModule.java
index a86d7ec..a9c493f 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/SerializationModule.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/SerializationModule.java
@@ -22,9 +22,15 @@ import org.apache.usergrid.persistence.collection.mvcc.MvccEntityMigrationStrate
 import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
 import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
 import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
-import org.apache.usergrid.persistence.core.guice.*;
-import org.apache.usergrid.persistence.core.migration.data.CollectionDataMigration;
-import org.apache.usergrid.persistence.core.migration.data.DataMigration;
+import org.apache.usergrid.persistence.collection.serialization.impl.migration.CollectionMigrationPlugin;
+import org.apache.usergrid.persistence.collection.serialization.impl.migration.MvccEntityDataMigrationImpl;
+import org.apache.usergrid.persistence.core.guice.ProxyImpl;
+import org.apache.usergrid.persistence.core.guice.V1Impl;
+import org.apache.usergrid.persistence.core.guice.V1ProxyImpl;
+import org.apache.usergrid.persistence.core.guice.V2Impl;
+import org.apache.usergrid.persistence.core.guice.V3Impl;
+import org.apache.usergrid.persistence.core.migration.data.newimpls.DataMigration2;
+import org.apache.usergrid.persistence.core.migration.data.newimpls.MigrationPlugin;
 import org.apache.usergrid.persistence.core.migration.schema.Migration;
 
 import com.google.inject.AbstractModule;
@@ -56,10 +62,16 @@ public class SerializationModule extends AbstractModule {
         bind(MvccEntitySerializationStrategy.class ).annotatedWith(ProxyImpl.class)
                                                      .to(MvccEntitySerializationStrategyProxyV2Impl.class);
 
-        Multibinder<DataMigration> dataMigrationMultibinder =  Multibinder.newSetBinder( binder(), DataMigration.class );
+
+
+
+        Multibinder<DataMigration2> dataMigrationMultibinder =  Multibinder.newSetBinder( binder(), DataMigration2.class );
         dataMigrationMultibinder.addBinding().to( MvccEntityDataMigrationImpl.class );
 
-        bind( MvccEntityMigrationStrategy.class ).to(MvccEntitySerializationStrategyProxyV2Impl.class);
+
+        Multibinder.newSetBinder( binder(), MigrationPlugin.class).addBinding().to( CollectionMigrationPlugin.class );
+
+
 
         bind( MvccLogEntrySerializationStrategy.class ).to( MvccLogEntrySerializationStrategyImpl.class );
         bind( UniqueValueSerializationStrategy.class ).to( UniqueValueSerializationStrategyImpl.class );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d3f8ee61/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/CollectionMigrationPlugin.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/CollectionMigrationPlugin.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/CollectionMigrationPlugin.java
new file mode 100644
index 0000000..5713af7
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/CollectionMigrationPlugin.java
@@ -0,0 +1,68 @@
+/*
+ *
+ *  *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied.  See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *  *
+ *
+ */
+
+package org.apache.usergrid.persistence.collection.serialization.impl.migration;
+
+
+import java.util.Set;
+
+import org.apache.usergrid.persistence.core.migration.data.newimpls.DataMigration2;
+import org.apache.usergrid.persistence.core.migration.data.newimpls.MigrationDataProvider;
+import org.apache.usergrid.persistence.core.migration.data.newimpls.MigrationPlugin;
+import org.apache.usergrid.persistence.core.migration.data.newimpls.ProgressObserver;
+
+
+/**
+ * Migration plugin for the collection module
+ */
+public class CollectionMigrationPlugin implements MigrationPlugin {
+
+
+    private final DataMigration2<EntityIdScope> entityDataMigration;
+    private final MigrationDataProvider<EntityIdScope> entityIdScopeDataMigrationProvider;
+
+
+    public CollectionMigrationPlugin( final DataMigration2<EntityIdScope> entityDataMigration,
+                                      final MigrationDataProvider<EntityIdScope> entityIdScopeDataMigrationProvider ) {
+        this.entityDataMigration = entityDataMigration;
+        this.entityIdScopeDataMigrationProvider = entityIdScopeDataMigrationProvider;
+    }
+
+
+    @Override
+    public String getName() {
+        return "collections-entity-data";
+    }
+
+
+    @Override
+    public void run( final ProgressObserver observer ) {
+       entityDataMigration.migrate( entityIdScopeDataMigrationProvider, observer );
+    }
+
+
+    @Override
+    public int getMaxVersion() {
+        return 0;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d3f8ee61/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/EntityIdScope.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/EntityIdScope.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/EntityIdScope.java
new file mode 100644
index 0000000..0e70e75
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/EntityIdScope.java
@@ -0,0 +1,49 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  *  contributor license agreements.  The ASF licenses this file to You
+ *  * under the Apache License, Version 2.0 (the "License"); you may not
+ *  * use this file except in compliance with the License.
+ *  * You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.  For additional information regarding
+ *  * copyright in this work, please see the NOTICE file in the top level
+ *  * directory of this distribution.
+ *
+ */
+package org.apache.usergrid.persistence.collection.serialization.impl.migration;
+
+import java.util.Collection;
+
+import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+/**
+ * Tuple containing collectionscope and entityid
+ */
+public class EntityIdScope{
+    private final Id id;
+    private final CollectionScope collectionScope;
+
+    public EntityIdScope(CollectionScope collectionScope, Id id){
+        this.id = id;
+        this.collectionScope = collectionScope;
+    }
+
+
+    public Id getId() {
+        return id;
+    }
+
+
+    public CollectionScope getCollectionScope() {
+        return collectionScope;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d3f8ee61/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java
new file mode 100644
index 0000000..73efcba
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java
@@ -0,0 +1,179 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  *  contributor license agreements.  The ASF licenses this file to You
+ *  * under the Apache License, Version 2.0 (the "License"); you may not
+ *  * use this file except in compliance with the License.
+ *  * You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.  For additional information regarding
+ *  * copyright in this work, please see the NOTICE file in the top level
+ *  * directory of this distribution.
+ *
+ */
+package org.apache.usergrid.persistence.collection.serialization.impl.migration;
+
+
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.collection.EntitySet;
+import org.apache.usergrid.persistence.collection.MvccEntity;
+import org.apache.usergrid.persistence.collection.mvcc.MvccEntityMigrationStrategy;
+import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
+import org.apache.usergrid.persistence.core.migration.data.DataMigrationException;
+import org.apache.usergrid.persistence.core.migration.data.newimpls.DataMigration2;
+import org.apache.usergrid.persistence.core.migration.data.newimpls.MigrationDataProvider;
+import org.apache.usergrid.persistence.core.migration.data.newimpls.ProgressObserver;
+import org.apache.usergrid.persistence.core.migration.schema.MigrationStrategy;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+
+import com.google.inject.Inject;
+import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+import rx.Observable;
+import rx.functions.Action1;
+import rx.functions.Func1;
+import rx.functions.Func2;
+import rx.schedulers.Schedulers;
+
+
+/**
+ * Data migration strategy for entities
+ */
+public class MvccEntityDataMigrationImpl implements DataMigration2<EntityIdScope> {
+
+    private final Keyspace keyspace;
+    private final MvccEntityMigrationStrategy entityMigrationStrategy;
+
+
+    @Inject
+    public MvccEntityDataMigrationImpl( Keyspace keyspace, MvccEntityMigrationStrategy serializationStrategy ) {
+
+        this.keyspace = keyspace;
+        this.entityMigrationStrategy = serializationStrategy;
+    }
+
+
+    @Override
+    public int getVersion() {
+        return entityMigrationStrategy.getVersion();
+    }
+
+
+    @Override
+    public void migrate( final MigrationDataProvider<EntityIdScope> migrationDataProvider,   final ProgressObserver observer ) {
+
+        final AtomicLong atomicLong = new AtomicLong();
+
+
+        //capture the time the test starts
+
+        final UUID startTime = UUIDGenerator.newTimeUUID();
+
+        final long migrated = migrationDataProvider.getData().subscribeOn( Schedulers.io() )
+                   .parallel( new Func1<Observable<EntityIdScope>, Observable<Long>>() {
+
+
+                       //process the ids in parallel
+                       @Override
+                       public Observable<Long> call(
+                               final Observable<EntityIdScope>
+                                       entityIdScopeObservable ) {
+
+                           final MutationBatch totalBatch =
+                                   keyspace.prepareMutationBatch();
+
+                           return entityIdScopeObservable.doOnNext(
+                                   new Action1<EntityIdScope>() {
+
+                                       //load the entity and add it to the toal mutation
+                                       @Override
+                                       public void call( final EntityIdScope idScope ) {
+
+                                           //load the entity
+                                           MigrationStrategy
+                                                   .MigrationRelationship<MvccEntitySerializationStrategy>
+                                                   migration = entityMigrationStrategy
+                                                   .getMigration();
+
+
+                                           CollectionScope currentScope =
+                                                   idScope.getCollectionScope();
+
+                                           //for each element in the history in the
+                                           // previous
+                                           // version,
+                                           // copy it to the CF in v2
+
+
+                                           EntitySet allVersions = migration.from()
+                                                                            .load( currentScope,
+                                                                                    Collections
+                                                                                            .singleton(
+                                                                                                    idScope.getId() ),
+                                                                                    startTime );
+
+                                           final MvccEntity version = allVersions
+                                                   .getEntity( idScope.getId() );
+
+                                           final MutationBatch versionBatch =
+                                                   migration.to().write( currentScope,
+                                                           version );
+
+                                           totalBatch.mergeShallow( versionBatch );
+                                       }
+                                   } )
+                                   //every 100 flush the mutation
+                                   .buffer( 100 ).doOnNext(
+                                           new Action1<List<EntityIdScope>>() {
+                                               @Override
+                                               public void call(
+                                                       final List<EntityIdScope> ids ) {
+                                                   atomicLong.addAndGet( 100 );
+                                                   executeBatch( totalBatch, observer,
+                                                           atomicLong );
+                                               }
+                                           } )
+                                           //count the results
+                                   .reduce( 0l,
+                                           new Func2<Long, List<EntityIdScope>, Long>
+                                                   () {
+                                               @Override
+                                               public Long call( final Long aLong,
+                                                                 final List<EntityIdScope> ids ) {
+                                                   return aLong + ids.size();
+                                               }
+                                           } );
+                       }
+                   } ).toBlocking().last();
+
+        //now we update the progress observer
+
+        observer.update( getVersion(), "Finished for this step.  Migrated " + migrated + "entities total. ");
+    }
+
+
+    protected void executeBatch( final MutationBatch batch, final ProgressObserver po, final AtomicLong count ) {
+        try {
+            batch.execute();
+
+            po.update( getVersion(), "Finished copying " + count + " entities to the new format" );
+        }
+        catch ( ConnectionException e ) {
+            po.failed( getVersion(), "Failed to execute mutation in cassandra" );
+            throw new DataMigrationException( "Unable to migrate batches ", e );
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d3f8ee61/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java
index 657d965..61b6b04 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java
@@ -28,6 +28,7 @@ import org.apache.usergrid.persistence.core.astyanax.CassandraConfigImpl;
 import org.apache.usergrid.persistence.core.astyanax.CassandraFig;
 import org.apache.usergrid.persistence.core.consistency.TimeService;
 import org.apache.usergrid.persistence.core.consistency.TimeServiceImpl;
+import org.apache.usergrid.persistence.core.migration.data.newimpls.MigrationPlugin;
 import org.apache.usergrid.persistence.core.migration.schema.Migration;
 import org.apache.usergrid.persistence.core.migration.schema.MigrationManager;
 import org.apache.usergrid.persistence.core.migration.schema.MigrationManagerFig;
@@ -77,11 +78,8 @@ public class CommonModule extends AbstractModule {
 
 
         //do multibindings for migrations
-        Multibinder<DataMigration> applicationDataMigrationMultibinder = Multibinder.newSetBinder( binder(), DataMigration.class );
-//        dataMigrationMultibinder.addBinding();
-//        dataMigrationManagerMultibinder.addBinding().to( DataMigrationManagerImpl.class );
-//        migrationBinding.addBinding().to( Key.get( MigrationInfoSerialization.class ) );
-
+        //create the empty multibinder so other plugins can use it
+         Multibinder.newSetBinder( binder(), MigrationPlugin.class);
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d3f8ee61/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/ApplicationDataMigration.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/ApplicationDataMigration.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/ApplicationDataMigration.java
index 6dca092..bbff0b9 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/ApplicationDataMigration.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/ApplicationDataMigration.java
@@ -21,7 +21,7 @@ package org.apache.usergrid.persistence.core.migration.data;
 
 import org.apache.usergrid.persistence.core.scope.ApplicationEntityGroup;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import rx.Observable;
+
 
 /**
  * Migrate applications

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d3f8ee61/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/CollectionDataMigration.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/CollectionDataMigration.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/CollectionDataMigration.java
index e29571c..df1af72 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/CollectionDataMigration.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/CollectionDataMigration.java
@@ -20,7 +20,7 @@
 package org.apache.usergrid.persistence.core.migration.data;
 
 import org.apache.usergrid.persistence.core.scope.ApplicationEntityGroup;
-import rx.Observable;
+
 
 /**
  * Migrate Collections

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d3f8ee61/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/DataMigration.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/DataMigration.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/DataMigration.java
deleted file mode 100644
index e1e3092..0000000
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/DataMigration.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.usergrid.persistence.core.migration.data;
-
-
-import org.apache.usergrid.persistence.core.scope.ApplicationEntityGroup;
-import rx.Observable;
-
-/**
- * An interface for updating data.  Has 2 basic functions. First it will perform the migration and update the status
- * object.
- *
- * Second it will only migrate a single version.  For instance, it will migrate from 0->1, or from 1->2.  All migrations
- * must follow the following basic rules.
- *
- * <ol>
- *     <li>They must not modify the structure of an existing column family.  If the data format changes, a new
- * implementation and column family must be created.  A proxy must be defined to do dual writes/single reads. </li>
- * <li>The migration must update the progress observer.  This information should be made available cluster wide.</li>
- * <li>In the event a migration fails with an error, we should be able to roll back and remove new column families.  We
- * can then fix the bug, and deploy again.  Hence the need for the proxy, dual writes, and an immutable CF
- * format</li>
- * </ol>
- */
-
-
-public interface DataMigration <T> {
-
-
-    /**
-     * Migrate the data to the specified version
-     * @param observer
-     * @Return an observable containing the count of the number of elements migrated
-     * @throws Throwable
-     */
-    public Observable<Long> migrate(final Observable<T> applicationEntityGroup,final ProgressObserver observer) throws Throwable;
-
-    /**
-     * Get the version of this migration.  It must be unique.
-     * @return
-     */
-    public int getVersion();
-
-    public MigrationType getType();
-
-    public interface ProgressObserver{
-        /**
-         * Mark the migration as failed
-         * @param migrationVersion The migration version running during the failure
-         * @param reason The reason to save
-         */
-        public void failed(final int migrationVersion, final String reason);
-
-        /**
-         * Mark the migration as failed with a stack trace
-         * @param migrationVersion The migration version running during the failure
-         * @param reason The error description to save
-         * @param throwable The error that happened
-         */
-        public void failed(final int migrationVersion, final String reason, final Throwable throwable);
-
-
-        /**
-         * Update the status of the migration with the message
-         *
-         * @param message The message to save for the status
-         */
-        public void update(final int migrationVersion, final String message);
-    }
-
-    public enum MigrationType{
-        Entities,
-        Applications,
-        System,
-        Collections
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d3f8ee61/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/DataMigrationManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/DataMigrationManager.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/DataMigrationManager.java
index e47e264..1ca8d90 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/DataMigrationManager.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/DataMigrationManager.java
@@ -19,6 +19,9 @@
 package org.apache.usergrid.persistence.core.migration.data;
 
 
+import java.util.List;
+import java.util.Set;
+
 import org.apache.usergrid.persistence.core.migration.schema.MigrationException;
 
 
@@ -43,13 +46,13 @@ public interface DataMigrationManager {
      * Get the current version of the schema
      * @return
      */
-    public int getCurrentVersion();
+    public int getCurrentVersion(final String pluginName);
 
     /**
      * Reset the system version to the version specified
      * @param version
      */
-    public void resetToVersion(final int version);
+    public void resetToVersion(final String pluginName, final int version);
 
     /**
      * Invalidate the cache for versions
@@ -60,5 +63,11 @@ public interface DataMigrationManager {
     /**
      * Return that last status of the migration
      */
-    public String getLastStatus();
+    public String getLastStatus(final String pluginName);
+
+    /**
+     * Return a list of all plugin names
+     * @return
+     */
+    public Set<String> getPluginNames();
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d3f8ee61/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/DataMigrationManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/DataMigrationManagerImpl.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/DataMigrationManagerImpl.java
index 380bedc..ddb0a61 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/DataMigrationManagerImpl.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/DataMigrationManagerImpl.java
@@ -21,24 +21,24 @@ package org.apache.usergrid.persistence.core.migration.data;
 
 import java.io.PrintWriter;
 import java.io.StringWriter;
-import java.util.*;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
-import org.apache.usergrid.persistence.core.rx.AllEntitiesInSystemObservable;
-import org.apache.usergrid.persistence.core.rx.ApplicationObservable;
-import org.apache.usergrid.persistence.core.scope.ApplicationEntityGroup;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.usergrid.persistence.core.migration.data.newimpls.MigrationPlugin;
+import org.apache.usergrid.persistence.core.migration.data.newimpls.ProgressObserver;
 import org.apache.usergrid.persistence.core.migration.schema.MigrationException;
 
 import com.google.common.base.Preconditions;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
-import rx.Observable;
-import rx.functions.Action1;
-import rx.functions.Func1;
-import rx.schedulers.Schedulers;
 
 
 @Singleton
@@ -46,256 +46,122 @@ public class DataMigrationManagerImpl implements DataMigrationManager {
 
     private static final Logger LOG = LoggerFactory.getLogger( DataMigrationManagerImpl.class );
 
-    private final TreeMap<Integer, DataMigration> migrationTreeMap = new TreeMap<>();
+    private final Map<String, MigrationPlugin> migrationPlugins = new HashMap<>();
 
     private final MigrationInfoSerialization migrationInfoSerialization;
-    private final AllEntitiesInSystemObservable allEntitiesInSystemObservable;
 
 
-    private final ApplicationObservable applicationObservable;
-    private final Set<DataMigration> dataMigrations;
-
-
-    @Inject
-    public DataMigrationManagerImpl( final MigrationInfoSerialization migrationInfoSerialization,
-                                     final Set<DataMigration> dataMigrations,
-                                     final AllEntitiesInSystemObservable allEntitiesInSystemObservable,
-                                     final ApplicationObservable applicationObservable
-    ) {
-
-        Preconditions.checkNotNull( migrationInfoSerialization,
-                "migrationInfoSerialization must not be null" );
-        Preconditions.checkNotNull( dataMigrations, "migrations must not be null" );
-        Preconditions.checkNotNull( allEntitiesInSystemObservable, "allentitiesobservable must not be null" );
-        Preconditions.checkNotNull( applicationObservable, "applicationObservable must not be null" );
-        this.dataMigrations = dataMigrations;
-        this.migrationInfoSerialization = migrationInfoSerialization;
-        this.allEntitiesInSystemObservable = allEntitiesInSystemObservable;
-        this.applicationObservable = applicationObservable;
-
-    }
-
-
-    @Override
-    public void migrate() throws MigrationException {
-
-        if (!populateTreeMap())
-            return;
-
-        int currentVersion = migrationInfoSerialization.getCurrentVersion();
-
-        if(currentVersion <= 0){
-            resetToHighestVersion();
-            //you have no work to do, just use the latest
-            return;
-        }
-
-        LOG.info( "Saved schema version is {}, max migration version is {}", currentVersion,
-                migrationTreeMap.lastKey() );
-
-        //we have our migrations to run, execute them
-        final Collection< DataMigration> migrationsToRun =  migrationTreeMap.tailMap( currentVersion, false ).values();
-
-
-
-        final CassandraProgressObserver observer = new CassandraProgressObserver();
-
-        final Observable<ApplicationScope> appScopeObservable = applicationObservable.getAllApplicationScopes();
-
-        final Observable<ApplicationEntityGroup> entitiesObservable = appScopeObservable.flatMap(new Func1<ApplicationScope, Observable<ApplicationEntityGroup>>() {
-            @Override
-            public Observable<ApplicationEntityGroup> call(ApplicationScope applicationScope) {
-                return allEntitiesInSystemObservable.getAllEntitiesInSystem(appScopeObservable, 1000);
-            }
-        });
-
-        final Observable<ApplicationDataMigration> applicationMigrationsRx = Observable.from(migrationsToRun)
-            .filter(new Func1<DataMigration, Boolean>() {
-                @Override
-                public Boolean call(DataMigration dataMigration) {
-                    return dataMigration instanceof ApplicationDataMigration;
-                }
-            }).map(new Func1<DataMigration, ApplicationDataMigration>() {
-                @Override
-                public ApplicationDataMigration call(DataMigration dataMigration) {
-                    return (ApplicationDataMigration)dataMigration;
-                }
-            });
-
-        final Observable<CollectionDataMigration> collectionMigrationsRx = Observable.from(migrationsToRun)
-            .filter(new Func1<DataMigration, Boolean>() {
-                @Override
-                public Boolean call(DataMigration dataMigration) {
-                    return dataMigration instanceof CollectionDataMigration;
-                }
-            }).map(new Func1<DataMigration, CollectionDataMigration>() {
+    /**
+     * Cache to cache versions temporarily
+     */
+    private final LoadingCache<String, Integer> versionCache = CacheBuilder.newBuilder()
+            //cache the local value for 1 minute
+            .expireAfterWrite( 1, TimeUnit.MINUTES ).build( new CacheLoader<String, Integer>() {
                 @Override
-                public CollectionDataMigration call(DataMigration dataMigration) {
-                    return (CollectionDataMigration) dataMigration;
+                public Integer load( final String key ) throws Exception {
+                    return migrationInfoSerialization.getVersion( key );
                 }
-            });
-
-        Observable applications = applicationMigrationsRx
-            .doOnNext(new Action1<ApplicationDataMigration>() {
-                        @Override
-                        public void call(ApplicationDataMigration dataMigration) {
-
-                            migrationInfoSerialization.setStatusCode(StatusCode.RUNNING.status);
-
-                            final int migrationVersion = dataMigration.getVersion();
-
-                            LOG.info("Running migration version {}", migrationVersion);
-
-                            observer.update(migrationVersion, "Starting migration");
+            } );
 
-                            //perform this migration, if it fails, short circuit
-                            try {
-                                dataMigration.migrate(appScopeObservable, observer).toBlocking().lastOrDefault(null);
-                            } catch (Throwable throwable) {
-                                observer.failed(migrationVersion, "Exception thrown during migration", throwable);
-                                LOG.error("Unable to migrate to version {}.", migrationVersion, throwable);
-                                throw new RuntimeException(throwable);
-                            }
 
-                            //we had an unhandled exception or the migration failed, short circuit
-                            if (observer.failed) {
-                                return;
-                            }
-
-                            //set the version
-                            migrationInfoSerialization.setVersion(migrationVersion);
-
-                            //update the observer for progress so other nodes can see it
-                            observer.update(migrationVersion, "Completed successfully");
-
-                        }
-                    });
-
-
-        Observable entities =collectionMigrationsRx
-                    .doOnNext(new Action1<CollectionDataMigration>() {
-                        @Override
-                        public void call(CollectionDataMigration dataMigration) {
-                            migrationInfoSerialization.setStatusCode(StatusCode.RUNNING.status);
-
-                            final int migrationVersion = dataMigration.getVersion();
+    @Inject
+    public DataMigrationManagerImpl( final Set<MigrationPlugin> plugins,
+                                     final MigrationInfoSerialization migrationInfoSerialization ) {
 
-                            LOG.info("Running migration version {}", migrationVersion);
+        Preconditions.checkNotNull( plugins, "plugins must not be null" );
+        Preconditions.checkNotNull( migrationInfoSerialization, "migrationInfoSerialization must not be null" );
 
-                            observer.update(migrationVersion, "Starting migration");
+        this.migrationInfoSerialization = migrationInfoSerialization;
 
-                            //perform this migration, if it fails, short circuit
-                            try {
-                                dataMigration.migrate(entitiesObservable, observer).toBlocking().lastOrDefault(null);
-                            } catch (Throwable throwable) {
-                                observer.failed(migrationVersion, "Exception thrown during migration", throwable);
-                                LOG.error("Unable to migrate to version {}.", migrationVersion, throwable);
-                                throw new RuntimeException(throwable);
-                            }
 
-                            //we had an unhandled exception or the migration failed, short circuit
-                            if (observer.failed) {
-                                return;
-                            }
+        for ( MigrationPlugin plugin : plugins ) {
+            final String name = plugin.getName();
 
-                            //set the version
-                            migrationInfoSerialization.setVersion(migrationVersion);
 
-                            //update the observer for progress so other nodes can see it
-                            observer.update(migrationVersion, "Completed successfully");
-                        }
-                    });
+            final MigrationPlugin existing = migrationPlugins.get( name );
 
+            if ( existing != null ) {
+                throw new IllegalArgumentException( "Duplicate plugin name detected.  A plugin with name " + name
+                        + " is already implemented by class '" + existing.getClass().getName() + "'.  Class '" + plugin
+                        .getClass().getName() + "' is also trying to implement this name." );
+            }
 
-        try {
-            Observable
-                .merge(applications, entities)
-                .subscribeOn(Schedulers.io())
-                .toBlocking().lastOrDefault(null);
-            migrationInfoSerialization.setStatusCode( StatusCode.COMPLETE.status );
-        } catch (Exception e){
-            LOG.error("Migration Failed",e);
+            migrationPlugins.put( name, plugin );
         }
-
     }
 
 
+    @Override
+    public void migrate() throws MigrationException {
 
-    private boolean populateTreeMap() {
-        if ( migrationTreeMap.isEmpty() ) {
-            for ( DataMigration migration : dataMigrations ) {
-
-                Preconditions.checkNotNull(migration,
-                    "A migration instance in the set of migrations was null.  This is not allowed");
-
-                final int version = migration.getVersion();
-
-                final DataMigration existing = migrationTreeMap.get( version );
-
-                if ( existing != null ) {
+        /**
+         * Invoke each plugin to attempt a migration
+         */
+        for(final MigrationPlugin plugin: migrationPlugins.values()){
+            final ProgressObserver observer = new CassandraProgressObserver(plugin.getName());
 
-                    final Class<? extends DataMigration> existingClass = existing.getClass();
+            plugin.run( observer );
+        }
 
-                    final Class<? extends DataMigration> currentClass = migration.getClass();
 
+    }
 
-                    throw new DataMigrationException( String.format(
-                        "Data migrations must be unique.  Both classes %s and %s have version %d",
-                        existingClass, currentClass, version ) );
-                }
 
-                migrationTreeMap.put( version, migration );
-            }
+    @Override
+    public boolean isRunning() {
 
+        for(final String pluginName :getPluginNames()){
+           if( migrationInfoSerialization.getStatusCode(pluginName) == StatusCode.RUNNING.status){
+               return true;
+           }
         }
-        if(migrationTreeMap.isEmpty()) {
-            LOG.warn("No migrations found to run, exiting");
-            return false;
-        }
 
-        return true;
+
+        return false;
     }
 
 
     @Override
-    public boolean isRunning() {
-        return migrationInfoSerialization.getStatusCode() == StatusCode.RUNNING.status;
+    public void invalidate() {
+        versionCache.invalidateAll();
     }
 
 
     @Override
-    public void invalidate() {
-        migrationInfoSerialization.invalidate();
+    public int getCurrentVersion( final String pluginName ) {
+        Preconditions.checkNotNull( pluginName, "pluginName cannot be null" );
+        return migrationInfoSerialization.getVersion( pluginName );
     }
 
 
     @Override
-    public int getCurrentVersion() {
-        return migrationInfoSerialization.getCurrentVersion();
-    }
+    public void resetToVersion( final String pluginName, final int version ) {
+        Preconditions.checkNotNull( pluginName, "pluginName cannot be null" );
 
+        final MigrationPlugin plugin = migrationPlugins.get( pluginName );
 
+        Preconditions.checkArgument( plugin != null, "Plugin " + pluginName + " could not be found" );
 
-    @Override
-    public void resetToVersion( final int version ) {
-        final int highestAllowed = migrationTreeMap.lastKey();
+        final int highestAllowed = plugin.getMaxVersion();
 
         Preconditions.checkArgument( version <= highestAllowed,
-                "You cannot set a version higher than the max of " + highestAllowed);
+                "You cannot set a version higher than the max of " + highestAllowed );
         Preconditions.checkArgument( version >= 0, "You must specify a version of 0 or greater" );
 
-        migrationInfoSerialization.setVersion( version );
+        migrationInfoSerialization.setVersion( pluginName, version );
     }
 
-    private int resetToHighestVersion( ) {
-        final int highestAllowed = migrationTreeMap.lastKey();
-        resetToVersion(highestAllowed);
-        return highestAllowed;
+
+    @Override
+    public String getLastStatus( final String pluginName ) {
+        Preconditions.checkNotNull( pluginName, "pluginName cannot be null" );
+        return migrationInfoSerialization.getStatusMessage( pluginName );
     }
 
+
     @Override
-    public String getLastStatus() {
-        return migrationInfoSerialization.getStatusMessage();
+    public Set<String> getPluginNames() {
+        return migrationPlugins.keySet();
     }
 
 
@@ -314,16 +180,20 @@ public class DataMigrationManagerImpl implements DataMigrationManager {
     }
 
 
-    private final class CassandraProgressObserver implements DataMigration.ProgressObserver {
+    private final class CassandraProgressObserver implements ProgressObserver {
+
+        private final String pluginName;
 
         private boolean failed = false;
 
 
+        private CassandraProgressObserver( final String pluginName ) {this.pluginName = pluginName;}
+
+
         @Override
         public void failed( final int migrationVersion, final String reason ) {
 
-            final String storedMessage = String.format(
-                    "Failed to migrate, reason is appended.  Error '%s'", reason );
+            final String storedMessage = String.format( "Failed to migrate, reason is appended.  Error '%s'", reason );
 
 
             update( migrationVersion, storedMessage );
@@ -332,7 +202,7 @@ public class DataMigrationManagerImpl implements DataMigrationManager {
 
             failed = true;
 
-            migrationInfoSerialization.setStatusCode( StatusCode.ERROR.status );
+            migrationInfoSerialization.setStatusCode( pluginName, StatusCode.ERROR.status );
         }
 
 
@@ -342,30 +212,28 @@ public class DataMigrationManagerImpl implements DataMigrationManager {
             throwable.printStackTrace( new PrintWriter( stackTrace ) );
 
 
-            final String storedMessage = String.format(
-                "Failed to migrate, reason is appended.  Error '%s' %s", reason, stackTrace.toString() );
+            final String storedMessage = String.format( "Failed to migrate, reason is appended.  Error '%s' %s", reason,
+                    stackTrace.toString() );
 
             update( migrationVersion, storedMessage );
 
 
-            LOG.error( "Unable to migrate version {} due to reason {}.",
-                    migrationVersion, reason, throwable );
+            LOG.error( "Unable to migrate version {} due to reason {}.", migrationVersion, reason, throwable );
 
             failed = true;
 
-            migrationInfoSerialization.setStatusCode( StatusCode.ERROR.status );
+            migrationInfoSerialization.setStatusCode( pluginName, StatusCode.ERROR.status );
         }
 
 
         @Override
         public void update( final int migrationVersion, final String message ) {
-            final String formattedOutput = String.format(
-                    "Migration version %d.  %s", migrationVersion, message );
+            final String formattedOutput = String.format( "Migration version %d.  %s", migrationVersion, message );
 
             //Print this to the info log
             LOG.info( formattedOutput );
 
-            migrationInfoSerialization.setStatusMessage( formattedOutput );
+            migrationInfoSerialization.setStatusMessage( pluginName, formattedOutput );
         }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d3f8ee61/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/MigrationInfoSerialization.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/MigrationInfoSerialization.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/MigrationInfoSerialization.java
index 3caa6aa..a972f8c 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/MigrationInfoSerialization.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/MigrationInfoSerialization.java
@@ -26,42 +26,48 @@ public interface MigrationInfoSerialization extends Migration {
 
     /**
      * Save the message to the cluster
+     * @param pluginName the name of hte plugin
      * @param message
      */
-    public void setStatusMessage( final String message );
+    public void setStatusMessage(final String pluginName,  final String message );
 
     /**
-     * Get the last status
+     * Get the last status for the plugin
      * @return
      */
-    public String getStatusMessage();
+    public String getStatusMessage(final String pluginName);
 
     /**
-     * Save the version
+     * Save the version for the plugin
      * @param version
      */
-    public void setVersion(final int version);
+    public void setVersion(final String pluginName, final int version);
 
     /**
      * Return the version
      * @return
      */
-    public int getVersion();
+    public int getVersion(final String pluginName);
 
     /**
      * Set the status and save them
+     * @param pluginName The name of the plugin
      * @param status
      * @return
      */
-    public void setStatusCode( final int status );
+    public void setStatusCode(final String pluginName, final int status );
 
     /**
-     *
+     *  Get the status code for the plugin
      * @return The integer that's saved
      */
-    public int getStatusCode();
+    public int getStatusCode(final String pluginName);
 
-    public int getCurrentVersion();
-
-    public void invalidate();
+    /**
+     * This is deprecated, and will be used to migrate from the old version information to the new format.
+     * Should return -1 if not set
+     * @return
+     */
+    @Deprecated
+    public int getSystemVersion();
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d3f8ee61/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/MigrationInfoSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/MigrationInfoSerializationImpl.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/MigrationInfoSerializationImpl.java
index aeee0bb..f976568 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/MigrationInfoSerializationImpl.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/MigrationInfoSerializationImpl.java
@@ -50,18 +50,6 @@ import com.netflix.astyanax.serializers.StringSerializer;
 public class MigrationInfoSerializationImpl implements MigrationInfoSerialization {
 
     /**
-     * Cache to cache versions temporarily
-     */
-    private final LoadingCache<String, Integer> versionCache = CacheBuilder.newBuilder()
-        //cache the local value for 1 minute
-        .expireAfterWrite(1, TimeUnit.MINUTES).build( new CacheLoader<String, Integer>() {
-            @Override
-            public Integer load( final String key ) throws Exception {
-                return getVersion();
-            }
-        } );
-
-    /**
      * Just a hard coded scope since we need it
      */
     private static final Id STATIC_ID =
@@ -78,7 +66,10 @@ public class MigrationInfoSerializationImpl implements MigrationInfoSerializatio
             new MultiTennantColumnFamily<>( "Data_Migration_Info", ROW_KEY_SER, STRING_SERIALIZER );
 
 
-    private static final ScopedRowKey<String> ROW_KEY = ScopedRowKey.fromKey( STATIC_ID, "" );
+    /**
+     * The row key we previously used to store versions.  This is required to migrate versions in each module to the correct version
+     */
+    private static final ScopedRowKey<String> LEGACY_ROW_KEY = ScopedRowKey.fromKey( STATIC_ID, "" );
 
     private static final String COL_STATUS_MESSAGE = "statusMessage";
 
@@ -96,10 +87,12 @@ public class MigrationInfoSerializationImpl implements MigrationInfoSerializatio
 
 
     @Override
-    public void setStatusMessage( final String message ) {
+    public void setStatusMessage(final String pluginName,  final String message ) {
+
+        final ScopedRowKey<String> rowKey = ScopedRowKey.fromKey( STATIC_ID, pluginName);
 
         try {
-            keyspace.prepareColumnMutation( CF_MIGRATION_INFO, ROW_KEY, COL_STATUS_MESSAGE ).putValue( message, null )
+            keyspace.prepareColumnMutation( CF_MIGRATION_INFO, rowKey, COL_STATUS_MESSAGE ).putValue( message, null )
                     .execute();
         }
         catch ( ConnectionException e ) {
@@ -109,9 +102,12 @@ public class MigrationInfoSerializationImpl implements MigrationInfoSerializatio
 
 
     @Override
-    public String getStatusMessage() {
+    public String getStatusMessage(final String pluginName) {
+
+        final ScopedRowKey<String> rowKey = ScopedRowKey.fromKey( STATIC_ID, pluginName);
+
         try {
-            return keyspace.prepareQuery( CF_MIGRATION_INFO ).getKey( ROW_KEY ).getColumn( COL_STATUS_MESSAGE )
+            return keyspace.prepareQuery( CF_MIGRATION_INFO ).getKey( rowKey ).getColumn( COL_STATUS_MESSAGE )
                            .execute().getResult().getStringValue();
         }
         //swallow, it doesn't exist
@@ -125,28 +121,32 @@ public class MigrationInfoSerializationImpl implements MigrationInfoSerializatio
 
 
     @Override
-    public void setVersion( final int version ) {
+    public void setVersion(final String pluginName, final int version ) {
+
+        final ScopedRowKey<String> rowKey = ScopedRowKey.fromKey( STATIC_ID, pluginName);
+
         try {
-            keyspace.prepareColumnMutation( CF_MIGRATION_INFO, ROW_KEY, COLUMN_VERSION ).putValue( version, null )
+            keyspace.prepareColumnMutation( CF_MIGRATION_INFO, rowKey, COLUMN_VERSION ).putValue( version, null )
                     .execute();
         }
         catch ( ConnectionException e ) {
             throw new DataMigrationException( "Unable to save status", e );
         }
-
-        versionCache.invalidateAll();
     }
 
 
     @Override
-    public int getVersion() {
+    public int getVersion(final String pluginName) {
+
+        final ScopedRowKey<String> rowKey = ScopedRowKey.fromKey( STATIC_ID, pluginName);
+
         try {
-            return keyspace.prepareQuery( CF_MIGRATION_INFO ).getKey( ROW_KEY ).getColumn( COLUMN_VERSION ).execute()
+            return keyspace.prepareQuery( CF_MIGRATION_INFO ).getKey( rowKey ).getColumn( COLUMN_VERSION ).execute()
                            .getResult().getIntegerValue();
         }
         //swallow, it doesn't exist
         catch ( NotFoundException nfe ) {
-            return 0;
+            return -1;
         }
         catch ( ConnectionException e ) {
             throw new DataMigrationException( "Unable to retrieve status", e );
@@ -155,9 +155,12 @@ public class MigrationInfoSerializationImpl implements MigrationInfoSerializatio
 
 
     @Override
-    public void setStatusCode( final int status ) {
+    public void setStatusCode(final String pluginName, final int status ) {
+
+        final ScopedRowKey<String> rowKey = ScopedRowKey.fromKey( STATIC_ID, pluginName);
+
         try {
-            keyspace.prepareColumnMutation( CF_MIGRATION_INFO, ROW_KEY, COLUMN_STATUS_CODE ).putValue( status, null )
+            keyspace.prepareColumnMutation( CF_MIGRATION_INFO, rowKey, COLUMN_STATUS_CODE ).putValue( status, null )
                     .execute();
         }
         catch ( ConnectionException e ) {
@@ -167,32 +170,37 @@ public class MigrationInfoSerializationImpl implements MigrationInfoSerializatio
 
 
     @Override
-    public int getStatusCode() {
+    public int getStatusCode(final String pluginName) {
+
+        final ScopedRowKey<String> rowKey = ScopedRowKey.fromKey( STATIC_ID, pluginName);
+
         try {
-            return keyspace.prepareQuery( CF_MIGRATION_INFO ).getKey( ROW_KEY ).getColumn( COLUMN_STATUS_CODE )
+            return keyspace.prepareQuery( CF_MIGRATION_INFO ).getKey( rowKey ).getColumn( COLUMN_STATUS_CODE )
                            .execute().getResult().getIntegerValue();
         }
         //swallow, it doesn't exist
         catch ( NotFoundException nfe ) {
-            return 0;
+            return -1;
         }
         catch ( ConnectionException e ) {
             throw new DataMigrationException( "Unable to retrieve status", e );
         }
     }
 
+
     @Override
-    public int getCurrentVersion() {
+    public int getSystemVersion() {
         try {
-            return versionCache.get("currentversion");
-        }catch (Exception ee){
-            throw new RuntimeException(ee);
+            return keyspace.prepareQuery( CF_MIGRATION_INFO ).getKey( LEGACY_ROW_KEY ).getColumn( COLUMN_VERSION )
+                           .execute().getResult().getIntegerValue();
+        }
+        //swallow, it doesn't exist
+        catch ( NotFoundException nfe ) {
+            return -1;
+        }
+        catch ( ConnectionException e ) {
+            throw new DataMigrationException( "Unable to retrieve status", e );
         }
-    }
-
-    @Override
-    public void invalidate() {
-        versionCache.invalidateAll();
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d3f8ee61/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/newimpls/DataMigration2.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/newimpls/DataMigration2.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/newimpls/DataMigration2.java
index f660c02..5830335 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/newimpls/DataMigration2.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/newimpls/DataMigration2.java
@@ -41,31 +41,7 @@ public interface DataMigration2<T> {
      * Get the version of this migration. It should be unique within the scope of the plugin
      * @return
      */
-    public int version();
+    public int getVersion();
 
 
-    public interface ProgressObserver{
-            /**
-             * Mark the migration as failed
-             * @param migrationVersion The migration version running during the failure
-             * @param reason The reason to save
-             */
-            public void failed(final int migrationVersion, final String reason);
-
-            /**
-             * Mark the migration as failed with a stack trace
-             * @param migrationVersion The migration version running during the failure
-             * @param reason The error description to save
-             * @param throwable The error that happened
-             */
-            public void failed(final int migrationVersion, final String reason, final Throwable throwable);
-
-
-            /**
-             * Update the status of the migration with the message
-             *
-             * @param message The message to save for the status
-             */
-            public void update(final int migrationVersion, final String message);
-        }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d3f8ee61/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/newimpls/MigrationPlugin.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/newimpls/MigrationPlugin.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/newimpls/MigrationPlugin.java
index f04a035..880cfd1 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/newimpls/MigrationPlugin.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/newimpls/MigrationPlugin.java
@@ -24,8 +24,6 @@
 package org.apache.usergrid.persistence.core.migration.data.newimpls;
 
 
-import org.apache.usergrid.persistence.core.migration.data.DataMigration;
-
 
 /**
  * A simple interface to return migration plugins.  All versions within this migration plugin should have a name
@@ -34,7 +32,7 @@ public interface MigrationPlugin {
 
 
     /**
-     * Get the name of the plugin
+     * Get the name of the plugin.  Must be unique
      * @return
      */
     public String getName();
@@ -42,6 +40,12 @@ public interface MigrationPlugin {
     /**
      * Run any migrations that may need to be run in this plugin
      */
-    public void run(DataMigration.ProgressObserver observer);
+    public void run(ProgressObserver observer);
+
+    /**
+     * Get the maximum migration version this plugin implements
+     * @return
+     */
+    public int getMaxVersion();
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d3f8ee61/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/newimpls/MigrationRelationship.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/newimpls/MigrationRelationship.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/newimpls/MigrationRelationship.java
new file mode 100644
index 0000000..a9ad187
--- /dev/null
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/newimpls/MigrationRelationship.java
@@ -0,0 +1,41 @@
+/*
+ *
+ *  *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied.  See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *  *
+ *
+ */
+
+package org.apache.usergrid.persistence.core.migration.data.newimpls;
+
+
+/**
+ * Simple relationship that defines the current state of the source and destination data versions
+ */
+public class MigrationRelationship<T extends VersionedData> {
+
+    //public so it's FAST.  It's also immutable
+    public final T from;
+    public final T to;
+
+
+    public MigrationRelationship( T from, T to ) {
+        this.from = from;
+        this.to = to;
+    }
+}