You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by gr...@apache.org on 2015/03/19 23:20:08 UTC

[18/50] [abbrv] incubator-usergrid git commit: Tests for migrations in both modules complete

Tests for migrations in both modules complete


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/48127152
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/48127152
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/48127152

Branch: refs/heads/USERGRID-493
Commit: 481271523b0056f7cf9bf21194e166f490fabb09
Parents: 802dcde
Author: Todd Nine <tn...@apigee.com>
Authored: Wed Mar 4 11:43:13 2015 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Wed Mar 4 11:43:13 2015 -0700

----------------------------------------------------------------------
 .../collection/impl/EntityDeletedTask.java      |   8 +-
 .../migration/MvccEntityDataMigrationImpl.java  | 405 +++++++------------
 ...ctMvccEntityDataMigrationV1ToV3ImplTest.java | 217 ++++++++++
 .../MvccEntityDataMigrationV1ToV3ImplTest.java  | 106 ++---
 .../MvccEntityDataMigrationV2ToV3ImplTest.java  | 100 +++++
 .../core/guice/MigrationManagerRule.java        |   7 +
 6 files changed, 498 insertions(+), 345 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/48127152/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityDeletedTask.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityDeletedTask.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityDeletedTask.java
index 0b4b739..dbe58d4 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityDeletedTask.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityDeletedTask.java
@@ -107,12 +107,10 @@ public class EntityDeletedTask implements Task<Void> {
         final MutationBatch entityDelete = entitySerializationStrategy.delete(collectionScope, entityId, version);
         final MutationBatch logDelete = logEntrySerializationStrategy.delete(collectionScope, entityId, version);
 
-        throw new NotImplementedException( "Implement unique cleanup here" );
+        entityDelete.execute();
+        logDelete.execute();
 //
-//        entityDelete.execute();
-//        logDelete.execute();
-//
-//        return null;
+        return null;
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/48127152/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java
index 7580a26..05e5db5 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java
@@ -20,24 +20,27 @@
 package org.apache.usergrid.persistence.collection.serialization.impl.migration;
 
 
+import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.usergrid.persistence.collection.CollectionScope;
 import org.apache.usergrid.persistence.collection.EntityVersionCleanupFactory;
 import org.apache.usergrid.persistence.collection.MvccEntity;
+import org.apache.usergrid.persistence.collection.impl.EntityDeletedTask;
 import org.apache.usergrid.persistence.collection.impl.EntityVersionCleanupTask;
 import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
 import org.apache.usergrid.persistence.collection.serialization.UniqueValue;
 import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
-import org.apache.usergrid.persistence.collection.serialization.impl.MvccEntitySerializationStrategyImpl;
 import org.apache.usergrid.persistence.collection.serialization.impl.MvccEntitySerializationStrategyV3Impl;
 import org.apache.usergrid.persistence.collection.serialization.impl.UniqueValueImpl;
 import org.apache.usergrid.persistence.collection.util.EntityUtils;
 import org.apache.usergrid.persistence.core.migration.data.DataMigrationException;
-import org.apache.usergrid.persistence.core.migration.data.MigrationInfoSerialization;
 import org.apache.usergrid.persistence.core.migration.data.newimpls.DataMigration2;
 import org.apache.usergrid.persistence.core.migration.data.newimpls.MigrationDataProvider;
 import org.apache.usergrid.persistence.core.migration.data.newimpls.MigrationRelationship;
@@ -56,11 +59,8 @@ import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 
 import rx.Observable;
 import rx.Subscriber;
-import rx.functions.Action0;
 import rx.functions.Action1;
 import rx.functions.Func1;
-import rx.functions.Func2;
-import rx.observables.GroupedObservable;
 import rx.schedulers.Schedulers;
 
 
@@ -70,6 +70,9 @@ import rx.schedulers.Schedulers;
 @Singleton
 public class MvccEntityDataMigrationImpl implements DataMigration2<EntityIdScope> {
 
+
+    private static final Logger LOGGER = LoggerFactory.getLogger( MvccEntityDataMigrationImpl.class );
+
     private final Keyspace keyspace;
     private final VersionedMigrationSet<MvccEntitySerializationStrategy> allVersions;
     private final UniqueValueSerializationStrategy uniqueValueSerializationStrategy;
@@ -77,12 +80,14 @@ public class MvccEntityDataMigrationImpl implements DataMigration2<EntityIdScope
     private final MvccEntitySerializationStrategyV3Impl mvccEntitySerializationStrategyV3;
 
 
+
     @Inject
     public MvccEntityDataMigrationImpl( final Keyspace keyspace,
                                         final VersionedMigrationSet<MvccEntitySerializationStrategy> allVersions,
                                         final UniqueValueSerializationStrategy uniqueValueSerializationStrategy,
                                         final EntityVersionCleanupFactory entityVersionCleanupFactory,
-                                        final MvccEntitySerializationStrategyV3Impl mvccEntitySerializationStrategyV3 ) {
+                                        final MvccEntitySerializationStrategyV3Impl mvccEntitySerializationStrategyV3
+                                      ) {
 
         this.keyspace = keyspace;
         this.allVersions = allVersions;
@@ -107,7 +112,7 @@ public class MvccEntityDataMigrationImpl implements DataMigration2<EntityIdScope
 
     @Override
     public int migrate( final int currentVersion, final MigrationDataProvider<EntityIdScope> migrationDataProvider,
-                         final ProgressObserver observer ) {
+                        final ProgressObserver observer ) {
 
         final AtomicLong atomicLong = new AtomicLong();
 
@@ -116,264 +121,147 @@ public class MvccEntityDataMigrationImpl implements DataMigration2<EntityIdScope
         final UUID startTime = UUIDGenerator.newTimeUUID();
 
         final MigrationRelationship<MvccEntitySerializationStrategy> migration =
-                allVersions.getMigrationRelationship( currentVersion );
-
-
-        final long migrated = migrationDataProvider.getData().subscribeOn( Schedulers.io() )
-                                                   .parallel( new Func1<Observable<EntityIdScope>, Observable<Long>>() {
-
-
-                  //process the ids in parallel
-                  @Override
-                  public Observable<Long> call(
-                          final Observable<EntityIdScope>
-                                  entityIdScopeObservable ) {
-
-
-                      return entityIdScopeObservable.flatMap(
-                              new Func1<EntityIdScope,
-                                      Observable<EntityToSaveMessage>>() {
-
-
-                                  @Override
-                                  public
-                                  Observable<EntityToSaveMessage> call(
-                                          final EntityIdScope
-                                                  entityIdScope ) {
-
-                                      //load the entity
-                                      final CollectionScope
-                                              currentScope =
-                                              entityIdScope
-                                                      .getCollectionScope();
-
-
-                                      //for each element in our
-                                      // history, we need to copy it
-                                      // to v2.
-                                      // Note that
-                                      // this migration
-                                      //won't support anything beyond V2
-
-                                      final Iterator<MvccEntity>
-                                              allVersions =
-                                              migration.from
-                                                      .loadAscendingHistory(
-                                                              currentScope,
-                                                              entityIdScope
-                                                                      .getId(),
-                                                              startTime,
-                                                              100 );
-
-                                      //emit all the entity versions
-                                      return Observable.create(
-                                              new Observable
-                                                      .OnSubscribe<EntityToSaveMessage>() {
-                                                  @Override
-                                                  public void call(
-                                                          final
-                                                          Subscriber<? super
-                                                                  EntityToSaveMessage> subscriber ) {
-
-                                                      while ( allVersions
-                                                              .hasNext() ) {
-                                                          final
-                                                          EntityToSaveMessage
-                                                                  message =
-                                                                  new EntityToSaveMessage(
-                                                                          currentScope,
-                                                                          allVersions
-                                                                                  .next() );
-                                                          subscriber.onNext( message );
-                                                      }
-
-                                                      subscriber.onCompleted();
-                                                  }
-                                              } );
-                                  }
-                              } )
-
-
-                              //group them by entity id so we can get
-                              // the max for cleanup
-                              .groupBy(
-                                      new Func1<EntityToSaveMessage,
-                                              Id>() {
-                                          @Override
-                                          public Id call(
-                                                  final
-                                                  EntityToSaveMessage
-                                                          entityToSaveMessage ) {
-                                              return entityToSaveMessage.entity
-                                                      .getId();
-                                          }
-                                      } )
-                              //buffer up 10 of groups so we can put them all in a single mutation
-                              .buffer( 10 ).doOnNext(
-                                      new Action1<List<GroupedObservable<Id, EntityToSaveMessage>>>() {
-
-
-                                          @Override
-                                          public void call(
-                                                  final
-                                                  List<GroupedObservable<Id, EntityToSaveMessage>> groupedObservables ) {
-
-                                              atomicLong.addAndGet(
-                                                      groupedObservables
-                                                              .size() );
-
-                                              final MutationBatch
-                                                      totalBatch =
-                                                      keyspace.prepareMutationBatch();
-
-
-                                              //run each of the
-                                              // groups and add
-                                              // it ot the batch
-                                              Observable
-                                                      .from( groupedObservables )
-                                                      //emit the group as an observable
-                                                      .flatMap(
-                                                              new Func1<GroupedObservable<Id, EntityToSaveMessage>, Observable<EntityToSaveMessage>>() {
-
-
-                                                                  @Override
-                                                                  public Observable<EntityToSaveMessage> call(
-                                                                          final GroupedObservable<Id, EntityToSaveMessage> idEntityToSaveMessageGroupedObservable ) {
-                                                                      return idEntityToSaveMessageGroupedObservable
-                                                                              .asObservable();
-                                                                  }
-                                                              } )
-
-                                                      //merge and add the batch
-                                                      .doOnNext(
-                                                              new Action1<EntityToSaveMessage>() {
-                                                                  @Override
-                                                                  public void call(
-                                                                          final EntityToSaveMessage message ) {
-
-                                                                      final MutationBatch
-                                                                              entityRewrite =
-                                                                              migration.to.write( message.scope,
-                                                                                              message.entity );
-
-                                                                      //add to
-                                                                      // the
-                                                                      // total
-                                                                      // batch
-                                                                      totalBatch.mergeShallow( entityRewrite );
-
-                                                                      //write
-                                                                      // the
-                                                                      // unique values
-
-                                                                      if ( !message.entity
-                                                                              .getEntity()
-                                                                              .isPresent() ) {
-                                                                          return;
-                                                                      }
-
-                                                                          final Entity
-                                                                                  entity =
-                                                                                  message.entity
-                                                                                          .getEntity()
-                                                                                          .get();
-
-                                                                          final Id
-                                                                                  entityId =
-                                                                                  entity.getId();
-
-                                                                          final UUID
-                                                                                  version =
-                                                                                  message.entity
-                                                                                          .getVersion();
-
-                                                                          // re-write the unique values
-                                                                          // but this
-                                                                          // time with
-                                                                          // no TTL so that cleanup can clean up older values
-                                                                          for ( Field field : EntityUtils
-                                                                                  .getUniqueFields(
-                                                                                          message.entity
-                                                                                                  .getEntity()
-                                                                                                  .get() ) ) {
-
-                                                                              UniqueValue
-                                                                                      written =
-                                                                                      new UniqueValueImpl(
-                                                                                              field,
-                                                                                              entityId,
-                                                                                              version );
-
-                                                                              MutationBatch
-                                                                                      mb =
-                                                                                      uniqueValueSerializationStrategy
-                                                                                              .write( message.scope,
-                                                                                                      written );
-
-
-                                                                              // merge into our
-                                                                              // existing mutation
-                                                                              // batch
-                                                                              totalBatch
-                                                                                      .mergeShallow(
-                                                                                              mb );
-                                                                          }
-                                                                  }
-                                                              } )
-                                                      //once we've streamed everything, flush it
-                                                      .doOnCompleted(
-
-                                                              new Action0() {
-                                                                  @Override
-                                                                  public void call() {
-
-                                                                      executeBatch(migration.to.getImplementationVersion(), totalBatch, observer, atomicLong );
-                                                                  }
-                                                              } )
-                                                      .toBlocking()
-                                                      .last();
-                                          }
-                                      } ).
-                                      reduce( 0l,
-                                              new Func2<Long, List<GroupedObservable<Id, EntityToSaveMessage>>, Long>() {
-
-                                                  @Override
-                                                  public Long call(
-                                                          final Long aLong,
-                                                          final List<GroupedObservable<Id, EntityToSaveMessage>> groupedObservables ) {
-
-                                                      long newCount =
-                                                              aLong;
-
-                                                      for ( GroupedObservable<Id, EntityToSaveMessage> group : groupedObservables ) {
-                                                          newCount +=
-                                                                  group.longCount()
-                                                                       .toBlocking()
-                                                                       .last();
-                                                      }
-
-                                                      return newCount;
-                                                  }
-                                              }
-
-
-                                            );
-                  }}).toBlocking().last();
-
-        //now we update the progress observer
-
-        observer.update( migration.to.getImplementationVersion(), "Finished for this step.  Migrated " + migrated + "entities total. " );
+            allVersions.getMigrationRelationship( currentVersion );
+
+
+        final Observable<List<EntityToSaveMessage>> migrated =
+            migrationDataProvider.getData().subscribeOn( Schedulers.io() ).parallel(
+                new Func1<Observable<EntityIdScope>, Observable<List<EntityToSaveMessage>>>() {
+
+
+                    //process the ids in parallel
+                    @Override
+                    public Observable<List<EntityToSaveMessage>> call(
+                        final Observable<EntityIdScope> entityIdScopeObservable ) {
+
+
+                        return entityIdScopeObservable.flatMap(
+                            new Func1<EntityIdScope, Observable<EntityToSaveMessage>>() {
+
+
+                                @Override
+                                public Observable<EntityToSaveMessage> call( final EntityIdScope entityIdScope ) {
+
+                                    //load the entity
+                                    final CollectionScope currentScope = entityIdScope.getCollectionScope();
+
+
+                                    //for each element in our
+                                    // history, we need to copy it
+                                    // to v2.
+                                    // Note that
+                                    // this migration
+                                    //won't support anything beyond V2
+
+                                    final Iterator<MvccEntity> allVersions = migration.from
+                                        .loadAscendingHistory( currentScope, entityIdScope.getId(), startTime, 100 );
+
+                                    //emit all the entity versions
+                                    return Observable.create( new Observable.OnSubscribe<EntityToSaveMessage>() {
+                                            @Override
+                                            public void call( final Subscriber<? super
+                                                EntityToSaveMessage> subscriber ) {
+
+                                                while ( allVersions.hasNext() ) {
+                                                    final EntityToSaveMessage message =  new EntityToSaveMessage( currentScope, allVersions.next() );
+                                                    subscriber.onNext( message );
+                                                }
+
+                                                subscriber.onCompleted();
+                                            }
+                                        } );
+                                }
+                            } )
+                            //buffer 10 versions
+                            .buffer( 100 ).doOnNext( new Action1<List<EntityToSaveMessage>>() {
+                                @Override
+                                public void call( final List<EntityToSaveMessage> entities ) {
+
+                                    final MutationBatch totalBatch = keyspace.prepareMutationBatch();
+
+                                    atomicLong.addAndGet( entities.size() );
+
+                                    List<EntityVersionCleanupTask> entityVersionCleanupTasks = new ArrayList(entities.size());
+
+                                    for ( EntityToSaveMessage message : entities ) {
+                                        final MutationBatch entityRewrite =
+                                            migration.to.write( message.scope, message.entity );
+
+                                        //add to
+                                        // the
+                                        // total
+                                        // batch
+                                        totalBatch.mergeShallow( entityRewrite );
+
+                                        //write
+                                        // the
+                                        // unique values
+
+                                        if ( !message.entity.getEntity().isPresent() ) {
+                                            return;
+                                        }
+
+                                        final Entity entity = message.entity.getEntity().get();
+
+                                        final Id entityId = entity.getId();
+
+                                        final UUID version = message.entity.getVersion();
+
+                                        // re-write the unique
+                                        // values
+                                        // but this
+                                        // time with
+                                        // no TTL so that cleanup can clean up
+                                        // older values
+                                        for ( Field field : EntityUtils
+                                            .getUniqueFields( message.entity.getEntity().get() ) ) {
+
+                                            UniqueValue written = new UniqueValueImpl( field, entityId, version );
+
+                                            MutationBatch mb =
+                                                uniqueValueSerializationStrategy.write( message.scope, written );
+
+
+                                            // merge into our
+                                            // existing mutation
+                                            // batch
+                                            totalBatch.mergeShallow( mb );
+                                        }
+
+                                        final EntityVersionCleanupTask task = entityVersionCleanupFactory.getTask( message.scope, message.entity.getId(), version );
+
+                                        entityVersionCleanupTasks.add( task );
+                                    }
+
+                                    executeBatch( migration.to.getImplementationVersion(), totalBatch, observer, atomicLong );
+
+                                    //now run our cleanup task
+
+                                    for(EntityVersionCleanupTask entityVersionCleanupTask: entityVersionCleanupTasks){
+                                        try {
+                                            entityVersionCleanupTask.call();
+                                        }
+                                        catch ( Exception e ) {
+                                            LOGGER.error( "Unable to run cleanup task", e );
+                                        }
+                                    }
+                                }
+                            } );
+                    }
+                } );
+
+        migrated.toBlocking().lastOrDefault(null);
 
         return migration.to.getImplementationVersion();
     }
 
 
-    protected void executeBatch( final int targetVersion, final MutationBatch batch, final ProgressObserver po, final AtomicLong count ) {
+    protected void executeBatch( final int targetVersion, final MutationBatch batch, final ProgressObserver po,
+                                 final AtomicLong count ) {
         try {
             batch.execute();
 
-            po.update(targetVersion, "Finished copying " + count + " entities to the new format" );
+            po.update( targetVersion, "Finished copying " + count + " entities to the new format" );
         }
         catch ( ConnectionException e ) {
             po.failed( targetVersion, "Failed to execute mutation in cassandra" );
@@ -382,8 +270,6 @@ public class MvccEntityDataMigrationImpl implements DataMigration2<EntityIdScope
     }
 
 
-
-
     private static final class EntityToSaveMessage {
         private final CollectionScope scope;
         private final MvccEntity entity;
@@ -394,4 +280,5 @@ public class MvccEntityDataMigrationImpl implements DataMigration2<EntityIdScope
             this.entity = entity;
         }
     }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/48127152/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/AbstractMvccEntityDataMigrationV1ToV3ImplTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/AbstractMvccEntityDataMigrationV1ToV3ImplTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/AbstractMvccEntityDataMigrationV1ToV3ImplTest.java
new file mode 100644
index 0000000..9894f07
--- /dev/null
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/AbstractMvccEntityDataMigrationV1ToV3ImplTest.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.collection.serialization.impl.migration;
+
+
+import java.util.UUID;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.collection.MvccEntity;
+import org.apache.usergrid.persistence.collection.guice.TestCollectionModule;
+import org.apache.usergrid.persistence.collection.impl.CollectionScopeImpl;
+import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccEntityImpl;
+import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
+import org.apache.usergrid.persistence.collection.serialization.impl.CollectionDataVersions;
+import org.apache.usergrid.persistence.collection.serialization.impl.MvccEntitySerializationStrategyV1Impl;
+import org.apache.usergrid.persistence.collection.serialization.impl.MvccEntitySerializationStrategyV3Impl;
+import org.apache.usergrid.persistence.core.guice.DataMigrationResetRule;
+import org.apache.usergrid.persistence.core.guice.MigrationManagerRule;
+import org.apache.usergrid.persistence.core.migration.data.DataMigrationManager;
+import org.apache.usergrid.persistence.core.migration.data.TestProgressObserver;
+import org.apache.usergrid.persistence.core.migration.data.newimpls.MigrationDataProvider;
+import org.apache.usergrid.persistence.core.migration.data.newimpls.MigrationRelationship;
+import org.apache.usergrid.persistence.core.migration.data.newimpls.VersionedMigrationSet;
+import org.apache.usergrid.persistence.core.test.ITRunner;
+import org.apache.usergrid.persistence.core.test.UseModules;
+import org.apache.usergrid.persistence.model.entity.Entity;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.entity.SimpleId;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+
+import com.google.inject.Inject;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+import net.jcip.annotations.NotThreadSafe;
+
+import rx.Observable;
+
+import static org.apache.usergrid.persistence.core.util.IdGenerator.createId;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+
+@NotThreadSafe
+@RunWith( ITRunner.class )
+@UseModules( { TestCollectionModule.class } )
+public abstract class AbstractMvccEntityDataMigrationV1ToV3ImplTest implements DataMigrationResetRule.DataMigrationManagerProvider {
+
+
+    @Inject
+    @Rule
+    public MigrationManagerRule migrationManagerRule;
+
+
+    @Inject
+    public DataMigrationManager dataMigrationManager;
+
+
+    @Inject
+    public MvccEntityDataMigrationImpl mvccEntityDataMigrationImpl;
+
+
+    @Inject
+    public VersionedMigrationSet<MvccEntitySerializationStrategy> versions;
+
+    /**
+     * Rule to do the resets we need
+     */
+    @Rule
+    public DataMigrationResetRule migrationTestRule =
+        new DataMigrationResetRule( this, CollectionMigrationPlugin.PLUGIN_NAME,
+            CollectionDataVersions.INITIAL.getVersion() );
+
+
+    @Test
+    public void testMigration() throws ConnectionException {
+
+        final Id applicationId = createId( "application" );
+        final String collectionName = "things";
+
+        CollectionScope scope = new CollectionScopeImpl( applicationId, applicationId, collectionName );
+
+        final MvccEntity entity1 = getEntity( "thing" );
+        final MvccEntity entity2 = getEntity( "thing" );
+
+
+        MvccEntitySerializationStrategy v1Impl = getExpectedSourceImpl();
+
+        MvccEntitySerializationStrategy v3Impl = getExpectedTargetImpl();
+
+
+        v1Impl.write( scope, entity1 ).execute();
+        v1Impl.write( scope, entity2 ).execute();
+
+
+        MvccEntity returned1 = v1Impl.load( scope, entity1.getId() ).get();
+        MvccEntity returned2 = v1Impl.load( scope, entity2.getId() ).get();
+
+        assertEquals( "Same entity", entity1, returned1 );
+        assertEquals( "Same entity", entity2, returned2 );
+
+        final Observable<EntityIdScope> entityIdScope =
+            Observable.from( new EntityIdScope( scope, entity1.getId() ), new EntityIdScope( scope, entity2.getId() ) );
+
+
+        final MigrationDataProvider<EntityIdScope> migrationProvider = new MigrationDataProvider<EntityIdScope>() {
+            @Override
+            public Observable<EntityIdScope> getData() {
+                return entityIdScope;
+            }
+        };
+
+        final TestProgressObserver progressObserver = new TestProgressObserver();
+
+        final CollectionDataVersions startVersion = getSourceVersion();
+
+        final MigrationRelationship<MvccEntitySerializationStrategy> tuple =
+                  versions.getMigrationRelationship( startVersion.getVersion() );
+
+
+        assertEquals( "Same instance for from", v1Impl.getClass(), tuple.from.getClass() );
+        assertEquals( "Same instance for to", v3Impl.getClass(), tuple.to.getClass() );
+
+        //now migration
+        final int newVersion = mvccEntityDataMigrationImpl
+            .migrate( startVersion.getVersion(), migrationProvider, progressObserver );
+
+
+        final CollectionDataVersions expectedVersion = expectedTargetVersion();
+
+        assertEquals( "Correct version returned", newVersion, expectedVersion.getVersion() );
+        assertFalse( "Progress observer should not have failed", progressObserver.getFailed() );
+        assertTrue( "Progress observer should have update messages", progressObserver.getUpdates().size() > 0 );
+
+
+        //now verify we can read the data correctly in the new version
+        returned1 = v3Impl.load( scope, entity1.getId() ).get();
+        returned2 = v3Impl.load( scope, entity2.getId() ).get();
+
+        assertEquals( "Same entity", entity1, returned1 );
+        assertEquals( "Same entity", entity2, returned2 );
+
+        //verify the tuple is correct
+
+        final MigrationRelationship<MvccEntitySerializationStrategy> newTuple =
+            versions.getMigrationRelationship( newVersion );
+
+
+        assertEquals( "Same instance for from", v3Impl.getClass(), newTuple.from.getClass() );
+        assertEquals( "Same instance for to", v3Impl.getClass(), newTuple.to.getClass() );
+    }
+
+
+    private MvccEntity getEntity( final String type ) {
+
+        final SimpleId entityId = new SimpleId( type );
+        final UUID version = UUIDGenerator.newTimeUUID();
+        final Entity entity = new Entity( entityId );
+
+        MvccEntityImpl logEntry = new MvccEntityImpl( entityId, version, MvccEntity.Status.COMPLETE, entity );
+
+
+        return logEntry;
+    }
+
+
+    @Override
+    public DataMigrationManager getDataMigrationManager() {
+        return dataMigrationManager;
+    }
+
+
+    /**
+     * Get the expected source mvcc implementation for this test
+     * @return
+     */
+    protected abstract MvccEntitySerializationStrategy getExpectedSourceImpl();
+
+    /**
+     * Get the expected target mvcc for this test
+     * @return
+     */
+    protected abstract MvccEntitySerializationStrategy getExpectedTargetImpl();
+
+    /**
+     * Get the expected start version
+     * @return
+     */
+    protected abstract CollectionDataVersions getSourceVersion();
+
+    /**
+     *
+     * @return
+     */
+    protected abstract CollectionDataVersions expectedTargetVersion();
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/48127152/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationV1ToV3ImplTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationV1ToV3ImplTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationV1ToV3ImplTest.java
index dde67cb..f333fba 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationV1ToV3ImplTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationV1ToV3ImplTest.java
@@ -36,6 +36,7 @@ import org.apache.usergrid.persistence.collection.serialization.impl.CollectionD
 import org.apache.usergrid.persistence.collection.serialization.impl.MvccEntitySerializationStrategyV1Impl;
 import org.apache.usergrid.persistence.collection.serialization.impl.MvccEntitySerializationStrategyV3Impl;
 import org.apache.usergrid.persistence.core.guice.DataMigrationResetRule;
+import org.apache.usergrid.persistence.core.guice.MigrationManagerRule;
 import org.apache.usergrid.persistence.core.migration.data.DataMigrationManager;
 import org.apache.usergrid.persistence.core.migration.data.TestProgressObserver;
 import org.apache.usergrid.persistence.core.migration.data.newimpls.MigrationDataProvider;
@@ -65,7 +66,13 @@ import static org.junit.Assert.assertTrue;
 @NotThreadSafe
 @RunWith( ITRunner.class )
 @UseModules( { TestCollectionModule.class } )
-public class MvccEntityDataMigrationV1ToV3ImplTest implements DataMigrationResetRule.DataMigrationManagerProvider {
+public class MvccEntityDataMigrationV1ToV3ImplTest extends AbstractMvccEntityDataMigrationV1ToV3ImplTest{
+
+
+    @Inject
+    @Rule
+    public MigrationManagerRule migrationManagerRule;
+
 
     @Inject
     public DataMigrationManager dataMigrationManager;
@@ -79,101 +86,38 @@ public class MvccEntityDataMigrationV1ToV3ImplTest implements DataMigrationReset
     @Inject
     public MvccEntityDataMigrationImpl mvccEntityDataMigrationImpl;
 
+
     @Inject
     public VersionedMigrationSet<MvccEntitySerializationStrategy> versions;
 
-    /**
-     * Rule to do the resets we need
-     */
-    @Rule
-    public DataMigrationResetRule migrationTestRule =
-        new DataMigrationResetRule( this, CollectionMigrationPlugin.PLUGIN_NAME,
-            CollectionDataVersions.INITIAL.getVersion() );
-
-
-    @Test
-    public void testMigration() throws ConnectionException {
-
-        final Id applicationId = createId("application");
-        final String collectionName = "things";
-
-        CollectionScope scope = new CollectionScopeImpl(applicationId, applicationId, collectionName );
-
-        final MvccEntity entity1 = getEntity( "thing" );
-        final MvccEntity entity2 = getEntity( "thing" );
-
-        v1Impl.write( scope, entity1 ).execute();
-        v1Impl.write( scope, entity2 ).execute();
-
-
-        MvccEntity returned1 = v1Impl.load( scope, entity1.getId() ).get();
-        MvccEntity returned2 = v1Impl.load( scope, entity2.getId() ).get();
-
-        assertEquals("Same entity", entity1, returned1);
-        assertEquals("Same entity", entity2, returned2);
-
-        final Observable<EntityIdScope> entityIdScope = Observable.from( new EntityIdScope( scope, entity1.getId() ), new EntityIdScope( scope, entity2.getId() ) );
-
-
-        final MigrationDataProvider<EntityIdScope> migrationProvider = new MigrationDataProvider<EntityIdScope>() {
-            @Override
-            public Observable<EntityIdScope> getData() {
-                return entityIdScope;
-            }
-        };
-
-        final TestProgressObserver progressObserver = new TestProgressObserver();
-
-        //now migration
-        final int newVersion = mvccEntityDataMigrationImpl.migrate( CollectionDataVersions.INITIAL.getVersion(), migrationProvider, progressObserver  );
-
-
-        assertEquals( "Correct version returned", newVersion, CollectionDataVersions.LOG_REMOVAL.getVersion() );
-        assertFalse( "Progress observer should not have failed", progressObserver.getFailed() );
-        assertTrue( "Progress observer should have update messages", progressObserver.getUpdates().size() > 0 );
-
-
-        //now verify we can read the data correctly in the new version
-        returned1 = v3Impl.load( scope, entity1.getId() ).get();
-           returned2 = v3Impl.load( scope, entity2.getId() ).get();
-
-           assertEquals("Same entity", entity1, returned1);
-           assertEquals("Same entity", entity2, returned2);
-
-        //verify the tuple is correct
-
-        final MigrationRelationship<MvccEntitySerializationStrategy>
-            tuple = versions.getMigrationRelationship( newVersion );
-
-
-        assertSame("Same instance for from", v1Impl, tuple.from);
-        assertSame("Same instance for to", v3Impl, tuple.to);
-
 
 
+    @Override
+    public DataMigrationManager getDataMigrationManager() {
+        return dataMigrationManager;
     }
 
 
-    private MvccEntity getEntity(final String type){
-
-        final SimpleId entityId = new SimpleId( type );
-        final UUID version = UUIDGenerator.newTimeUUID();
-        final Entity entity = new Entity( entityId );
-
-        MvccEntityImpl logEntry = new MvccEntityImpl( entityId, version, MvccEntity.Status.COMPLETE, entity );
-
-
-        return logEntry;
+    @Override
+    protected MvccEntitySerializationStrategy getExpectedSourceImpl() {
+        return v1Impl;
+    }
 
 
+    @Override
+    protected MvccEntitySerializationStrategy getExpectedTargetImpl() {
+        return v3Impl;
     }
 
 
-
+    @Override
+    protected CollectionDataVersions getSourceVersion() {
+        return CollectionDataVersions.INITIAL;
+    }
 
 
     @Override
-    public DataMigrationManager getDataMigrationManager() {
-        return dataMigrationManager;
+    protected CollectionDataVersions expectedTargetVersion() {
+        return CollectionDataVersions.LOG_REMOVAL;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/48127152/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationV2ToV3ImplTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationV2ToV3ImplTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationV2ToV3ImplTest.java
new file mode 100644
index 0000000..165105d
--- /dev/null
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationV2ToV3ImplTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.collection.serialization.impl.migration;
+
+
+import org.junit.Rule;
+import org.junit.runner.RunWith;
+
+import org.apache.usergrid.persistence.collection.guice.TestCollectionModule;
+import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
+import org.apache.usergrid.persistence.collection.serialization.impl.CollectionDataVersions;
+import org.apache.usergrid.persistence.collection.serialization.impl.MvccEntitySerializationStrategyV1Impl;
+import org.apache.usergrid.persistence.collection.serialization.impl.MvccEntitySerializationStrategyV2Impl;
+import org.apache.usergrid.persistence.collection.serialization.impl.MvccEntitySerializationStrategyV3Impl;
+import org.apache.usergrid.persistence.core.guice.MigrationManagerRule;
+import org.apache.usergrid.persistence.core.migration.data.DataMigrationManager;
+import org.apache.usergrid.persistence.core.migration.data.newimpls.VersionedMigrationSet;
+import org.apache.usergrid.persistence.core.test.ITRunner;
+import org.apache.usergrid.persistence.core.test.UseModules;
+
+import com.google.inject.Inject;
+
+import net.jcip.annotations.NotThreadSafe;
+
+
+@NotThreadSafe
+@RunWith( ITRunner.class )
+@UseModules( { TestCollectionModule.class } )
+public class MvccEntityDataMigrationV2ToV3ImplTest extends AbstractMvccEntityDataMigrationV1ToV3ImplTest{
+
+
+    @Inject
+    @Rule
+    public MigrationManagerRule migrationManagerRule;
+
+
+    @Inject
+    public DataMigrationManager dataMigrationManager;
+
+    @Inject
+    private MvccEntitySerializationStrategyV2Impl v2Impl;
+
+    @Inject
+    private MvccEntitySerializationStrategyV3Impl v3Impl;
+
+    @Inject
+    public MvccEntityDataMigrationImpl mvccEntityDataMigrationImpl;
+
+
+    @Inject
+    public VersionedMigrationSet<MvccEntitySerializationStrategy> versions;
+
+
+
+    @Override
+    public DataMigrationManager getDataMigrationManager() {
+        return dataMigrationManager;
+    }
+
+
+    @Override
+    protected MvccEntitySerializationStrategy getExpectedSourceImpl() {
+        return v2Impl;
+    }
+
+
+    @Override
+    protected MvccEntitySerializationStrategy getExpectedTargetImpl() {
+        return v3Impl;
+    }
+
+
+    @Override
+    protected CollectionDataVersions getSourceVersion() {
+        return CollectionDataVersions.BUFFER_SHORT_FIX;
+    }
+
+
+    @Override
+    protected CollectionDataVersions expectedTargetVersion() {
+        return CollectionDataVersions.LOG_REMOVAL;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/48127152/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/guice/MigrationManagerRule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/guice/MigrationManagerRule.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/guice/MigrationManagerRule.java
index cba88aa..ac6c169 100644
--- a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/guice/MigrationManagerRule.java
+++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/guice/MigrationManagerRule.java
@@ -27,6 +27,13 @@ public class MigrationManagerRule extends ExternalResource {
     @Inject
     public void setMigrationManager( final MigrationManager migrationManager )  {
         this.migrationManager = migrationManager;
+
+        try {
+                   this.migrationManager.migrate();
+               }
+               catch ( MigrationException e ) {
+                   throw new RuntimeException(e);
+               }
     }
 
     @Inject