You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/02/10 19:09:01 UTC

[4/5] incubator-usergrid git commit: added new entity object for serialization

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b604e6d7/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityDataMigrationIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityDataMigrationIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityDataMigrationIT.java
index af8bbed..2509771 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityDataMigrationIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityDataMigrationIT.java
@@ -24,6 +24,11 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Set;
 
+import org.apache.usergrid.persistence.collection.mvcc.MvccEntityMigrationStrategy;
+import org.apache.usergrid.persistence.collection.serialization.impl.MvccEntitySerializationStrategyProxyV2Impl;
+import org.apache.usergrid.persistence.core.migration.data.DataMigration;
+import org.apache.usergrid.persistence.core.rx.AllEntitiesInSystemObservable;
+import org.apache.usergrid.persistence.core.scope.ApplicationEntityGroup;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -32,7 +37,7 @@ import org.apache.usergrid.AbstractCoreIT;
 import org.apache.usergrid.corepersistence.CpSetup;
 import org.apache.usergrid.corepersistence.EntityWriteHelper;
 import org.apache.usergrid.corepersistence.ManagerCache;
-import org.apache.usergrid.corepersistence.rx.AllEntitiesInSystemObservable;
+import org.apache.usergrid.corepersistence.rx.impl.AllEntitiesInSystemObservableImpl;
 import org.apache.usergrid.corepersistence.util.CpNamingUtils;
 import org.apache.usergrid.persistence.Entity;
 import org.apache.usergrid.persistence.EntityManager;
@@ -40,9 +45,7 @@ import org.apache.usergrid.persistence.EntityManagerFactory;
 import org.apache.usergrid.persistence.SimpleEntityRef;
 import org.apache.usergrid.persistence.collection.CollectionScope;
 import org.apache.usergrid.persistence.collection.MvccEntity;
-import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy;
-import org.apache.usergrid.persistence.core.guice.CurrentImpl;
-import org.apache.usergrid.persistence.core.guice.PreviousImpl;
+import org.apache.usergrid.persistence.collection.serialization.impl.MvccEntitySerializationStrategy;
 import org.apache.usergrid.persistence.core.migration.data.DataMigrationManager;
 import org.apache.usergrid.persistence.core.migration.data.DataMigrationManagerImpl;
 import org.apache.usergrid.persistence.core.migration.data.MigrationInfoSerialization;
@@ -66,8 +69,7 @@ public class EntityDataMigrationIT extends AbstractCoreIT {
     private Injector injector;
 
 
-    private EntityDataMigration entityDataMigration;
-    private ManagerCache managerCache;
+    private DataMigration entityDataMigration;
     private DataMigrationManager dataMigrationManager;
     private MigrationInfoSerialization migrationInfoSerialization;
     private MvccEntitySerializationStrategy v1Strategy;
@@ -80,19 +82,21 @@ public class EntityDataMigrationIT extends AbstractCoreIT {
      * Rule to do the resets we need
      */
     @Rule
-    public MigrationTestRule migrationTestRule = 
-            new MigrationTestRule( app, CpSetup.getInjector() ,EntityDataMigration.class  );
+    public MigrationTestRule migrationTestRule =
+            new MigrationTestRule( app, CpSetup.getInjector() ,MvccEntitySerializationStrategyProxyV2Impl.class  );
+    private AllEntitiesInSystemObservable allEntitiesInSystemObservable;
 
     @Before
     public void setup() {
         emf = setup.getEmf();
         injector = CpSetup.getInjector();
-        entityDataMigration = injector.getInstance( EntityDataMigration.class );
-        managerCache = injector.getInstance( ManagerCache.class );
+        entityDataMigration = injector.getInstance( MvccEntitySerializationStrategyProxyV2Impl.class );
         dataMigrationManager = injector.getInstance( DataMigrationManager.class );
         migrationInfoSerialization = injector.getInstance( MigrationInfoSerialization.class );
-        v1Strategy = injector.getInstance( Key.get(MvccEntitySerializationStrategy.class, PreviousImpl.class) );
-        v2Strategy = injector.getInstance( Key.get(MvccEntitySerializationStrategy.class, CurrentImpl.class) );
+        MvccEntityMigrationStrategy strategy = injector.getInstance(Key.get(MvccEntityMigrationStrategy.class));
+        allEntitiesInSystemObservable = injector.getInstance(AllEntitiesInSystemObservable.class);
+        v1Strategy = strategy.getMigration().from();
+        v2Strategy = strategy.getMigration().to();
     }
 
 
@@ -131,11 +135,11 @@ public class EntityDataMigrationIT extends AbstractCoreIT {
         //read everything in previous version format and put it into our types.  Assumes we're
         //using a test system, and it's not a huge amount of data, otherwise we'll overflow.
 
-        AllEntitiesInSystemObservable.getAllEntitiesInSystem( managerCache, 1000 )
-            .doOnNext( new Action1<AllEntitiesInSystemObservable.ApplicationEntityGroup>() {
+        allEntitiesInSystemObservable.getAllEntitiesInSystem(  1000)
+            .doOnNext( new Action1<ApplicationEntityGroup>() {
                 @Override
                 public void call(
-                        final AllEntitiesInSystemObservable.ApplicationEntityGroup entity ) {
+                        final ApplicationEntityGroup entity ) {
 
                     //add all versions from history to our comparison
                     for ( final Id id : entity.entityIds ) {
@@ -167,7 +171,18 @@ public class EntityDataMigrationIT extends AbstractCoreIT {
         assertTrue( "Saved new entities", savedEntities.size() > 0 );
 
         //perform the migration
-        entityDataMigration.migrate( progressObserver );
+        allEntitiesInSystemObservable.getAllEntitiesInSystem(  1000)
+            .doOnNext(new Action1<ApplicationEntityGroup>() {
+                @Override
+                public void call(ApplicationEntityGroup applicationEntityGroup) {
+                   try {
+                       entityDataMigration.migrate(applicationEntityGroup, progressObserver).toBlocking().last();
+                   }catch (Throwable e){
+                       throw new RuntimeException(e);
+                   }
+                }
+            }).toBlocking().last();
+
 
         assertFalse( "Progress observer should not have failed", progressObserver.getFailed() );
         assertTrue( "Progress observer should have update messages", progressObserver.getUpdates().size() > 0 );
@@ -183,12 +198,11 @@ public class EntityDataMigrationIT extends AbstractCoreIT {
 
 
         //now visit all entities in the system again, load them from v2, and ensure they're the same
-        AllEntitiesInSystemObservable.getAllEntitiesInSystem( managerCache, 1000 )
-            .doOnNext( new Action1<AllEntitiesInSystemObservable.ApplicationEntityGroup>() {
+        allEntitiesInSystemObservable.getAllEntitiesInSystem( 1000)
+            .doOnNext( new Action1<ApplicationEntityGroup>() {
                 @Override
                 public void call(
-                        final AllEntitiesInSystemObservable
-                                .ApplicationEntityGroup entity ) {
+                        final ApplicationEntityGroup entity ) {
                     //add all versions from history to our comparison
                     for ( final Id id : entity.entityIds ) {
 
@@ -218,16 +232,15 @@ public class EntityDataMigrationIT extends AbstractCoreIT {
         assertEquals( "All entities migrated", 0, savedEntities.size() );
 
 
-        //now visit all entities in the system again, and load them from the EM, 
+        //now visit all entities in the system again, and load them from the EM,
         // ensure we see everything we did in the v1 traversal
-        AllEntitiesInSystemObservable.getAllEntitiesInSystem( managerCache, 1000 )
-            .doOnNext( new Action1<AllEntitiesInSystemObservable.ApplicationEntityGroup>() {
+        allEntitiesInSystemObservable.getAllEntitiesInSystem( 1000)
+            .doOnNext( new Action1<ApplicationEntityGroup>() {
                 @Override
                 public void call(
-                        final AllEntitiesInSystemObservable
-                                .ApplicationEntityGroup entity ) {
+                        final ApplicationEntityGroup entity ) {
 
-                    final EntityManager em = emf.getEntityManager( 
+                    final EntityManager em = emf.getEntityManager(
                             entity.applicationScope.getApplication().getUuid() );
 
                     //add all versions from history to our comparison

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b604e6d7/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigrationIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigrationIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigrationIT.java
index 266fb17..3da0b85 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigrationIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigrationIT.java
@@ -23,6 +23,8 @@ package org.apache.usergrid.corepersistence.migration;
 import java.util.HashSet;
 import java.util.Set;
 
+import org.apache.usergrid.persistence.core.rx.AllEntitiesInSystemObservable;
+import org.apache.usergrid.persistence.core.scope.ApplicationEntityGroup;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -31,7 +33,7 @@ import org.apache.usergrid.AbstractCoreIT;
 import org.apache.usergrid.corepersistence.CpSetup;
 import org.apache.usergrid.corepersistence.EntityWriteHelper;
 import org.apache.usergrid.corepersistence.ManagerCache;
-import org.apache.usergrid.corepersistence.rx.AllEntitiesInSystemObservable;
+import org.apache.usergrid.corepersistence.rx.impl.AllEntitiesInSystemObservableImpl;
 import org.apache.usergrid.corepersistence.util.CpNamingUtils;
 import org.apache.usergrid.persistence.Entity;
 import org.apache.usergrid.persistence.EntityManager;
@@ -69,9 +71,9 @@ public class EntityTypeMappingMigrationIT extends AbstractCoreIT {
      * Rule to do the resets we need
      */
     @Rule
-    public MigrationTestRule migrationTestRule = new MigrationTestRule( 
+    public MigrationTestRule migrationTestRule = new MigrationTestRule(
             app, CpSetup.getInjector() ,EntityTypeMappingMigration.class  );
-
+    private AllEntitiesInSystemObservable allEntitiesInSystemObservable;
 
 
     @Before
@@ -82,6 +84,7 @@ public class EntityTypeMappingMigrationIT extends AbstractCoreIT {
         keyspace = injector.getInstance( Keyspace.class );
         managerCache = injector.getInstance( ManagerCache.class );
         dataMigrationManager = injector.getInstance( DataMigrationManager.class );
+        allEntitiesInSystemObservable = injector.getInstance(AllEntitiesInSystemObservable.class);
     }
 
 
@@ -114,62 +117,69 @@ public class EntityTypeMappingMigrationIT extends AbstractCoreIT {
         keyspace.truncateColumnFamily( MapSerializationImpl.MAP_ENTRIES );
         keyspace.truncateColumnFamily( MapSerializationImpl.MAP_KEYS );
 
-        app.createApplication( 
-                GraphShardVersionMigrationIT.class.getSimpleName()+ UUIDGenerator.newTimeUUID(), 
+        app.createApplication(
+                GraphShardVersionMigrationIT.class.getSimpleName()+ UUIDGenerator.newTimeUUID(),
                 "migrationTest" );
 
 
 
         final TestProgressObserver progressObserver = new TestProgressObserver();
 
-        entityTypeMappingMigration.migrate( progressObserver );
-
+        allEntitiesInSystemObservable.getAllEntitiesInSystem(  1000)
+            .doOnNext(new Action1<ApplicationEntityGroup>() {
+                @Override
+                public void call(final ApplicationEntityGroup entity) {
+                    try {
+                        entityTypeMappingMigration.migrate(entity, progressObserver).toBlocking().last();
+                    }catch (Throwable e ){
+                        throw new RuntimeException(e);
+                    }
+                }
+            });
 
-        AllEntitiesInSystemObservable.getAllEntitiesInSystem( managerCache, 1000 )
-            .doOnNext( new Action1<AllEntitiesInSystemObservable.ApplicationEntityGroup>() {
+        allEntitiesInSystemObservable.getAllEntitiesInSystem(1000)
+            .doOnNext(new Action1<ApplicationEntityGroup>() {
                 @Override
                 public void call(
-                        final AllEntitiesInSystemObservable.ApplicationEntityGroup entity ) {
+                    final ApplicationEntityGroup entity) {
                     //ensure that each one has a type
 
                     final EntityManager em = emf.getEntityManager(
-                            entity.applicationScope.getApplication().getUuid() );
+                        entity.applicationScope.getApplication().getUuid());
 
-                    for ( final Id id : entity.entityIds ) {
+                    for (final Id id : entity.entityIds) {
                         try {
-                            final Entity returned = em.get( id.getUuid() );
+                            final Entity returned = em.get(id.getUuid());
 
                             //we seem to occasionally get phantom edges.  If this is the
                             // case we'll store the type _> uuid mapping, but we won't have
                             // anything to load
 
-                            if ( returned != null ) {
-                                assertEquals( id.getUuid(), returned.getUuid() );
-                                assertEquals( id.getType(), returned.getType() );
-                            }
-                            else {
-                                final String type = managerCache.getMapManager( CpNamingUtils
-                                        .getEntityTypeMapScope(
-                                                entity.applicationScope.getApplication() ) )
-                                                                .getString( id.getUuid()
-                                                                            .toString() );
-
-                                assertEquals( id.getType(), type );
+                            if (returned != null) {
+                                assertEquals(id.getUuid(), returned.getUuid());
+                                assertEquals(id.getType(), returned.getType());
+                            } else {
+                                final String type = managerCache.getMapManager(CpNamingUtils
+                                    .getEntityTypeMapScope(
+                                        entity.applicationScope.getApplication()))
+                                    .getString(id.getUuid()
+                                        .toString());
+
+                                assertEquals(id.getType(), type);
                             }
-                        }
-                        catch ( Exception e ) {
-                            throw new RuntimeException( "Unable to get entity " + id
-                                    + " by UUID, migration failed", e );
-                        }
+                        } catch (Exception e) {
+                            throw new RuntimeException("Unable to get entity " + id
+                                + " by UUID, migration failed", e);
+                                    }
 
-                        allEntities.remove( id );
-                    }
-                }
-            } ).toBlocking().lastOrDefault( null );
+                                    allEntities.remove(id);
+                                }
+                            }
+                        }).toBlocking().lastOrDefault(null);
 
 
-        assertEquals( "Every element should have been encountered", 0, allEntities.size() );
-        assertFalse( "Progress observer should not have failed", progressObserver.getFailed() );
-        assertTrue( "Progress observer should have update messages", progressObserver.getUpdates().size() > 0 );
-    }
-}
+                    assertEquals("Every element should have been encountered", 0, allEntities.size());
+                    assertFalse("Progress observer should not have failed", progressObserver.getFailed());
+                    assertTrue("Progress observer should have update messages", progressObserver.getUpdates().size() > 0);
+                }
+            }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b604e6d7/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigrationIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigrationIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigrationIT.java
index f287047..2d6d0d9 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigrationIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigrationIT.java
@@ -23,6 +23,10 @@ package org.apache.usergrid.corepersistence.migration;
 import java.util.HashSet;
 import java.util.Set;
 
+import org.apache.usergrid.persistence.core.migration.data.DataMigration;
+import org.apache.usergrid.persistence.core.rx.AllEntitiesInSystemObservable;
+import org.apache.usergrid.persistence.core.scope.ApplicationEntityGroup;
+import org.apache.usergrid.persistence.graph.serialization.impl.EdgeMetadataSerializationProxyImpl;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -31,7 +35,7 @@ import org.apache.usergrid.AbstractCoreIT;
 import org.apache.usergrid.corepersistence.CpSetup;
 import org.apache.usergrid.corepersistence.EntityWriteHelper;
 import org.apache.usergrid.corepersistence.ManagerCache;
-import org.apache.usergrid.corepersistence.rx.AllEntitiesInSystemObservable;
+import org.apache.usergrid.corepersistence.rx.impl.AllEntitiesInSystemObservableImpl;
 import org.apache.usergrid.persistence.EntityManager;
 import org.apache.usergrid.persistence.core.migration.data.DataMigrationManager;
 import org.apache.usergrid.persistence.core.migration.data.DataMigrationManagerImpl;
@@ -55,7 +59,7 @@ import org.junit.Ignore;
 public class GraphShardVersionMigrationIT extends AbstractCoreIT {
 
     private Injector injector;
-    private GraphShardVersionMigration graphShardVersionMigration;
+    private DataMigration graphShardVersionMigration;
     private ManagerCache managerCache;
     private DataMigrationManager dataMigrationManager;
     private MigrationInfoSerialization migrationInfoSerialization;
@@ -65,18 +69,18 @@ public class GraphShardVersionMigrationIT extends AbstractCoreIT {
      * Rule to do the resets we need
      */
     @Rule
-    public MigrationTestRule migrationTestRule = new MigrationTestRule( app, CpSetup.getInjector() ,GraphShardVersionMigration.class  );
-
+    public MigrationTestRule migrationTestRule = new MigrationTestRule( app, CpSetup.getInjector() ,EdgeMetadataSerializationProxyImpl.class  );
+    private AllEntitiesInSystemObservable allEntitiesInSystemObservable;
 
 
     @Before
     public void setup() {
         injector = CpSetup.getInjector();
-        graphShardVersionMigration = injector.getInstance( GraphShardVersionMigration.class );
+        graphShardVersionMigration = injector.getInstance( EdgeMetadataSerializationProxyImpl.class );
         managerCache = injector.getInstance( ManagerCache.class );
         dataMigrationManager = injector.getInstance( DataMigrationManager.class );
         migrationInfoSerialization = injector.getInstance( MigrationInfoSerialization.class );
-
+        allEntitiesInSystemObservable = injector.getInstance(AllEntitiesInSystemObservable.class);
     }
 
 
@@ -114,11 +118,11 @@ public class GraphShardVersionMigrationIT extends AbstractCoreIT {
 
         //read everything in previous version format and put it into our types.
 
-        AllEntitiesInSystemObservable.getAllEntitiesInSystem( managerCache, 1000 )
-                                     .doOnNext( new Action1<AllEntitiesInSystemObservable.ApplicationEntityGroup>() {
+        allEntitiesInSystemObservable.getAllEntitiesInSystem( 1000)
+                                     .doOnNext( new Action1<ApplicationEntityGroup>() {
                                          @Override
                                          public void call(
-                                                 final AllEntitiesInSystemObservable.ApplicationEntityGroup entity ) {
+                                                 final ApplicationEntityGroup entity ) {
 
                                              final GraphManager gm =
                                                      managerCache.getGraphManager( entity.applicationScope );
@@ -154,7 +158,18 @@ public class GraphShardVersionMigrationIT extends AbstractCoreIT {
 
 
         //perform the migration
-        graphShardVersionMigration.migrate( progressObserver );
+        allEntitiesInSystemObservable.getAllEntitiesInSystem( 1000)
+            .doOnNext( new Action1<ApplicationEntityGroup>() {
+                @Override
+                public void call(
+                    final ApplicationEntityGroup entity) {
+                    try {
+                        graphShardVersionMigration.migrate(entity, progressObserver).toBlocking().last();
+                    }catch (Throwable e){
+                        throw new RuntimeException(e);
+                    }
+                }
+            }).toBlocking().last();
 
         assertEquals( "Newly saved entities encounterd", 0, allEntities.size() );
         assertFalse( "Progress observer should not have failed", progressObserver.getFailed() );
@@ -171,12 +186,11 @@ public class GraphShardVersionMigrationIT extends AbstractCoreIT {
 
 
         //now visit all nodes in the system and remove their types from the multi maps, it should be empty at the end
-        AllEntitiesInSystemObservable.getAllEntitiesInSystem( managerCache, 1000 )
-                                     .doOnNext( new Action1<AllEntitiesInSystemObservable.ApplicationEntityGroup>() {
+        allEntitiesInSystemObservable.getAllEntitiesInSystem( 1000)
+                                     .doOnNext( new Action1<ApplicationEntityGroup>() {
                                                     @Override
                                                     public void call(
-                                                            final AllEntitiesInSystemObservable
-                                                                    .ApplicationEntityGroup entity ) {
+                                                            final ApplicationEntityGroup entity ) {
 
                                                         final GraphManager gm =
                                                                 managerCache.getGraphManager( entity.applicationScope );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b604e6d7/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/AllEntitiesInSystemObservableIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/AllEntitiesInSystemObservableIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/AllEntitiesInSystemObservableIT.java
index 4d1c6c9..30c9ac3 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/AllEntitiesInSystemObservableIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/AllEntitiesInSystemObservableIT.java
@@ -23,6 +23,11 @@ package org.apache.usergrid.corepersistence.rx;
 import java.util.HashSet;
 import java.util.Set;
 
+import org.apache.usergrid.corepersistence.rx.impl.AllEntitiesInSystemObservableImpl;
+import org.apache.usergrid.persistence.core.rx.AllEntitiesInSystemObservable;
+import org.apache.usergrid.persistence.core.rx.ApplicationObservable;
+import org.apache.usergrid.persistence.core.scope.ApplicationEntityGroup;
+import org.apache.usergrid.persistence.graph.serialization.TargetIdObservable;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -55,6 +60,9 @@ public class AllEntitiesInSystemObservableIT extends AbstractCoreIT {
     @Test
     public void testEntities() throws Exception {
 
+        AllEntitiesInSystemObservable allEntitiesInSystemObservableImpl = CpSetup.getInjector().getInstance(AllEntitiesInSystemObservable.class);
+        TargetIdObservable targetIdObservable = CpSetup.getInjector().getInstance(TargetIdObservable.class);
+
         final EntityManager em = app.getEntityManager();
 
         final String type1 = "type1thing";
@@ -92,9 +100,9 @@ public class AllEntitiesInSystemObservableIT extends AbstractCoreIT {
 
         final GraphManager gm = managerCache.getGraphManager( scope );
 
-        AllEntitiesInSystemObservable.getAllEntitiesInSystem( managerCache, 1000 ).doOnNext( new Action1<AllEntitiesInSystemObservable.ApplicationEntityGroup>() {
+        allEntitiesInSystemObservableImpl.getAllEntitiesInSystem( 1000).doOnNext( new Action1<ApplicationEntityGroup>() {
             @Override
-            public void call( final AllEntitiesInSystemObservable.ApplicationEntityGroup entity ) {
+            public void call( final ApplicationEntityGroup entity ) {
 
                 assertNotNull(entity);
                 assertNotNull(entity.applicationScope);
@@ -125,7 +133,7 @@ public class AllEntitiesInSystemObservableIT extends AbstractCoreIT {
 
         //test connections
 
-        TargetIdObservable.getTargetNodes( gm, source ).doOnNext( new Action1<Id>() {
+        targetIdObservable.getTargetNodes( gm, source ).doOnNext( new Action1<Id>() {
             @Override
             public void call( final Id target ) {
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b604e6d7/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/ApplicationObservableTestIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/ApplicationObservableTestIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/ApplicationObservableTestIT.java
index f8f3c50..7c902ea 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/ApplicationObservableTestIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/ApplicationObservableTestIT.java
@@ -24,21 +24,19 @@ import java.util.HashSet;
 import java.util.Set;
 import java.util.UUID;
 
+import org.apache.usergrid.persistence.core.rx.ApplicationObservable;
 import org.junit.Test;
 
 import org.apache.usergrid.AbstractCoreIT;
 import org.apache.usergrid.corepersistence.CpSetup;
 import org.apache.usergrid.corepersistence.ManagerCache;
 import org.apache.usergrid.corepersistence.util.CpNamingUtils;
-import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
 import org.apache.usergrid.persistence.entities.Application;
-import org.apache.usergrid.persistence.graph.GraphManagerFactory;
 import org.apache.usergrid.persistence.model.entity.Id;
 
 import rx.Observable;
 import rx.functions.Action1;
 
-import static junit.framework.Assert.assertNotNull;
 import static org.junit.Assert.assertEquals;
 
 
@@ -52,6 +50,7 @@ public class ApplicationObservableTestIT extends AbstractCoreIT {
 
         final Application createdApplication = app.getEntityManager().getApplication();
 
+        ApplicationObservable applicationObservable = CpSetup.getInjector().getInstance(ApplicationObservable.class);
 
         //now our get all apps we expect.  There may be more, but we don't care about those.
         final Set<UUID> applicationIds = new HashSet<UUID>() {{
@@ -66,7 +65,7 @@ public class ApplicationObservableTestIT extends AbstractCoreIT {
         //clean up our wiring
         ManagerCache managerCache = CpSetup.getInjector().getInstance( ManagerCache.class );
 
-        Observable<Id> appObservable = ApplicationObservable.getAllApplicationIds( managerCache );
+        Observable<Id> appObservable = applicationObservable.getAllApplicationIds();
 
         appObservable.doOnNext( new Action1<Id>() {
             @Override

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b604e6d7/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservableIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservableIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservableIT.java
index 2aa7fbc..2564fe5 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservableIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservableIT.java
@@ -25,6 +25,7 @@ import java.util.HashSet;
 import java.util.Set;
 import java.util.UUID;
 
+import org.apache.usergrid.persistence.graph.serialization.EdgesToTargetObservable;
 import org.junit.Test;
 
 import org.apache.usergrid.AbstractCoreIT;
@@ -60,6 +61,7 @@ public class EdgesFromSourceObservableIT extends AbstractCoreIT {
     @Test
     public void testEntities() throws Exception {
 
+        EdgesToTargetObservable edgesToTargetObservable = CpSetup.getInjector().getInstance(EdgesToTargetObservable.class);
         final EntityManager em = app.getEntityManager();
         final Application createdApplication = em.getApplication();
 
@@ -96,7 +98,7 @@ public class EdgesFromSourceObservableIT extends AbstractCoreIT {
 
         final GraphManager gm = managerCache.getGraphManager( scope );
 
-        EdgesToTargetObservable.getEdgesToTarget( gm, target ).doOnNext( new Action1<Edge>() {
+        edgesToTargetObservable.getEdgesToTarget( gm, target ).doOnNext( new Action1<Edge>() {
             @Override
             public void call( final Edge edge ) {
                 final String edgeType = edge.getType();

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b604e6d7/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservableIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservableIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservableIT.java
index ef0d953..5d25f62 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservableIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservableIT.java
@@ -25,6 +25,7 @@ import java.util.HashSet;
 import java.util.Set;
 import java.util.UUID;
 
+import org.apache.usergrid.persistence.graph.serialization.EdgesFromSourceObservable;
 import org.junit.Test;
 
 import org.apache.usergrid.AbstractCoreIT;
@@ -59,6 +60,7 @@ public class EdgesToTargetObservableIT extends AbstractCoreIT {
     @Test
     public void testEntities() throws Exception {
 
+        EdgesFromSourceObservable edgesFromSourceObservable=  CpSetup.getInjector().getInstance(EdgesFromSourceObservable.class);
         final EntityManager em = app.getEntityManager();
 
         final String type1 = "type1things";
@@ -92,7 +94,7 @@ public class EdgesToTargetObservableIT extends AbstractCoreIT {
 
         final GraphManager gm = managerCache.getGraphManager( scope );
 
-        EdgesFromSourceObservable.edgesFromSource( gm, applicationId ).doOnNext( new Action1<Edge>() {
+        edgesFromSourceObservable.edgesFromSource( gm, applicationId ).doOnNext( new Action1<Edge>() {
             @Override
             public void call( final Edge edge ) {
                 final String edgeType = edge.getType();
@@ -124,7 +126,7 @@ public class EdgesToTargetObservableIT extends AbstractCoreIT {
 
         //test connections
 
-        EdgesFromSourceObservable.edgesFromSource( gm, source).doOnNext( new Action1<Edge>() {
+        edgesFromSourceObservable.edgesFromSource( gm, source).doOnNext( new Action1<Edge>() {
             @Override
             public void call( final Edge edge ) {
                 final String edgeType = edge.getType();

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b604e6d7/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/TargetIdObservableTestIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/TargetIdObservableTestIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/TargetIdObservableTestIT.java
index cde8866..e5b0319 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/TargetIdObservableTestIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/TargetIdObservableTestIT.java
@@ -25,6 +25,7 @@ import java.util.HashSet;
 import java.util.Set;
 import java.util.UUID;
 
+import org.apache.usergrid.persistence.graph.serialization.TargetIdObservable;
 import org.junit.Test;
 
 import org.apache.usergrid.AbstractCoreIT;
@@ -59,6 +60,8 @@ public class TargetIdObservableTestIT extends AbstractCoreIT {
     @Test
     public void testEntities() throws Exception {
 
+        TargetIdObservable targetIdObservable = CpSetup.getInjector().getInstance(TargetIdObservable.class);
+
         final EntityManager em = app.getEntityManager();
 
 
@@ -93,7 +96,7 @@ public class TargetIdObservableTestIT extends AbstractCoreIT {
 
         final GraphManager gm = managerCache.getGraphManager( scope );
 
-        TargetIdObservable.getTargetNodes( gm, applicationId ).doOnNext( new Action1<Id>() {
+        targetIdObservable.getTargetNodes( gm, applicationId ).doOnNext( new Action1<Id>() {
             @Override
             public void call( final Id target ) {
 
@@ -116,7 +119,7 @@ public class TargetIdObservableTestIT extends AbstractCoreIT {
 
         //test connections
 
-        TargetIdObservable.getTargetNodes( gm, source ).doOnNext( new Action1<Id>() {
+        targetIdObservable.getTargetNodes( gm, source ).doOnNext( new Action1<Id>() {
             @Override
             public void call( final Id target ) {
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b604e6d7/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
index 4de18fe..90cade0 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
@@ -20,6 +20,8 @@ package org.apache.usergrid.persistence.collection;
 
 
 import java.util.Collection;
+
+import org.apache.usergrid.persistence.core.CPManager;
 import org.apache.usergrid.persistence.core.util.Health;
 import org.apache.usergrid.persistence.model.entity.Entity;
 import org.apache.usergrid.persistence.model.entity.Id;
@@ -30,7 +32,7 @@ import rx.Observable;
 /**
  * The operations for performing changes on an entity
  */
-public interface EntityCollectionManager {
+public interface EntityCollectionManager extends CPManager {
 
     /**
      * Write the entity in the entity collection.  This is an entire entity, it's contents will
@@ -68,12 +70,12 @@ public interface EntityCollectionManager {
 
     /**
      * Takes the change and reloads an entity with all changes applied in this entity applied.
-     * The resulting entity from calling load will be the previous version of this entity plus 
+     * The resulting entity from calling load will be the previous version of this entity plus
      * the entity in this object applied to it.
      */
     public Observable<Entity> update ( Entity entity );
 
-    /** 
+    /**
      * Returns health of entity data store.
      */
     public Health getHealth();

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b604e6d7/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerFactory.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerFactory.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerFactory.java
index ef579f8..9140913 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerFactory.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerFactory.java
@@ -20,40 +20,42 @@ package org.apache.usergrid.persistence.collection;
 
 
 /**
- * A basic factory that creates a collection manager with the given context. 
+ * A basic factory that creates a collection manager with the given context.
  * Each instance of this factory should exist for a Single ApplicationScope
  */
 public interface EntityCollectionManagerFactory {
 
     /**
-     * Create a new EntityCollectionManager for the given context. 
-     * The EntityCollectionManager can safely be used on the current thread 
+     * Create a new EntityCollectionManager for the given context.
+     * The EntityCollectionManager can safely be used on the current thread
      * and will shard responses.  The returned instance should not be shared
      * among threads it will not be guaranteed to be thread safe.
      *
-     * @param collectionScope The collectionScope collectionScope to use 
+     * @param collectionScope The collectionScope collectionScope to use
      * when creating the collectionScope manager
      *
      * @return The collectionScope manager to perform operations within the provided context
      */
-    public EntityCollectionManager 
+    public EntityCollectionManager
         createCollectionManager( CollectionScope collectionScope );
 
 
 
     /**
-     * Create a new EntityCollectionManagerSync for the given context. 
-     * The EntityCollectionManager can safely be used on the current thread 
+     * Create a new EntityCollectionManagerSync for the given context.
+     * The EntityCollectionManager can safely be used on the current thread
      * and will shard responses.  The returned instance should not be shared
-     * among threads it will not be guaranteed to be thread safe.  
+     * among threads it will not be guaranteed to be thread safe.
      * This implementation will be synchronous. Try to use the org.apache.usergrid.persistence.core.consistency
      * implementation if possible
      *
-     * @param collectionScope The collectionScope collectionScope to use when 
+     * @param collectionScope The collectionScope collectionScope to use when
      * creating the collectionScope manager
      *
      * @return The collectionScope manager to perform operations within the provided context
      */
-    public EntityCollectionManagerSync 
+    public EntityCollectionManagerSync
         createCollectionManagerSync( CollectionScope collectionScope );
+
+    void invalidate();
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b604e6d7/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
index 1c3e258..c5dd961 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
@@ -19,6 +19,7 @@ package org.apache.usergrid.persistence.collection.guice;
 
 
 
+import org.apache.usergrid.persistence.collection.impl.EntityCollectionManagerFactoryImpl;
 import org.safehaus.guicyfig.GuicyFigModule;
 
 import org.apache.usergrid.persistence.collection.EntityCollectionManager;
@@ -81,10 +82,7 @@ public class CollectionModule extends AbstractModule {
         Multibinder.newSetBinder( binder(), EntityDeleted.class );
 
         // create a guice factor for getting our collection manager
-        install( new FactoryModuleBuilder()
-            .implement( EntityCollectionManager.class, EntityCollectionManagerImpl.class )
-            .implement( EntityCollectionManagerSync.class, EntityCollectionManagerSyncImpl.class )
-            .build( EntityCollectionManagerFactory.class ) );
+       bind(EntityCollectionManagerFactory.class).to(EntityCollectionManagerFactoryImpl.class);
 
         bind( UniqueValueSerializationStrategy.class ).to( UniqueValueSerializationStrategyImpl.class );
         bind( ChangeLogGenerator.class).to( ChangeLogGeneratorImpl.class);
@@ -116,7 +114,7 @@ public class CollectionModule extends AbstractModule {
     @Provides
     @CollectionTaskExecutor
     public TaskExecutor collectionTaskExecutor(final SerializationFig serializationFig){
-        return new NamedTaskExecutorImpl( "collectiontasks", 
+        return new NamedTaskExecutorImpl( "collectiontasks",
                 serializationFig.getTaskPoolThreadSize(), serializationFig.getTaskPoolQueueSize() );
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b604e6d7/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
new file mode 100644
index 0000000..790be19
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
@@ -0,0 +1,137 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  *  contributor license agreements.  The ASF licenses this file to You
+ *  * under the Apache License, Version 2.0 (the "License"); you may not
+ *  * use this file except in compliance with the License.
+ *  * You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.  For additional information regarding
+ *  * copyright in this work, please see the NOTICE file in the top level
+ *  * directory of this distribution.
+ *
+ */
+package org.apache.usergrid.persistence.collection.impl;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.inject.assistedinject.Assisted;
+import com.netflix.astyanax.Keyspace;
+import org.apache.usergrid.persistence.collection.*;
+import org.apache.usergrid.persistence.collection.guice.CollectionTaskExecutor;
+import org.apache.usergrid.persistence.collection.guice.Write;
+import org.apache.usergrid.persistence.collection.guice.WriteUpdate;
+import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
+import org.apache.usergrid.persistence.collection.mvcc.stage.delete.MarkCommit;
+import org.apache.usergrid.persistence.collection.mvcc.stage.delete.MarkStart;
+import org.apache.usergrid.persistence.collection.mvcc.stage.write.*;
+import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
+import org.apache.usergrid.persistence.collection.serialization.impl.MvccEntitySerializationStrategy;
+import org.apache.usergrid.persistence.core.guice.ProxyImpl;
+import org.apache.usergrid.persistence.core.task.TaskExecutor;
+
+import java.util.concurrent.ExecutionException;
+
+/**
+ * Classy class class.
+ */
+public class EntityCollectionManagerFactoryImpl implements EntityCollectionManagerFactory {
+
+
+    private final WriteStart writeStart;
+    private final WriteStart writeUpdate;
+    private final WriteUniqueVerify writeVerifyUnique;
+    private final WriteOptimisticVerify writeOptimisticVerify;
+    private final WriteCommit writeCommit;
+    private final RollbackAction rollback;
+    private final MarkStart markStart;
+    private final MarkCommit markCommit;
+    private final MvccEntitySerializationStrategy entitySerializationStrategy;
+    private final UniqueValueSerializationStrategy uniqueValueSerializationStrategy;
+    private final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy;
+    private final Keyspace keyspace;
+    private final SerializationFig config;
+    private final EntityVersionCleanupFactory entityVersionCleanupFactory;
+    private final EntityVersionCreatedFactory entityVersionCreatedFactory;
+    private final EntityDeletedFactory entityDeletedFactory;
+    private final TaskExecutor taskExecutor;
+    private final CollectionScope collectionScope;
+    private LoadingCache<CollectionScope, EntityCollectionManager> ecmCache =
+        CacheBuilder.newBuilder().maximumSize( 1000 )
+            .build( new CacheLoader<CollectionScope, EntityCollectionManager>() {
+                public EntityCollectionManager load( CollectionScope scope ) {
+                    return new EntityCollectionManagerImpl(
+                        writeStart,
+                        writeUpdate,
+                        writeVerifyUnique,
+                        writeOptimisticVerify,writeCommit,rollback,markStart,markCommit,entitySerializationStrategy,uniqueValueSerializationStrategy,mvccLogEntrySerializationStrategy,keyspace,config,entityVersionCleanupFactory,entityVersionCreatedFactory,entityDeletedFactory,taskExecutor,collectionScope);
+                }
+            } );
+
+
+
+    public EntityCollectionManagerFactoryImpl( @Write final WriteStart writeStart,
+                                               @WriteUpdate final WriteStart              writeUpdate,
+                                               final WriteUniqueVerify writeVerifyUnique,
+                                               final WriteOptimisticVerify writeOptimisticVerify,
+                                               final WriteCommit writeCommit,
+                                               final RollbackAction rollback,
+                                               final MarkStart markStart,
+                                               final MarkCommit markCommit,
+                                               @ProxyImpl final MvccEntitySerializationStrategy entitySerializationStrategy,
+                                               final UniqueValueSerializationStrategy uniqueValueSerializationStrategy,
+                                               final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy,
+                                               final Keyspace keyspace,
+                                               final SerializationFig config,
+                                               final EntityVersionCleanupFactory entityVersionCleanupFactory,
+                                               final EntityVersionCreatedFactory          entityVersionCreatedFactory,
+                                               final EntityDeletedFactory                 entityDeletedFactory,
+                                               @CollectionTaskExecutor final TaskExecutor taskExecutor,
+                                               @Assisted final CollectionScope            collectionScope){
+
+        this.writeStart = writeStart;
+        this.writeUpdate = writeUpdate;
+        this.writeVerifyUnique = writeVerifyUnique;
+        this.writeOptimisticVerify = writeOptimisticVerify;
+        this.writeCommit = writeCommit;
+        this.rollback = rollback;
+        this.markStart = markStart;
+        this.markCommit = markCommit;
+        this.entitySerializationStrategy = entitySerializationStrategy;
+        this.uniqueValueSerializationStrategy = uniqueValueSerializationStrategy;
+        this.mvccLogEntrySerializationStrategy = mvccLogEntrySerializationStrategy;
+        this.keyspace = keyspace;
+        this.config = config;
+        this.entityVersionCleanupFactory = entityVersionCleanupFactory;
+        this.entityVersionCreatedFactory = entityVersionCreatedFactory;
+        this.entityDeletedFactory = entityDeletedFactory;
+        this.taskExecutor = taskExecutor;
+        this.collectionScope = collectionScope;
+    }
+    @Override
+    public EntityCollectionManager createCollectionManager(CollectionScope collectionScope) {
+        try{
+            return ecmCache.get(collectionScope);
+        }catch (ExecutionException ee){
+            throw new RuntimeException(ee);
+        }
+    }
+
+    @Override
+    public EntityCollectionManagerSync createCollectionManagerSync(CollectionScope collectionScope) {
+        return new EntityCollectionManagerSyncImpl(this,collectionScope);
+    }
+
+    @Override
+    public void invalidate() {
+        ecmCache.invalidateAll();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b604e6d7/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
index a89924a..11c5d44 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
@@ -33,7 +33,7 @@ import org.apache.usergrid.persistence.collection.MvccEntity;
 import org.apache.usergrid.persistence.collection.VersionSet;
 import org.apache.usergrid.persistence.collection.guice.Write;
 import org.apache.usergrid.persistence.collection.guice.WriteUpdate;
-import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy;
+import org.apache.usergrid.persistence.collection.serialization.impl.MvccEntitySerializationStrategy;
 import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
 import org.apache.usergrid.persistence.collection.mvcc.entity.MvccValidationUtils;
 import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionIoEvent;
@@ -117,19 +117,19 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
 
 
     @Inject
-    public EntityCollectionManagerImpl( 
-        @Write final WriteStart                    writeStart, 
+    public EntityCollectionManagerImpl(
+        @Write final WriteStart                    writeStart,
         @WriteUpdate final WriteStart              writeUpdate,
         final WriteUniqueVerify                    writeVerifyUnique,
         final WriteOptimisticVerify                writeOptimisticVerify,
-        final WriteCommit                          writeCommit, 
+        final WriteCommit                          writeCommit,
         final RollbackAction                       rollback,
-        final MarkStart                            markStart, 
+        final MarkStart                            markStart,
         final MarkCommit                           markCommit,
         @ProxyImpl final MvccEntitySerializationStrategy entitySerializationStrategy,
         final UniqueValueSerializationStrategy     uniqueValueSerializationStrategy,
         final MvccLogEntrySerializationStrategy    mvccLogEntrySerializationStrategy,
-        final Keyspace                             keyspace, 
+        final Keyspace                             keyspace,
         final SerializationFig                     config,
         final EntityVersionCleanupFactory          entityVersionCleanupFactory,
         final EntityVersionCreatedFactory          entityVersionCreatedFactory,
@@ -182,9 +182,9 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
 
         Observable<CollectionIoEvent<MvccEntity>> observable = stageRunner( writeData, writeStart );
 
-        // execute all validation stages concurrently.  Needs refactored when this is done.  
+        // execute all validation stages concurrently.  Needs refactored when this is done.
         // https://github.com/Netflix/RxJava/issues/627
-        // observable = Concurrent.concurrent( observable, Schedulers.io(), new WaitZip(), 
+        // observable = Concurrent.concurrent( observable, Schedulers.io(), new WaitZip(),
         //                  writeVerifyUnique, writeOptimisticVerify );
 
         return observable.map(writeCommit).doOnNext(new Action1<Entity>() {
@@ -402,4 +402,4 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
 
         return Health.RED;
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b604e6d7/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityDeletedTask.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityDeletedTask.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityDeletedTask.java
index 9ff4f56..f7d5b58 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityDeletedTask.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityDeletedTask.java
@@ -24,7 +24,7 @@ import com.netflix.astyanax.MutationBatch;
 import org.apache.usergrid.persistence.collection.CollectionScope;
 import org.apache.usergrid.persistence.collection.EntityVersionCleanupFactory;
 import org.apache.usergrid.persistence.collection.event.EntityDeleted;
-import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy;
+import org.apache.usergrid.persistence.collection.serialization.impl.MvccEntitySerializationStrategy;
 import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
 import org.apache.usergrid.persistence.core.task.Task;
 import org.apache.usergrid.persistence.model.entity.Id;
@@ -56,13 +56,13 @@ public class EntityDeletedTask implements Task<Void> {
 
 
     @Inject
-    public EntityDeletedTask( 
+    public EntityDeletedTask(
         EntityVersionCleanupFactory             entityVersionCleanupFactory,
         final MvccLogEntrySerializationStrategy logEntrySerializationStrategy,
         @ProxyImpl final MvccEntitySerializationStrategy entitySerializationStrategy,
         final Set<EntityDeleted>                listeners, // MUST be a set or Guice will not inject
-        @Assisted final CollectionScope         collectionScope, 
-        @Assisted final Id                      entityId, 
+        @Assisted final CollectionScope         collectionScope,
+        @Assisted final Id                      entityId,
         @Assisted final UUID                    version) {
 
         this.entityVersionCleanupFactory = entityVersionCleanupFactory;
@@ -81,7 +81,7 @@ public class EntityDeletedTask implements Task<Void> {
                 new Object[] { collectionScope, entityId, version }, throwable );
     }
 
-    
+
     @Override
     public Void rejected() {
         try {
@@ -94,9 +94,9 @@ public class EntityDeletedTask implements Task<Void> {
         return null;
     }
 
-    
+
     @Override
-    public Void call() throws Exception { 
+    public Void call() throws Exception {
 
         entityVersionCleanupFactory.getTask( collectionScope, entityId, version ).call();
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b604e6d7/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
index efecdeb..2f51eb5 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
@@ -35,7 +35,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.usergrid.persistence.collection.CollectionScope;
 import org.apache.usergrid.persistence.collection.event.EntityVersionDeleted;
-import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy;
+import org.apache.usergrid.persistence.collection.serialization.impl.MvccEntitySerializationStrategy;
 import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
 import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
 import org.apache.usergrid.persistence.core.rx.ObservableIterator;
@@ -55,7 +55,7 @@ import rx.schedulers.Schedulers;
 
 
 /**
- * Cleans up previous versions from the specified version. Note that this means the version 
+ * Cleans up previous versions from the specified version. Note that this means the version
  * passed in the io event is retained, the range is exclusive.
  */
 public class EntityVersionCleanupTask implements Task<Void> {
@@ -77,7 +77,7 @@ public class EntityVersionCleanupTask implements Task<Void> {
 
 
     @Inject
-    public EntityVersionCleanupTask( 
+    public EntityVersionCleanupTask(
         final SerializationFig serializationFig,
         final MvccLogEntrySerializationStrategy logEntrySerializationStrategy,
         @ProxyImpl final MvccEntitySerializationStrategy   entitySerializationStrategy,
@@ -159,7 +159,7 @@ public class EntityVersionCleanupTask implements Task<Void> {
                                 continue;
                             }
                             final UniqueValue unique = new UniqueValueImpl( field, entityId, entityVersion);
-                            final MutationBatch deleteMutation = 
+                            final MutationBatch deleteMutation =
                                     uniqueValueSerializationStrategy.delete(scope,unique);
                             batch.mergeShallow(deleteMutation);
                         }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b604e6d7/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/MvccEntityMigrationStrategy.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/MvccEntityMigrationStrategy.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/MvccEntityMigrationStrategy.java
new file mode 100644
index 0000000..0a3cd3a
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/MvccEntityMigrationStrategy.java
@@ -0,0 +1,30 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  *  contributor license agreements.  The ASF licenses this file to You
+ *  * under the Apache License, Version 2.0 (the "License"); you may not
+ *  * use this file except in compliance with the License.
+ *  * You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.  For additional information regarding
+ *  * copyright in this work, please see the NOTICE file in the top level
+ *  * directory of this distribution.
+ *
+ */
+package org.apache.usergrid.persistence.collection.mvcc;
+
+import org.apache.usergrid.persistence.collection.serialization.impl.MvccEntitySerializationStrategy;
+import org.apache.usergrid.persistence.core.migration.schema.MigrationStrategy;
+
+/**
+ * Classy class class.
+ */
+public interface MvccEntityMigrationStrategy extends MigrationStrategy<MvccEntitySerializationStrategy> {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b604e6d7/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/MvccEntitySerializationStrategy.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/MvccEntitySerializationStrategy.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/MvccEntitySerializationStrategy.java
deleted file mode 100644
index 8a13115..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/MvccEntitySerializationStrategy.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.usergrid.persistence.collection.mvcc;
-
-
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.UUID;
-
-import org.apache.usergrid.persistence.collection.CollectionScope;
-import org.apache.usergrid.persistence.collection.EntitySet;
-import org.apache.usergrid.persistence.collection.MvccEntity;
-import org.apache.usergrid.persistence.core.migration.schema.Migration;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.netflix.astyanax.MutationBatch;
-
-
-/**
- * The interface that allows us to serialize an entity to disk
- */
-public interface MvccEntitySerializationStrategy extends Migration {
-
-    /**
-     * Serialize the entity to the data store with the given collection context
-     *
-     * @param entity The entity to persist
-     *
-     * @return The MutationBatch operations for this update
-     */
-    public MutationBatch write( CollectionScope context, MvccEntity entity );
-
-
-
-    /**
-     * Load the entities into the entitySet from the specified Ids.  Loads versions <= the maxVersion
-     * @param scope
-     * @param entityIds
-     * @return
-     */
-    public EntitySet load( CollectionScope scope, Collection<Id> entityIds, UUID maxVersion);
-
-    /**
-     * Load a list, from highest to lowest of the entity with versions <= version up to maxSize elements
-     *
-     * @param context The context to persist the entity into
-     * @param entityId The entity id to load
-     * @param version The max version to seek from.  I.E a stored version <= this argument
-     * @param fetchSize The fetch size to return for each trip to cassandra.
-     *
-     * @return An iterator of entities ordered from max(UUID)=> min(UUID).  The return value should be null
-     *         safe and return an empty list when there are no matches
-     */
-    public Iterator<MvccEntity> loadDescendingHistory( CollectionScope context, Id entityId, UUID version,
-                                                       int fetchSize );
-
-    /**
-     * Load a historical list of entities, from lowest to highest entity with versions < version up to maxSize elements
-     *
-     * @param context The context to persist the entity into
-     * @param entityId The entity id to load
-     * @param version The max version to seek to.  I.E a stored version < this argument
-     * @param fetchSize The fetch size to return for each trip to cassandra.
-     * @return An iterator of entities ordered from min(UUID)=> max(UUID).  The return value should be null
-     *         safe and return an empty list when there are no matches
-     */
-    public Iterator<MvccEntity> loadAscendingHistory( CollectionScope context, Id entityId, UUID version,
-                                                      int fetchSize );
-
-    /**
-     * Mark this  this version as deleted from the persistence store, but keep the version to mark that is has been cleared This
-     * can be used in a mark+sweep system.  The entity with the given version will exist in the context, but no data
-     * will be stored
-     */
-    public MutationBatch mark( CollectionScope context, Id entityId, UUID version );
-
-
-    /**
-     * Delete the entity from the context with the given entityId and version
-     *
-     * @param context The context that contains the entity
-     * @param entityId The entity id to delete
-     * @param version The version to delete
-     */
-    public MutationBatch delete( CollectionScope context, Id entityId, UUID version );
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b604e6d7/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommit.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommit.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommit.java
index baf2ac3..380bf15 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommit.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommit.java
@@ -25,7 +25,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.usergrid.persistence.collection.CollectionScope;
-import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy;
+import org.apache.usergrid.persistence.collection.serialization.impl.MvccEntitySerializationStrategy;
 import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
 import org.apache.usergrid.persistence.collection.MvccEntity;
 import org.apache.usergrid.persistence.collection.MvccLogEntry;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b604e6d7/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java
index d3c8193..8604af6 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java
@@ -25,7 +25,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.usergrid.persistence.collection.CollectionScope;
 import org.apache.usergrid.persistence.collection.exception.WriteCommitException;
-import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy;
+import org.apache.usergrid.persistence.collection.serialization.impl.MvccEntitySerializationStrategy;
 import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
 import org.apache.usergrid.persistence.collection.MvccEntity;
 import org.apache.usergrid.persistence.collection.MvccLogEntry;
@@ -53,7 +53,7 @@ import rx.functions.Func1;
 
 
 /**
- * This phase should invoke any finalization, and mark the entity as committed in the 
+ * This phase should invoke any finalization, and mark the entity as committed in the
  * data store before returning
  */
 @Singleton

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b604e6d7/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/EntityRepairImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/EntityRepairImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/EntityRepairImpl.java
index 4226fe6..1c7909b 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/EntityRepairImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/EntityRepairImpl.java
@@ -24,7 +24,6 @@ import java.util.Iterator;
 import java.util.List;
 
 import org.apache.usergrid.persistence.collection.CollectionScope;
-import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy;
 import org.apache.usergrid.persistence.collection.mvcc.changelog.ChangeLog;
 import org.apache.usergrid.persistence.collection.mvcc.changelog.ChangeLogGenerator;
 import org.apache.usergrid.persistence.collection.mvcc.changelog.ChangeLogGeneratorImpl;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b604e6d7/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategy.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategy.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategy.java
new file mode 100644
index 0000000..bf1422b
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategy.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.persistence.collection.serialization.impl;
+
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.collection.EntitySet;
+import org.apache.usergrid.persistence.collection.MvccEntity;
+import org.apache.usergrid.persistence.core.migration.schema.Migration;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.netflix.astyanax.MutationBatch;
+
+
+/**
+ * The interface that allows us to serialize an entity to disk
+ */
+public interface MvccEntitySerializationStrategy extends Migration {
+
+    /**
+     * Serialize the entity to the data store with the given collection context
+     *
+     * @param entity The entity to persist
+     * @return The MutationBatch operations for this update
+     */
+    public MutationBatch write(CollectionScope context, MvccEntity entity);
+
+
+    /**
+     * Load the entities into the entitySet from the specified Ids.  Loads versions <= the maxVersion
+     *
+     * @param scope
+     * @param entityIds
+     * @return
+     */
+    public EntitySet load(CollectionScope scope, Collection<Id> entityIds, UUID maxVersion);
+
+    /**
+     * Load a list, from highest to lowest of the entity with versions <= version up to maxSize elements
+     *
+     * @param context   The context to persist the entity into
+     * @param entityId  The entity id to load
+     * @param version   The max version to seek from.  I.E a stored version <= this argument
+     * @param fetchSize The fetch size to return for each trip to cassandra.
+     * @return An iterator of entities ordered from max(UUID)=> min(UUID).  The return value should be null
+     * safe and return an empty list when there are no matches
+     */
+    public Iterator<MvccEntity> loadDescendingHistory(CollectionScope context, Id entityId, UUID version,
+                                                      int fetchSize);
+
+    /**
+     * Load a historical list of entities, from lowest to highest entity with versions < version up to maxSize elements
+     *
+     * @param context   The context to persist the entity into
+     * @param entityId  The entity id to load
+     * @param version   The max version to seek to.  I.E a stored version < this argument
+     * @param fetchSize The fetch size to return for each trip to cassandra.
+     * @return An iterator of entities ordered from min(UUID)=> max(UUID).  The return value should be null
+     * safe and return an empty list when there are no matches
+     */
+    public Iterator<MvccEntity> loadAscendingHistory(CollectionScope context, Id entityId, UUID version,
+                                                     int fetchSize);
+
+    /**
+     * Mark this  this version as deleted from the persistence store, but keep the version to mark that is has been cleared This
+     * can be used in a mark+sweep system.  The entity with the given version will exist in the context, but no data
+     * will be stored
+     */
+    public MutationBatch mark(CollectionScope context, Id entityId, UUID version);
+
+
+    /**
+     * Delete the entity from the context with the given entityId and version
+     *
+     * @param context  The context that contains the entity
+     * @param entityId The entity id to delete
+     * @param version  The version to delete
+     */
+    public MutationBatch delete(CollectionScope context, Id entityId, UUID version);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b604e6d7/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyImpl.java
index 6badbc1..3d36438 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyImpl.java
@@ -37,7 +37,6 @@ import org.apache.usergrid.persistence.collection.EntitySet;
 import org.apache.usergrid.persistence.collection.MvccEntity;
 import org.apache.usergrid.persistence.collection.exception.CollectionRuntimeException;
 import org.apache.usergrid.persistence.collection.exception.DataCorruptionException;
-import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy;
 import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccEntityImpl;
 import org.apache.usergrid.persistence.collection.serialization.EntityRepair;
 import org.apache.usergrid.persistence.collection.serialization.SerializationFig;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b604e6d7/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxy.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxy.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxy.java
new file mode 100644
index 0000000..bb192cd
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxy.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.persistence.collection.serialization.impl;
+
+import com.google.inject.Inject;
+import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.collection.EntitySet;
+import org.apache.usergrid.persistence.collection.MvccEntity;
+import org.apache.usergrid.persistence.collection.mvcc.MvccEntityMigrationStrategy;
+import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamilyDefinition;
+import org.apache.usergrid.persistence.core.migration.data.DataMigration;
+import org.apache.usergrid.persistence.core.migration.data.DataMigrationException;
+import org.apache.usergrid.persistence.core.migration.data.DataMigrationManager;
+import org.apache.usergrid.persistence.core.migration.schema.MigrationStrategy;
+import org.apache.usergrid.persistence.core.scope.ApplicationEntityGroup;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+import rx.Observable;
+import rx.functions.Func1;
+
+import java.util.*;
+import java.util.concurrent.atomic.AtomicLong;
+
+
+/**
+ * Version 4 implementation of entity serialization. This will proxy writes and reads so that during
+ * migration data goes to both sources and is read from the old source. After the ugprade completes,
+ * it will be available from the new source
+ */
+public abstract class MvccEntitySerializationStrategyProxy implements MvccEntitySerializationStrategy, MvccEntityMigrationStrategy {
+
+
+    private final DataMigrationManager dataMigrationManager;
+    protected final Keyspace keyspace;
+    protected final MvccEntitySerializationStrategy previous;
+    protected final MvccEntitySerializationStrategy current;
+
+
+    @Inject
+    public MvccEntitySerializationStrategyProxy( final DataMigrationManager dataMigrationManager,
+                                                       final Keyspace keyspace,
+                                                      final MvccEntitySerializationStrategy previous,
+                                                       final MvccEntitySerializationStrategy current) {
+        this.dataMigrationManager = dataMigrationManager;
+        this.keyspace = keyspace;
+        this.previous = previous;
+        this.current = current;
+    }
+
+
+    @Override
+    public MutationBatch write( final CollectionScope context, final MvccEntity entity ) {
+        if ( isOldVersion() ) {
+            final MutationBatch aggregateBatch = keyspace.prepareMutationBatch();
+
+            aggregateBatch.mergeShallow( previous.write( context, entity ) );
+            aggregateBatch.mergeShallow( current.write( context, entity ) );
+
+            return aggregateBatch;
+        }
+
+        return current.write( context, entity );
+    }
+
+
+    @Override
+    public EntitySet load( final CollectionScope scope, final Collection<Id> entityIds, final UUID maxVersion ) {
+        if ( isOldVersion() ) {
+            return previous.load( scope, entityIds, maxVersion );
+        }
+
+        return current.load( scope, entityIds, maxVersion );
+    }
+
+
+    @Override
+    public Iterator<MvccEntity> loadDescendingHistory( final CollectionScope context, final Id entityId,
+                                                       final UUID version, final int fetchSize ) {
+        if ( isOldVersion() ) {
+            return previous.loadDescendingHistory( context, entityId, version, fetchSize );
+        }
+
+        return current.loadDescendingHistory( context, entityId, version, fetchSize );
+    }
+
+
+    @Override
+    public Iterator<MvccEntity> loadAscendingHistory( final CollectionScope context, final Id entityId,
+                                                      final UUID version, final int fetchSize ) {
+        if ( isOldVersion() ) {
+            return previous.loadAscendingHistory( context, entityId, version, fetchSize );
+        }
+
+        return current.loadAscendingHistory( context, entityId, version, fetchSize );
+    }
+
+
+    @Override
+    public MutationBatch mark( final CollectionScope context, final Id entityId, final UUID version ) {
+        if ( isOldVersion() ) {
+            final MutationBatch aggregateBatch = keyspace.prepareMutationBatch();
+
+            aggregateBatch.mergeShallow( previous.mark( context, entityId, version ) );
+            aggregateBatch.mergeShallow( current.mark( context, entityId, version ) );
+
+            return aggregateBatch;
+        }
+
+        return current.mark( context, entityId, version );
+    }
+
+
+    @Override
+    public MutationBatch delete( final CollectionScope context, final Id entityId, final UUID version ) {
+        if ( isOldVersion() ) {
+            final MutationBatch aggregateBatch = keyspace.prepareMutationBatch();
+
+            aggregateBatch.mergeShallow( previous.delete( context, entityId, version ) );
+            aggregateBatch.mergeShallow( current.delete( context, entityId, version ) );
+
+            return aggregateBatch;
+        }
+
+        return current.delete( context, entityId, version );
+    }
+
+    /**
+     * Return true if we're on an old version
+     */
+    private boolean isOldVersion() {
+        return dataMigrationManager.getCurrentVersion() < getVersion();
+    }
+
+
+    @Override
+    public Collection<MultiTennantColumnFamilyDefinition> getColumnFamilies() {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public Observable migrate(final ApplicationEntityGroup applicationEntityGroup, final DataMigration.ProgressObserver observer) {
+        final AtomicLong atomicLong = new AtomicLong();
+        final MutationBatch totalBatch = keyspace.prepareMutationBatch();
+
+        final List<Id> entityIds = applicationEntityGroup.entityIds;
+
+        final UUID now = UUIDGenerator.newTimeUUID();
+
+        //go through each entity in the system, and load it's entire
+        // history
+        return Observable.from(entityIds)
+
+            .map(new Func1<Id, Id>() {
+                @Override
+                public Id call(Id entityId) {
+
+                    ApplicationScope applicationScope = applicationEntityGroup.applicationScope;
+
+                    if (!(applicationScope instanceof CollectionScope)) {
+                        throw new IllegalArgumentException("getCollectionScopeFromEntityId must return a collection scope");
+                    }
+
+                    CollectionScope currentScope = (CollectionScope) applicationScope;
+                    MigrationStrategy.MigrationRelationship<MvccEntitySerializationStrategy> migration = getMigration();
+                    //for each element in the history in the previous version,
+                    // copy it to the CF in v2
+                    Iterator<MvccEntity> allVersions = migration.from()
+                        .loadDescendingHistory(currentScope, entityId, now,
+                            1000);
+
+                    while (allVersions.hasNext()) {
+                        final MvccEntity version = allVersions.next();
+
+                        final MutationBatch versionBatch =
+                            migration.to().write(currentScope, version);
+
+                        totalBatch.mergeShallow(versionBatch);
+
+                        if (atomicLong.incrementAndGet() % 50 == 0) {
+                            executeBatch(totalBatch, observer, atomicLong);
+                        }
+                    }
+                    executeBatch(totalBatch, observer, atomicLong);
+                    return entityId;
+                }
+            })
+            .map(new Func1<Id, Long>() {
+                @Override
+                public Long call(Id id) {
+                    executeBatch(totalBatch, observer, atomicLong);
+                    return atomicLong.get();
+                }
+            });
+    }
+
+    protected void executeBatch( final MutationBatch batch, final DataMigration.ProgressObserver po, final AtomicLong count ) {
+        try {
+            batch.execute();
+
+            po.update( getVersion(), "Finished copying " + count + " entities to the new format" );
+        }
+        catch ( ConnectionException e ) {
+            po.failed( getVersion(), "Failed to execute mutation in cassandra" );
+            throw new DataMigrationException( "Unable to migrate batches ", e );
+        }
+    }
+
+}
+