You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2014/11/20 01:37:50 UTC

[2/3] incubator-usergrid git commit: Removed duplicate MigrationRule

Removed duplicate MigrationRule

Partially refactored entity serialization.

Finished creating tests for refactoring


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/2bd1c950
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/2bd1c950
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/2bd1c950

Branch: refs/heads/USERGRID-250-buffer-size-fix
Commit: 2bd1c950f6fab093d458e6c3a55fb7fb8ff28e79
Parents: 39b1576
Author: Todd Nine <tn...@apigee.com>
Authored: Wed Nov 19 17:01:04 2014 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Wed Nov 19 17:01:04 2014 -0700

----------------------------------------------------------------------
 .../corepersistence/migration/Versions.java     |   3 +
 .../impl/EntityCollectionManagerImpl.java       | 241 +++++++++----------
 .../mvcc/MvccEntitySerializationStrategy.java   |   3 +-
 .../mvcc/MvccLogEntrySerializationStrategy.java |   3 +-
 .../mvcc/stage/delete/MarkCommit.java           |   3 +-
 .../mvcc/stage/write/WriteCommit.java           |   3 +-
 .../UniqueValueSerializationStrategy.java       |   3 +-
 .../MvccEntitySerializationStrategyImpl.java    | 172 ++-----------
 ...vccEntitySerializationStrategyProxyImpl.java | 162 +++++++++++++
 .../MvccEntitySerializationStrategyV1Impl.java  | 193 +++++++++++++++
 .../MvccEntitySerializationStrategyV2Impl.java  | 194 +++++++++++++++
 .../MvccLogEntrySerializationStrategyImpl.java  |   2 +-
 .../serialization/impl/SerializationModule.java |  21 +-
 .../UniqueValueSerializationStrategyImpl.java   |   2 +-
 .../EntityCollectionManagerFactoryTest.java     |   2 +-
 .../collection/EntityCollectionManagerIT.java   |  74 ++++--
 .../EntityCollectionManagerStressTest.java      |   2 +-
 .../EntityCollectionManagerSyncIT.java          |   2 +-
 .../collection/guice/MigrationManagerRule.java  |  38 ---
 .../collection/guice/TestCollectionModule.java  |  14 ++
 ...niqueValueSerializationStrategyImplTest.java |   2 +-
 .../mvcc/stage/write/WriteUniqueVerifyIT.java   |   2 +-
 .../mvcc/stage/write/WriteUniqueVerifyTest.java |   2 +-
 ...MvccEntitySerializationStrategyImplTest.java |  56 ++---
 ...cEntitySerializationStrategyProxyV1Test.java |  77 ++++++
 ...cEntitySerializationStrategyProxyV2Test.java |  75 ++++++
 ...ccEntitySerializationStrategyV1ImplTest.java |  46 ++++
 ...ccEntitySerializationStrategyV2ImplTest.java |  47 ++++
 .../impl/MvccLESSTransientTest.java             |   2 +-
 ...ccLogEntrySerializationStrategyImplTest.java |   2 +-
 .../src/test/resources/log4j.properties         |  21 +-
 .../core/guice/MaxMigrationModule.java          |  39 +++
 .../core/guice/MaxMigrationVersion.java         |  40 +++
 .../core/guice/MigrationManagerRule.java        |  19 +-
 .../persistence/graph/GraphManagerIT.java       |   2 +-
 .../persistence/graph/GraphManagerLoadTest.java |   2 +-
 .../graph/GraphManagerShardingIT.java           |   2 +-
 .../graph/GraphManagerStressTest.java           |   2 +-
 .../usergrid/persistence/graph/SimpleTest.java  |   2 +-
 .../graph/guice/TestGraphModule.java            |  11 +
 .../graph/impl/EdgeDeleteListenerTest.java      |   2 +-
 .../graph/impl/NodeDeleteListenerTest.java      |   2 +-
 .../graph/impl/stage/EdgeDeleteRepairTest.java  |   2 +-
 .../graph/impl/stage/EdgeMetaRepairTest.java    |   2 +-
 .../EdgeMetadataSerializationTest.java          |   2 +-
 .../EdgeSerializationChopTest.java              |   2 +-
 .../serialization/EdgeSerializationTest.java    |   2 +-
 .../serialization/NodeSerializationTest.java    |   2 +-
 .../impl/shard/EdgeShardSerializationTest.java  |   2 +-
 .../NodeShardCounterSerializationTest.java      |   2 +-
 .../persistence/map/MapManagerTest.java         |   2 +-
 51 files changed, 1205 insertions(+), 405 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2bd1c950/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/Versions.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/Versions.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/Versions.java
index b4fe095..99067b7 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/Versions.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/Versions.java
@@ -22,6 +22,7 @@
 package org.apache.usergrid.corepersistence.migration;
 
 
+import org.apache.usergrid.persistence.collection.serialization.impl.MvccEntitySerializationStrategyProxyImpl;
 import org.apache.usergrid.persistence.graph.serialization.impl.EdgeMetadataSerializationProxyImpl;
 
 
@@ -39,4 +40,6 @@ public class Versions {
      * Version 2.  Edge meta changes
      */
     public static final int VERSION_2 = EdgeMetadataSerializationProxyImpl.MIGRATION_VERSION;
+
+    public static final int VERSION_3 = MvccEntitySerializationStrategyProxyImpl.MIGRATION_VERSION;
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2bd1c950/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
index 919e83b..d54c5f7 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
@@ -23,23 +23,18 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 
-import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
-import org.apache.usergrid.persistence.collection.serialization.UniqueValue;
-import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
-import org.apache.usergrid.persistence.collection.serialization.UniqueValueSet;
-import org.apache.usergrid.persistence.model.field.Field;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.usergrid.persistence.collection.CollectionScope;
 import org.apache.usergrid.persistence.collection.EntityCollectionManager;
 import org.apache.usergrid.persistence.collection.EntitySet;
+import org.apache.usergrid.persistence.collection.MvccEntity;
 import org.apache.usergrid.persistence.collection.VersionSet;
 import org.apache.usergrid.persistence.collection.guice.Write;
 import org.apache.usergrid.persistence.collection.guice.WriteUpdate;
 import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy;
 import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
-import org.apache.usergrid.persistence.collection.MvccEntity;
 import org.apache.usergrid.persistence.collection.mvcc.entity.MvccValidationUtils;
 import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionIoEvent;
 import org.apache.usergrid.persistence.collection.mvcc.stage.delete.MarkCommit;
@@ -49,9 +44,16 @@ import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteCommit;
 import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteOptimisticVerify;
 import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteStart;
 import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteUniqueVerify;
+import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValue;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValueSet;
+import org.apache.usergrid.persistence.core.guice.ProxyImpl;
+import org.apache.usergrid.persistence.core.util.Health;
 import org.apache.usergrid.persistence.core.util.ValidationUtils;
 import org.apache.usergrid.persistence.model.entity.Entity;
 import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.field.Field;
 import org.apache.usergrid.persistence.model.util.UUIDGenerator;
 
 import com.google.common.base.Preconditions;
@@ -59,11 +61,10 @@ import com.google.inject.Inject;
 import com.google.inject.assistedinject.Assisted;
 import com.netflix.astyanax.Keyspace;
 import com.netflix.astyanax.connectionpool.OperationResult;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 import com.netflix.astyanax.model.ColumnFamily;
 import com.netflix.astyanax.model.CqlResult;
 import com.netflix.astyanax.serializers.StringSerializer;
-import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
-import org.apache.usergrid.persistence.core.util.Health;
 
 import rx.Observable;
 import rx.Subscriber;
@@ -73,13 +74,12 @@ import rx.schedulers.Schedulers;
 
 
 /**
- * Simple implementation.  Should perform  writes, delete and load.
- * <p/>
- * TODO: maybe refactor the stage operations into their own classes for clarity and organization?
+ * Simple implementation.  Should perform  writes, delete and load. <p/> TODO: maybe refactor the stage operations into
+ * their own classes for clarity and organization?
  */
 public class EntityCollectionManagerImpl implements EntityCollectionManager {
 
-    private static final Logger logger = LoggerFactory.getLogger(EntityCollectionManagerImpl.class);
+    private static final Logger logger = LoggerFactory.getLogger( EntityCollectionManagerImpl.class );
 
     private final CollectionScope collectionScope;
 
@@ -106,24 +106,20 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
 
 
     @Inject
-    public EntityCollectionManagerImpl(
-        @Write final WriteStart writeStart,
-        @WriteUpdate final WriteStart writeUpdate,
-        final WriteUniqueVerify writeVerifyUnique,
-        final WriteOptimisticVerify writeOptimisticVerify,
-        final WriteCommit writeCommit, final RollbackAction rollback,
-        final MarkStart markStart, final MarkCommit markCommit,
-        final MvccEntitySerializationStrategy entitySerializationStrategy,
-        final UniqueValueSerializationStrategy uniqueValueSerializationStrategy,
-        final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy,
-        final Keyspace keyspace,
-        final SerializationFig config,
-        @Assisted final CollectionScope collectionScope
-    ) {
+    public EntityCollectionManagerImpl( @Write final WriteStart writeStart, @WriteUpdate final WriteStart writeUpdate,
+                                        final WriteUniqueVerify writeVerifyUnique,
+                                        final WriteOptimisticVerify writeOptimisticVerify,
+                                        final WriteCommit writeCommit, final RollbackAction rollback,
+                                        final MarkStart markStart, final MarkCommit markCommit,
+                                        @ProxyImpl final MvccEntitySerializationStrategy entitySerializationStrategy,
+                                        final UniqueValueSerializationStrategy uniqueValueSerializationStrategy,
+                                        final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy,
+                                        final Keyspace keyspace, final SerializationFig config,
+                                        @Assisted final CollectionScope collectionScope ) {
         this.uniqueValueSerializationStrategy = uniqueValueSerializationStrategy;
         this.entitySerializationStrategy = entitySerializationStrategy;
 
-        MvccValidationUtils.validateCollectionScope(collectionScope);
+        MvccValidationUtils.validateCollectionScope( collectionScope );
 
         this.writeStart = writeStart;
         this.writeUpdate = writeUpdate;
@@ -145,20 +141,20 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
 
 
     @Override
-    public Observable<Entity> write(final Entity entity) {
+    public Observable<Entity> write( final Entity entity ) {
 
         //do our input validation
-        Preconditions.checkNotNull(entity, "Entity is required in the new stage of the mvcc write");
+        Preconditions.checkNotNull( entity, "Entity is required in the new stage of the mvcc write" );
 
         final Id entityId = entity.getId();
 
-        ValidationUtils.verifyIdentity(entityId);
+        ValidationUtils.verifyIdentity( entityId );
 
 
         // create our observable and start the write
-        final CollectionIoEvent<Entity> writeData = new CollectionIoEvent<Entity>(collectionScope, entity);
+        final CollectionIoEvent<Entity> writeData = new CollectionIoEvent<Entity>( collectionScope, entity );
 
-        Observable<CollectionIoEvent<MvccEntity>> observable = stageRunner(writeData, writeStart);
+        Observable<CollectionIoEvent<MvccEntity>> observable = stageRunner( writeData, writeStart );
 
         // execute all validation stages concurrently.  Needs refactored when this is done.  
         // https://github.com/Netflix/RxJava/issues/627
@@ -166,115 +162,119 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
         //                  writeVerifyUnique, writeOptimisticVerify );
 
         // return the commit result.
-        return observable.map(writeCommit).doOnError(rollback);
+        return observable.map( writeCommit ).doOnError( rollback );
     }
 
 
     @Override
-    public Observable<Void> delete(final Id entityId) {
-
-        Preconditions.checkNotNull(entityId, "Entity id is required in this stage");
-        Preconditions.checkNotNull(entityId.getUuid(), "Entity id is required in this stage");
-        Preconditions.checkNotNull(entityId.getType(), "Entity type is required in this stage");
-
-        return Observable.from(new CollectionIoEvent<Id>(collectionScope, entityId)).map(markStart)
-            .doOnNext(markCommit).map(new Func1<CollectionIoEvent<MvccEntity>, Void>() {
-                @Override
-                public Void call(final CollectionIoEvent<MvccEntity> mvccEntityCollectionIoEvent) {
-                    return null;
-                }
-            });
+    public Observable<Void> delete( final Id entityId ) {
+
+        Preconditions.checkNotNull( entityId, "Entity id is required in this stage" );
+        Preconditions.checkNotNull( entityId.getUuid(), "Entity id is required in this stage" );
+        Preconditions.checkNotNull( entityId.getType(), "Entity type is required in this stage" );
+
+        return Observable.from( new CollectionIoEvent<Id>( collectionScope, entityId ) ).map( markStart )
+                         .doOnNext( markCommit ).map( new Func1<CollectionIoEvent<MvccEntity>, Void>() {
+                    @Override
+                    public Void call( final CollectionIoEvent<MvccEntity> mvccEntityCollectionIoEvent ) {
+                        return null;
+                    }
+                } );
     }
 
 
     @Override
-    public Observable<Entity> load(final Id entityId) {
+    public Observable<Entity> load( final Id entityId ) {
 
-        Preconditions.checkNotNull(entityId, "Entity id required in the load stage");
-        Preconditions.checkNotNull(entityId.getUuid(), "Entity id uuid required in load stage");
-        Preconditions.checkNotNull(entityId.getType(), "Entity id type required in load stage");
+        Preconditions.checkNotNull( entityId, "Entity id required in the load stage" );
+        Preconditions.checkNotNull( entityId.getUuid(), "Entity id uuid required in load stage" );
+        Preconditions.checkNotNull( entityId.getType(), "Entity id type required in load stage" );
 
-        return load(Collections.singleton(entityId)).map(new Func1<EntitySet, Entity>() {
+        return load( Collections.singleton( entityId ) ).map( new Func1<EntitySet, Entity>() {
             @Override
-            public Entity call(final EntitySet entitySet) {
-                final MvccEntity entity = entitySet.getEntity(entityId);
+            public Entity call( final EntitySet entitySet ) {
+                final MvccEntity entity = entitySet.getEntity( entityId );
 
-                if (entity == null) {
+                if ( entity == null ) {
                     return null;
                 }
 
                 return entity.getEntity().orNull();
             }
-        });
+        } );
     }
 
 
     @Override
-    public Observable<EntitySet> load(final Collection<Id> entityIds) {
+    public Observable<EntitySet> load( final Collection<Id> entityIds ) {
 
-        Preconditions.checkNotNull(entityIds, "entityIds cannot be null");
+        Preconditions.checkNotNull( entityIds, "entityIds cannot be null" );
 
 
-        return Observable.create(new Observable.OnSubscribe<EntitySet>() {
+        return Observable.create( new Observable.OnSubscribe<EntitySet>() {
 
             @Override
-            public void call(final Subscriber<? super EntitySet> subscriber) {
+            public void call( final Subscriber<? super EntitySet> subscriber ) {
                 try {
-                    final EntitySet results = entitySerializationStrategy.load(
-                            collectionScope, entityIds, UUIDGenerator.newTimeUUID());
+                    final EntitySet results =
+                            entitySerializationStrategy.load( collectionScope, entityIds, UUIDGenerator.newTimeUUID() );
 
-                    subscriber.onNext(results);
+                    subscriber.onNext( results );
                     subscriber.onCompleted();
-                } catch (Exception e) {
-                    subscriber.onError(e);
+                }
+                catch ( Exception e ) {
+                    subscriber.onError( e );
                 }
             }
-        });
+        } );
     }
 
+
     @Override
-    public Observable<Id> getIdField(final Field field) {
-        final List<Field> fields = Collections.singletonList(field);
-        return rx.Observable.from(fields).map(new Func1<Field, Id>() {
+    public Observable<Id> getIdField( final Field field ) {
+        final List<Field> fields = Collections.singletonList( field );
+        return rx.Observable.from( fields ).map( new Func1<Field, Id>() {
             @Override
-            public Id call(Field field) {
+            public Id call( Field field ) {
                 try {
-                    UniqueValueSet set = uniqueValueSerializationStrategy.load(collectionScope, fields);
-                    UniqueValue value = set.getValue(field.getName());
+                    UniqueValueSet set = uniqueValueSerializationStrategy.load( collectionScope, fields );
+                    UniqueValue value = set.getValue( field.getName() );
                     Id id = value == null ? null : value.getEntityId();
                     return id;
-                } catch (ConnectionException e) {
-                    logger.error("Failed to getIdField", e);
-                    throw new RuntimeException(e);
+                }
+                catch ( ConnectionException e ) {
+                    logger.error( "Failed to getIdField", e );
+                    throw new RuntimeException( e );
                 }
             }
-        });
+        } );
     }
 
+
     @Override
-    public Observable<Entity> update(final Entity entity) {
+    public Observable<Entity> update( final Entity entity ) {
 
-        logger.debug("Starting update process");
+        logger.debug( "Starting update process" );
 
         //do our input validation
-        Preconditions.checkNotNull(entity, "Entity is required in the new stage of the mvcc write");
+        Preconditions.checkNotNull( entity, "Entity is required in the new stage of the mvcc write" );
 
         final Id entityId = entity.getId();
 
 
-        ValidationUtils.verifyIdentity(entityId);
+        ValidationUtils.verifyIdentity( entityId );
 
         // create our observable and start the write
-        CollectionIoEvent<Entity> writeData = new CollectionIoEvent<Entity>(collectionScope, entity);
+        CollectionIoEvent<Entity> writeData = new CollectionIoEvent<Entity>( collectionScope, entity );
 
 
-        Observable<CollectionIoEvent<MvccEntity>> observable = stageRunner(writeData, writeUpdate);
+        Observable<CollectionIoEvent<MvccEntity>> observable = stageRunner( writeData, writeUpdate );
 
 
-        return observable.map(writeCommit).doOnNext(new Action1<Entity>() {
+        return observable.map( writeCommit ).doOnNext( new Action1<Entity>() {
             @Override
-            public void call(final Entity entity) {
-                logger.debug("sending entity to the queue");
+            public void call( final Entity entity ) {
+                logger.debug( "sending entity to the queue" );
 
                 //we an update, signal the fix
 
@@ -284,57 +284,56 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
 
 
             }
-        }).doOnError(rollback);
+        } ).doOnError( rollback );
     }
 
 
     // fire the stages
-    public Observable<CollectionIoEvent<MvccEntity>> stageRunner(CollectionIoEvent<Entity> writeData,
-                                                                 WriteStart writeState) {
+    public Observable<CollectionIoEvent<MvccEntity>> stageRunner( CollectionIoEvent<Entity> writeData,
+                                                                  WriteStart writeState ) {
 
-        return Observable.from(writeData).map(writeState).doOnNext(
-                new Action1<CollectionIoEvent<MvccEntity>>() {
+        return Observable.from( writeData ).map( writeState ).doOnNext( new Action1<CollectionIoEvent<MvccEntity>>() {
 
-            @Override
-            public void call(final CollectionIoEvent<MvccEntity> mvccEntityCollectionIoEvent) {
+                    @Override
+                    public void call( final CollectionIoEvent<MvccEntity> mvccEntityCollectionIoEvent ) {
 
-                Observable<CollectionIoEvent<MvccEntity>> unique =
-                        Observable.from(mvccEntityCollectionIoEvent).subscribeOn(Schedulers.io())
-                                .doOnNext(writeVerifyUnique);
+                        Observable<CollectionIoEvent<MvccEntity>> unique =
+                                Observable.from( mvccEntityCollectionIoEvent ).subscribeOn( Schedulers.io() )
+                                          .doOnNext( writeVerifyUnique );
 
 
-                // optimistic verification
-                Observable<CollectionIoEvent<MvccEntity>> optimistic =
-                        Observable.from(mvccEntityCollectionIoEvent).subscribeOn(Schedulers.io())
-                                .doOnNext(writeOptimisticVerify);
+                        // optimistic verification
+                        Observable<CollectionIoEvent<MvccEntity>> optimistic =
+                                Observable.from( mvccEntityCollectionIoEvent ).subscribeOn( Schedulers.io() )
+                                          .doOnNext( writeOptimisticVerify );
 
 
-                //wait for both to finish
-                Observable.merge(unique, optimistic).toBlocking().last();
-            }
-        });
+                        //wait for both to finish
+                        Observable.merge( unique, optimistic ).toBlocking().last();
+                    }
+                } );
     }
 
 
     @Override
-    public Observable<VersionSet> getLatestVersion(final Collection<Id> entityIds) {
+    public Observable<VersionSet> getLatestVersion( final Collection<Id> entityIds ) {
 
-        return Observable.create(new Observable.OnSubscribe<VersionSet>() {
+        return Observable.create( new Observable.OnSubscribe<VersionSet>() {
 
             @Override
-            public void call(final Subscriber<? super VersionSet> subscriber) {
+            public void call( final Subscriber<? super VersionSet> subscriber ) {
                 try {
                     final VersionSet logEntries = mvccLogEntrySerializationStrategy
-                        .load(collectionScope, entityIds, UUIDGenerator.newTimeUUID());
+                            .load( collectionScope, entityIds, UUIDGenerator.newTimeUUID() );
 
-                    subscriber.onNext(logEntries);
+                    subscriber.onNext( logEntries );
                     subscriber.onCompleted();
-
-                } catch (Exception e) {
-                    subscriber.onError(e);
+                }
+                catch ( Exception e ) {
+                    subscriber.onError( e );
                 }
             }
-        });
+        } );
     }
 
 
@@ -342,25 +341,21 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
     public Health getHealth() {
 
         try {
-            ColumnFamily<String, String> CF_SYSTEM_LOCAL = new ColumnFamily<String, String>(
-                "system.local", 
-                StringSerializer.get(), 
-                StringSerializer.get(), 
-                StringSerializer.get());
+            ColumnFamily<String, String> CF_SYSTEM_LOCAL =
+                    new ColumnFamily<String, String>( "system.local", StringSerializer.get(), StringSerializer.get(),
+                            StringSerializer.get() );
 
-            OperationResult<CqlResult<String, String>> result = keyspace.prepareQuery(CF_SYSTEM_LOCAL)
-                .withCql("SELECT now() FROM system.local;")
-                .execute();
+            OperationResult<CqlResult<String, String>> result =
+                    keyspace.prepareQuery( CF_SYSTEM_LOCAL ).withCql( "SELECT now() FROM system.local;" ).execute();
 
             if ( result.getResult().getRows().size() == 1 ) {
                 return Health.GREEN;
             }
-
-        } catch ( ConnectionException ex ) {
-            logger.error("Error connecting to Cassandra", ex);
+        }
+        catch ( ConnectionException ex ) {
+            logger.error( "Error connecting to Cassandra", ex );
         }
 
         return Health.RED;
     }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2bd1c950/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/MvccEntitySerializationStrategy.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/MvccEntitySerializationStrategy.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/MvccEntitySerializationStrategy.java
index b9277eb..93288af 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/MvccEntitySerializationStrategy.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/MvccEntitySerializationStrategy.java
@@ -26,6 +26,7 @@ import java.util.UUID;
 import org.apache.usergrid.persistence.collection.CollectionScope;
 import org.apache.usergrid.persistence.collection.EntitySet;
 import org.apache.usergrid.persistence.collection.MvccEntity;
+import org.apache.usergrid.persistence.core.migration.schema.Migration;
 import org.apache.usergrid.persistence.model.entity.Id;
 
 import com.netflix.astyanax.MutationBatch;
@@ -34,7 +35,7 @@ import com.netflix.astyanax.MutationBatch;
 /**
  * The interface that allows us to serialize an entity to disk
  */
-public interface MvccEntitySerializationStrategy {
+public interface MvccEntitySerializationStrategy extends Migration {
 
     /**
      * Serialize the entity to the data store with the given collection context

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2bd1c950/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/MvccLogEntrySerializationStrategy.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/MvccLogEntrySerializationStrategy.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/MvccLogEntrySerializationStrategy.java
index 927a60c..4baef84 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/MvccLogEntrySerializationStrategy.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/MvccLogEntrySerializationStrategy.java
@@ -26,6 +26,7 @@ import java.util.UUID;
 import org.apache.usergrid.persistence.collection.CollectionScope;
 import org.apache.usergrid.persistence.collection.MvccLogEntry;
 import org.apache.usergrid.persistence.collection.VersionSet;
+import org.apache.usergrid.persistence.core.migration.schema.Migration;
 import org.apache.usergrid.persistence.model.entity.Id;
 
 import com.netflix.astyanax.MutationBatch;
@@ -34,7 +35,7 @@ import com.netflix.astyanax.MutationBatch;
 /**
  * The interface that allows us to serialize a log entry to disk
  */
-public interface MvccLogEntrySerializationStrategy {
+public interface MvccLogEntrySerializationStrategy extends Migration {
 
     /**
      * Serialize the entity to the data store with the given collection context

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2bd1c950/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommit.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommit.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommit.java
index 9e2d52c..5bcb9f8 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommit.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommit.java
@@ -39,6 +39,7 @@ import org.apache.usergrid.persistence.collection.serialization.SerializationFig
 import org.apache.usergrid.persistence.collection.serialization.UniqueValue;
 import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
 import org.apache.usergrid.persistence.collection.serialization.impl.UniqueValueImpl;
+import org.apache.usergrid.persistence.core.guice.ProxyImpl;
 import org.apache.usergrid.persistence.core.rx.ObservableIterator;
 import org.apache.usergrid.persistence.model.entity.Entity;
 import org.apache.usergrid.persistence.model.entity.Id;
@@ -72,7 +73,7 @@ public class MarkCommit implements Action1<CollectionIoEvent<MvccEntity>> {
 
     @Inject
     public MarkCommit( final MvccLogEntrySerializationStrategy logStrat,
-                       final MvccEntitySerializationStrategy entityStrat,
+                       @ProxyImpl final MvccEntitySerializationStrategy entityStrat,
                        final UniqueValueSerializationStrategy uniqueValueStrat, final SerializationFig serializationFig,
                        final Keyspace keyspace ) {
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2bd1c950/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java
index 6a17197..d3c8193 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java
@@ -37,6 +37,7 @@ import org.apache.usergrid.persistence.collection.serialization.UniqueValue;
 import org.apache.usergrid.persistence.collection.serialization.impl.UniqueValueImpl;
 import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
 import org.apache.usergrid.persistence.collection.util.EntityUtils;
+import org.apache.usergrid.persistence.core.guice.ProxyImpl;
 import org.apache.usergrid.persistence.core.util.ValidationUtils;
 import org.apache.usergrid.persistence.model.entity.Entity;
 import org.apache.usergrid.persistence.model.entity.Id;
@@ -70,7 +71,7 @@ public class WriteCommit implements Func1<CollectionIoEvent<MvccEntity>, Entity>
 
     @Inject
     public WriteCommit( final MvccLogEntrySerializationStrategy logStrat,
-                        final MvccEntitySerializationStrategy entryStrat,
+                        @ProxyImpl final MvccEntitySerializationStrategy entryStrat,
                         final UniqueValueSerializationStrategy uniqueValueStrat) {
 
         Preconditions.checkNotNull( logStrat, "MvccLogEntrySerializationStrategy is required" );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2bd1c950/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValueSerializationStrategy.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValueSerializationStrategy.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValueSerializationStrategy.java
index 4ceb407..030d9d1 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValueSerializationStrategy.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValueSerializationStrategy.java
@@ -23,13 +23,14 @@ import java.util.Collection;
 import com.netflix.astyanax.MutationBatch;
 import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.core.migration.schema.Migration;
 import org.apache.usergrid.persistence.model.field.Field;
 
 
 /**
  * Reads and writes to UniqueValues column family.
  */
-public interface UniqueValueSerializationStrategy {
+public interface UniqueValueSerializationStrategy extends Migration {
 
     /**
      * Write the specified UniqueValue to Cassandra with optional timeToLive in milliseconds.

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2bd1c950/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyImpl.java
index bbaeb4a..1ec027f 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyImpl.java
@@ -18,9 +18,7 @@
 package org.apache.usergrid.persistence.collection.serialization.impl;
 
 
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
@@ -54,21 +52,15 @@ import org.apache.usergrid.persistence.core.migration.schema.Migration;
 import org.apache.usergrid.persistence.model.entity.Entity;
 import org.apache.usergrid.persistence.model.entity.Id;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.dataformat.smile.SmileFactory;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.inject.Inject;
-import com.google.inject.Singleton;
 import com.netflix.astyanax.ColumnListMutation;
 import com.netflix.astyanax.Keyspace;
 import com.netflix.astyanax.MutationBatch;
 import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 import com.netflix.astyanax.model.Column;
 import com.netflix.astyanax.model.ColumnList;
-import com.netflix.astyanax.model.CompositeBuilder;
-import com.netflix.astyanax.model.CompositeParser;
-import com.netflix.astyanax.model.Composites;
 import com.netflix.astyanax.model.Row;
 import com.netflix.astyanax.query.RowQuery;
 import com.netflix.astyanax.serializers.AbstractSerializer;
@@ -80,18 +72,13 @@ import com.netflix.astyanax.serializers.UUIDSerializer;
 /**
  * @author tnine
  */
-@Singleton
-public class MvccEntitySerializationStrategyImpl implements MvccEntitySerializationStrategy, Migration {
+public abstract class MvccEntitySerializationStrategyImpl implements MvccEntitySerializationStrategy {
 
     private static final Logger log = LoggerFactory.getLogger( MvccLogEntrySerializationStrategyImpl.class );
 
-    private static final EntitySerializer ENTITY_JSON_SER = new EntitySerializer();
 
-    private static final IdRowCompositeSerializer ID_SER = IdRowCompositeSerializer.get();
-
-    private static final ByteBufferSerializer BUFFER_SERIALIZER = ByteBufferSerializer.get();
 
-    private static final BytesArraySerializer BYTES_ARRAY_SERIALIZER = BytesArraySerializer.get();
+    private static final IdRowCompositeSerializer ID_SER = IdRowCompositeSerializer.get();
 
 
     private static final CollectionScopedRowKeySerializer<Id> ROW_KEY_SER =
@@ -104,6 +91,7 @@ public class MvccEntitySerializationStrategyImpl implements MvccEntitySerializat
     protected final Keyspace keyspace;
     protected final SerializationFig serializationFig;
     protected final EntityRepair repair;
+    private final AbstractSerializer<EntityWrapper> entityJsonSerializer;
 
 
     @Inject
@@ -111,6 +99,7 @@ public class MvccEntitySerializationStrategyImpl implements MvccEntitySerializat
         this.keyspace = keyspace;
         this.serializationFig = serializationFig;
         this.repair = new EntityRepairImpl( this, serializationFig );
+        this.entityJsonSerializer = getEntitySerializer();
     }
 
 
@@ -126,7 +115,7 @@ public class MvccEntitySerializationStrategyImpl implements MvccEntitySerializat
             @Override
             public void doOp( final ColumnListMutation<UUID> colMutation ) {
                 try {
-                    colMutation.putColumn( colName, ENTITY_JSON_SER
+                    colMutation.putColumn( colName, entityJsonSerializer
                             .toByteBuffer( new EntityWrapper( entity.getStatus(), entity.getEntity() ) ) );
                 }
                 catch ( Exception e ) {
@@ -207,7 +196,7 @@ public class MvccEntitySerializationStrategyImpl implements MvccEntitySerializat
 
             final Column<UUID> column = columns.getColumnByIndex( 0 );
 
-            final MvccEntity parsedEntity = new MvccColumnParser( entityId ).parseColumn( column );
+            final MvccEntity parsedEntity = new MvccColumnParser( entityId, entityJsonSerializer ).parseColumn( column );
 
             //we *might* need to repair, it's not clear so check before loading into result sets
             final MvccEntity maybeRepaired = repair.maybeRepair( collectionScope, parsedEntity );
@@ -245,7 +234,7 @@ public class MvccEntitySerializationStrategyImpl implements MvccEntitySerializat
                 keyspace.prepareQuery( CF_ENTITY_DATA ).getKey( rowKey )
                         .withColumnRange( version, null, false, fetchSize );
 
-        return new ColumnNameIterator( query, new MvccColumnParser( entityId ), false );
+        return new ColumnNameIterator( query, new MvccColumnParser( entityId, entityJsonSerializer ), false );
     }
 
 
@@ -275,7 +264,7 @@ public class MvccEntitySerializationStrategyImpl implements MvccEntitySerializat
                 keyspace.prepareQuery( CF_ENTITY_DATA ).getKey( rowKey )
                         .withColumnRange( null, version, true, fetchSize );
 
-        return new ColumnNameIterator( query, new MvccColumnParser( entityId ), false );
+        return new ColumnNameIterator( query, new MvccColumnParser( entityId, entityJsonSerializer ), false );
     }
 
 
@@ -290,7 +279,7 @@ public class MvccEntitySerializationStrategyImpl implements MvccEntitySerializat
         return doWrite( collectionScope, entityId, new RowOp() {
             @Override
             public void doOp( final ColumnListMutation<UUID> colMutation ) {
-                colMutation.putColumn( version, ENTITY_JSON_SER
+                colMutation.putColumn( version, entityJsonSerializer
                         .toByteBuffer( new EntityWrapper( MvccEntity.Status.COMPLETE, Optional.<Entity>absent() ) ) );
             }
         } );
@@ -367,12 +356,12 @@ public class MvccEntitySerializationStrategyImpl implements MvccEntitySerializat
     /**
      * Simple bean wrapper for state and entity
      */
-    private static class EntityWrapper {
-        private final MvccEntity.Status status;
-        private final Optional<Entity> entity;
+    protected static class EntityWrapper {
+        protected final MvccEntity.Status status;
+        protected final Optional<Entity> entity;
 
 
-        private EntityWrapper( final MvccEntity.Status status, final Optional<Entity> entity ) {
+        protected EntityWrapper( final MvccEntity.Status status, final Optional<Entity> entity ) {
             this.status = status;
             this.entity = entity;
         }
@@ -385,10 +374,12 @@ public class MvccEntitySerializationStrategyImpl implements MvccEntitySerializat
     private static final class MvccColumnParser implements ColumnParser<UUID, MvccEntity> {
 
         private final Id id;
+        private final AbstractSerializer<EntityWrapper> entityJsonSerializer;
 
 
-        private MvccColumnParser( Id id ) {
+        private MvccColumnParser( Id id, final AbstractSerializer<EntityWrapper> entityJsonSerializer ) {
             this.id = id;
+            this.entityJsonSerializer = entityJsonSerializer;
         }
 
 
@@ -399,7 +390,7 @@ public class MvccEntitySerializationStrategyImpl implements MvccEntitySerializat
             final UUID version = column.getName();
 
             try {
-                deSerialized = column.getValue( ENTITY_JSON_SER );
+                deSerialized = column.getValue( entityJsonSerializer );
             }
             catch ( DataCorruptionException e ) {
               log.error(
@@ -421,128 +412,9 @@ public class MvccEntitySerializationStrategyImpl implements MvccEntitySerializat
     }
 
 
-    public static class EntitySerializer extends AbstractSerializer<EntityWrapper> {
-
-
-        public static final SmileFactory f = new SmileFactory();
-
-        public static ObjectMapper mapper;
-
-        private static byte[] STATE_COMPLETE = new byte[] { 0 };
-        private static byte[] STATE_DELETED = new byte[] { 1 };
-        private static byte[] STATE_PARTIAL = new byte[] { 2 };
-
-        private static byte[] VERSION = new byte[] { 0 };
-
-
-
-        public EntitySerializer() {
-            try {
-                mapper = new ObjectMapper( f );
-                //                mapper.enable(SerializationFeature.INDENT_OUTPUT); don't indent output,
-                // causes slowness
-                mapper.enableDefaultTypingAsProperty( ObjectMapper.DefaultTyping.JAVA_LANG_OBJECT, "@class" );
-            }
-            catch ( Exception e ) {
-                throw new RuntimeException( "Error setting up mapper", e );
-            }
-        }
-
-
-        @Override
-        public ByteBuffer toByteBuffer( final EntityWrapper wrapper ) {
-            if ( wrapper == null ) {
-                return null;
-            }
-
-            CompositeBuilder builder = Composites.newCompositeBuilder();
-
-            builder.addBytes( VERSION );
-
-            //mark this version as empty
-            if ( !wrapper.entity.isPresent() ) {
-                //we're empty
-                builder.addBytes( STATE_DELETED );
-
-                return builder.build();
-            }
-
-            //we have an entity
-
-            if ( wrapper.status == MvccEntity.Status.COMPLETE ) {
-                builder.addBytes( STATE_COMPLETE );
-            }
-
-            else {
-                builder.addBytes( STATE_PARTIAL );
-            }
-
-            try {
-                final byte[] entityBytes = mapper.writeValueAsBytes( wrapper.entity.get() ) ;
-                builder.addBytes( entityBytes );
-            }
-            catch ( Exception e ) {
-                throw new RuntimeException( "Unable to serialize entity", e );
-            }
-
-            return builder.build();
-        }
-
-
-        @Override
-        public EntityWrapper fromByteBuffer( final ByteBuffer byteBuffer ) {
-
-            /**
-             * We intentionally turn data corruption exceptions when we're unable to de-serialize
-             * the data in cassandra.  If this occurs, we'll never be able to de-serialize it
-             * and it should be considered lost.  This is an error that is occuring due to a bug
-             * in serializing the entity.  This is a lazy recognition + repair signal for deployment with
-             * existing systems.
-             */
-            CompositeParser parser;
-            try {
-                parser = Composites.newCompositeParser( byteBuffer );
-            }
-            catch ( Exception e ) {
-              throw new DataCorruptionException("Unable to de-serialze entity", e);
-            }
-
-            byte[] version = parser.read( BYTES_ARRAY_SERIALIZER );
-
-            if ( !Arrays.equals( VERSION, version ) ) {
-                throw new UnsupportedOperationException( "A version of type " + version + " is unsupported" );
-            }
-
-            byte[] state = parser.read( BYTES_ARRAY_SERIALIZER );
-
-            // it's been deleted, remove it
-
-            if ( Arrays.equals( STATE_DELETED, state ) ) {
-                return new EntityWrapper( MvccEntity.Status.COMPLETE, Optional.<Entity>absent() );
-            }
-
-            Entity storedEntity;
-
-            ByteBuffer jsonBytes = parser.read( BUFFER_SERIALIZER );
-            byte[] array = jsonBytes.array();
-            int start = jsonBytes.arrayOffset();
-            int length = jsonBytes.remaining();
-
-            try {
-                storedEntity = mapper.readValue( array, start, length, Entity.class );
-            }
-            catch ( Exception e ) {
-                throw new DataCorruptionException( "Unable to read entity data", e );
-            }
-
-            final Optional<Entity> entity = Optional.of( storedEntity );
-
-            if ( Arrays.equals( STATE_COMPLETE, state ) ) {
-                return new EntityWrapper( MvccEntity.Status.COMPLETE, entity );
-            }
-
-            // it's partial by default
-            return new EntityWrapper( MvccEntity.Status.PARTIAL, entity );
-        }
-    }
+    /**
+     * Return the entity serializer for this instance
+     * @return
+     */
+    protected abstract AbstractSerializer<EntityWrapper> getEntitySerializer();
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2bd1c950/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxyImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxyImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxyImpl.java
new file mode 100644
index 0000000..a9e01b1
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxyImpl.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.collection.serialization.impl;
+
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.collection.EntitySet;
+import org.apache.usergrid.persistence.collection.MvccEntity;
+import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy;
+import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamilyDefinition;
+import org.apache.usergrid.persistence.core.guice.CurrentImpl;
+import org.apache.usergrid.persistence.core.guice.PreviousImpl;
+import org.apache.usergrid.persistence.core.migration.data.DataMigrationManager;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.MutationBatch;
+
+
+/**
+ * Version 3 implementation of entity serialization. This will proxy writes and reads so that during
+ * migration data goes to both sources and is read from the old source. After the upgrade completes,
+ * it will be available from the new source
+ */
+@Singleton
+public class MvccEntitySerializationStrategyProxyImpl implements MvccEntitySerializationStrategy {
+
+
+    public static final int MIGRATION_VERSION = 3;
+
+    private final DataMigrationManager dataMigrationManager;
+    private final Keyspace keyspace;
+    private final MvccEntitySerializationStrategy previous;
+    private final MvccEntitySerializationStrategy current;
+
+
+    @Inject
+    public MvccEntitySerializationStrategyProxyImpl( final DataMigrationManager dataMigrationManager,
+                                                     final Keyspace keyspace,
+                                                     @PreviousImpl final MvccEntitySerializationStrategy previous,
+                                                     @CurrentImpl final MvccEntitySerializationStrategy current ) {
+        this.dataMigrationManager = dataMigrationManager;
+        this.keyspace = keyspace;
+        this.previous = previous;
+        this.current = current;
+    }
+
+
+    @Override
+    public MutationBatch write( final CollectionScope context, final MvccEntity entity ) {
+        if ( isOldVersion() ) {
+            final MutationBatch aggregateBatch = keyspace.prepareMutationBatch();
+
+            aggregateBatch.mergeShallow( previous.write( context, entity ) );
+            aggregateBatch.mergeShallow( current.write( context, entity ) );
+
+            return aggregateBatch;
+        }
+
+        return current.write( context, entity );
+    }
+
+
+    @Override
+    public EntitySet load( final CollectionScope scope, final Collection<Id> entityIds, final UUID maxVersion ) {
+        if ( isOldVersion() ) {
+            return previous.load( scope, entityIds, maxVersion );
+        }
+
+        return current.load( scope, entityIds, maxVersion );
+    }
+
+
+    @Override
+    public Iterator<MvccEntity> load( final CollectionScope context, final Id entityId, final UUID version,
+                                      final int fetchSize ) {
+        if ( isOldVersion() ) {
+            return previous.load( context, entityId, version, fetchSize );
+        }
+
+        return current.load( context, entityId, version, fetchSize );
+    }
+
+
+    @Override
+    public Iterator<MvccEntity> loadHistory( final CollectionScope context, final Id entityId, final UUID version,
+                                             final int fetchSize ) {
+        if ( isOldVersion() ) {
+            return previous.loadHistory( context, entityId, version, fetchSize );
+        }
+
+        return current.loadHistory( context, entityId, version, fetchSize );
+    }
+
+
+    @Override
+    public MutationBatch mark( final CollectionScope context, final Id entityId, final UUID version ) {
+        if ( isOldVersion() ) {
+            final MutationBatch aggregateBatch = keyspace.prepareMutationBatch();
+
+            aggregateBatch.mergeShallow( previous.mark( context, entityId, version ) );
+            aggregateBatch.mergeShallow( current.mark( context, entityId, version ) );
+
+            return aggregateBatch;
+        }
+
+        return current.mark( context, entityId, version );
+    }
+
+
+    @Override
+    public MutationBatch delete( final CollectionScope context, final Id entityId, final UUID version ) {
+        if ( isOldVersion() ) {
+            final MutationBatch aggregateBatch = keyspace.prepareMutationBatch();
+
+            aggregateBatch.mergeShallow( previous.delete( context, entityId, version ) );
+            aggregateBatch.mergeShallow( current.delete( context, entityId, version ) );
+
+            return aggregateBatch;
+        }
+
+        return current.delete( context, entityId, version );
+    }
+
+
+    /**
+     * Return true if we're on an old version
+     */
+    private boolean isOldVersion() {
+        return dataMigrationManager.getCurrentVersion() < MIGRATION_VERSION;
+    }
+
+
+    @Override
+    public Collection<MultiTennantColumnFamilyDefinition> getColumnFamilies() {
+        return Collections.emptyList();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2bd1c950/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV1Impl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV1Impl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV1Impl.java
new file mode 100644
index 0000000..49b3486
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV1Impl.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.persistence.collection.serialization.impl;
+
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+import org.apache.usergrid.persistence.collection.MvccEntity;
+import org.apache.usergrid.persistence.collection.exception.DataCorruptionException;
+import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
+import org.apache.usergrid.persistence.model.entity.Entity;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.smile.SmileFactory;
+import com.google.common.base.Optional;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.model.CompositeBuilder;
+import com.netflix.astyanax.model.CompositeParser;
+import com.netflix.astyanax.model.Composites;
+import com.netflix.astyanax.serializers.AbstractSerializer;
+import com.netflix.astyanax.serializers.ByteBufferSerializer;
+import com.netflix.astyanax.serializers.BytesArraySerializer;
+
+
+/**
+ * Version 1 implementation of entity serialization
+ */
+@Singleton
+public class MvccEntitySerializationStrategyV1Impl extends MvccEntitySerializationStrategyImpl {
+
+    private static final EntitySerializer ENTITY_JSON_SER = new EntitySerializer();
+
+
+    @Inject
+    public MvccEntitySerializationStrategyV1Impl( final Keyspace keyspace, final SerializationFig serializationFig ) {
+        super( keyspace, serializationFig );
+    }
+
+
+    @Override
+    protected AbstractSerializer<MvccEntitySerializationStrategyImpl.EntityWrapper> getEntitySerializer() {
+        return ENTITY_JSON_SER;
+    }
+
+
+    public static class EntitySerializer extends AbstractSerializer<EntityWrapper> {
+
+
+        private static final ByteBufferSerializer BUFFER_SERIALIZER = ByteBufferSerializer.get();
+
+        private static final BytesArraySerializer BYTES_ARRAY_SERIALIZER = BytesArraySerializer.get();
+
+
+        public static final SmileFactory f = new SmileFactory();
+
+        public static ObjectMapper mapper;
+
+        private static byte[] STATE_COMPLETE = new byte[] { 0 };
+        private static byte[] STATE_DELETED = new byte[] { 1 };
+        private static byte[] STATE_PARTIAL = new byte[] { 2 };
+
+        private static byte[] VERSION = new byte[] { 0 };
+
+
+        public EntitySerializer() {
+            try {
+                mapper = new ObjectMapper( f );
+                //                mapper.enable(SerializationFeature.INDENT_OUTPUT); don't indent output,
+                // causes slowness
+                mapper.enableDefaultTypingAsProperty( ObjectMapper.DefaultTyping.JAVA_LANG_OBJECT, "@class" );
+            }
+            catch ( Exception e ) {
+                throw new RuntimeException( "Error setting up mapper", e );
+            }
+        }
+
+
+        @Override
+        public ByteBuffer toByteBuffer( final EntityWrapper wrapper ) {
+            if ( wrapper == null ) {
+                return null;
+            }
+
+            CompositeBuilder builder = Composites.newCompositeBuilder();
+
+            builder.addBytes( VERSION );
+
+            //mark this version as empty
+            if ( !wrapper.entity.isPresent() ) {
+                //we're empty
+                builder.addBytes( STATE_DELETED );
+
+                return builder.build();
+            }
+
+            //we have an entity
+
+            if ( wrapper.status == MvccEntity.Status.COMPLETE ) {
+                builder.addBytes( STATE_COMPLETE );
+            }
+
+            else {
+                builder.addBytes( STATE_PARTIAL );
+            }
+
+            try {
+                final byte[] entityBytes = mapper.writeValueAsBytes( wrapper.entity.get() );
+                builder.addBytes( entityBytes );
+            }
+            catch ( Exception e ) {
+                throw new RuntimeException( "Unable to serialize entity", e );
+            }
+
+            return builder.build();
+        }
+
+
+        @Override
+        public EntityWrapper fromByteBuffer( final ByteBuffer byteBuffer ) {
+
+            /**
+             * We intentionally throw data corruption exceptions when we're unable to de-serialize
+             * the data in cassandra.  If this occurs, we'll never be able to de-serialize it
+             * and it should be considered lost.  This is an error that is occurring due to a bug
+             * in serializing the entity.  This is a lazy recognition + repair signal for deployment with
+             * existing systems.
+             */
+            CompositeParser parser;
+            try {
+                parser = Composites.newCompositeParser( byteBuffer );
+            }
+            catch ( Exception e ) {
+                throw new DataCorruptionException( "Unable to de-serialze entity", e );
+            }
+
+            byte[] version = parser.read( BYTES_ARRAY_SERIALIZER );
+
+            if ( !Arrays.equals( VERSION, version ) ) {
+                throw new UnsupportedOperationException( "A version of type " + version + " is unsupported" );
+            }
+
+            byte[] state = parser.read( BYTES_ARRAY_SERIALIZER );
+
+            // it's been deleted, remove it
+
+            if ( Arrays.equals( STATE_DELETED, state ) ) {
+                return new EntityWrapper( MvccEntity.Status.COMPLETE, Optional.<Entity>absent() );
+            }
+
+            Entity storedEntity;
+
+            ByteBuffer jsonBytes = parser.read( BUFFER_SERIALIZER );
+            byte[] array = jsonBytes.array();
+            int start = jsonBytes.arrayOffset();
+            int length = jsonBytes.remaining();
+
+            try {
+                storedEntity = mapper.readValue( array, start, length, Entity.class );
+            }
+            catch ( Exception e ) {
+                throw new DataCorruptionException( "Unable to read entity data", e );
+            }
+
+            final Optional<Entity> entity = Optional.of( storedEntity );
+
+            if ( Arrays.equals( STATE_COMPLETE, state ) ) {
+                return new EntityWrapper( MvccEntity.Status.COMPLETE, entity );
+            }
+
+            // it's partial by default
+            return new EntityWrapper( MvccEntity.Status.PARTIAL, entity );
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2bd1c950/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV2Impl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV2Impl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV2Impl.java
new file mode 100644
index 0000000..f80960d
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV2Impl.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.collection.serialization.impl;
+
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+import org.apache.usergrid.persistence.collection.MvccEntity;
+import org.apache.usergrid.persistence.collection.exception.DataCorruptionException;
+import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
+import org.apache.usergrid.persistence.model.entity.Entity;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.smile.SmileFactory;
+import com.google.common.base.Optional;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.model.CompositeBuilder;
+import com.netflix.astyanax.model.CompositeParser;
+import com.netflix.astyanax.model.Composites;
+import com.netflix.astyanax.serializers.AbstractSerializer;
+import com.netflix.astyanax.serializers.ByteBufferSerializer;
+import com.netflix.astyanax.serializers.BytesArraySerializer;
+
+
+/**
+ * Version 2 implementation of entity serialization
+ */
+@Singleton
+public class MvccEntitySerializationStrategyV2Impl extends MvccEntitySerializationStrategyImpl {
+
+    private static final EntitySerializer ENTITY_JSON_SER = new EntitySerializer();
+
+
+    @Inject
+    public MvccEntitySerializationStrategyV2Impl( final Keyspace keyspace, final SerializationFig serializationFig ) {
+        super( keyspace, serializationFig );
+    }
+
+
+    @Override
+    protected AbstractSerializer<EntityWrapper> getEntitySerializer() {
+        return ENTITY_JSON_SER;
+    }
+
+
+    public static class EntitySerializer extends AbstractSerializer<EntityWrapper> {
+
+
+        private static final ByteBufferSerializer BUFFER_SERIALIZER = ByteBufferSerializer.get();
+
+        private static final BytesArraySerializer BYTES_ARRAY_SERIALIZER = BytesArraySerializer.get();
+
+
+        public static final SmileFactory f = new SmileFactory();
+
+        public static ObjectMapper mapper;
+
+        private static byte[] STATE_COMPLETE = new byte[] { 0 };
+        private static byte[] STATE_DELETED = new byte[] { 1 };
+        private static byte[] STATE_PARTIAL = new byte[] { 2 };
+
+        private static byte[] VERSION = new byte[] { 0 };
+
+
+        public EntitySerializer() {
+            try {
+                mapper = new ObjectMapper( f );
+                //                mapper.enable(SerializationFeature.INDENT_OUTPUT); don't indent output,
+                // causes slowness
+                mapper.enableDefaultTypingAsProperty( ObjectMapper.DefaultTyping.JAVA_LANG_OBJECT, "@class" );
+            }
+            catch ( Exception e ) {
+                throw new RuntimeException( "Error setting up mapper", e );
+            }
+        }
+
+
+        @Override
+        public ByteBuffer toByteBuffer( final EntityWrapper wrapper ) {
+            if ( wrapper == null ) {
+                return null;
+            }
+
+            CompositeBuilder builder = Composites.newCompositeBuilder();
+
+            builder.addBytes( VERSION );
+
+            //mark this version as empty
+            if ( !wrapper.entity.isPresent() ) {
+                //we're empty
+                builder.addBytes( STATE_DELETED );
+
+                return builder.build();
+            }
+
+            //we have an entity
+
+            if ( wrapper.status == MvccEntity.Status.COMPLETE ) {
+                builder.addBytes( STATE_COMPLETE );
+            }
+
+            else {
+                builder.addBytes( STATE_PARTIAL );
+            }
+
+            try {
+                final byte[] entityBytes = mapper.writeValueAsBytes( wrapper.entity.get() );
+                builder.addBytes( entityBytes );
+            }
+            catch ( Exception e ) {
+                throw new RuntimeException( "Unable to serialize entity", e );
+            }
+
+            return builder.build();
+        }
+
+
+        @Override
+        public EntityWrapper fromByteBuffer( final ByteBuffer byteBuffer ) {
+
+            /**
+             * We intentionally throw data corruption exceptions when we're unable to de-serialize
+             * the data in Cassandra.  If this occurs, we'll never be able to de-serialize it
+             * and it should be considered lost.  This is an error that is occurring due to a bug
+             * in serializing the entity.  This is a lazy recognition + repair signal for deployment with
+             * existing systems.
+             */
+            CompositeParser parser;
+            try {
+                parser = Composites.newCompositeParser( byteBuffer );
+            }
+            catch ( Exception e ) {
+                throw new DataCorruptionException( "Unable to de-serialze entity", e );
+            }
+
+            byte[] version = parser.read( BYTES_ARRAY_SERIALIZER );
+
+            if ( !Arrays.equals( VERSION, version ) ) {
+                throw new UnsupportedOperationException( "A version of type " + version + " is unsupported" );
+            }
+
+            byte[] state = parser.read( BYTES_ARRAY_SERIALIZER );
+
+            // it's been deleted, remove it
+
+            if ( Arrays.equals( STATE_DELETED, state ) ) {
+                return new EntityWrapper( MvccEntity.Status.COMPLETE, Optional.<Entity>absent() );
+            }
+
+            Entity storedEntity;
+
+            ByteBuffer jsonBytes = parser.read( BUFFER_SERIALIZER );
+            byte[] array = jsonBytes.array();
+            int start = jsonBytes.arrayOffset();
+            int length = jsonBytes.remaining();
+
+            try {
+                storedEntity = mapper.readValue( array, start, length, Entity.class );
+            }
+            catch ( Exception e ) {
+                throw new DataCorruptionException( "Unable to read entity data", e );
+            }
+
+            final Optional<Entity> entity = Optional.of( storedEntity );
+
+            if ( Arrays.equals( STATE_COMPLETE, state ) ) {
+                return new EntityWrapper( MvccEntity.Status.COMPLETE, entity );
+            }
+
+            // it's partial by default
+            return new EntityWrapper( MvccEntity.Status.PARTIAL, entity );
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2bd1c950/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImpl.java
index 7368e9a..831091d 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImpl.java
@@ -71,7 +71,7 @@ import com.netflix.astyanax.serializers.UUIDSerializer;
  * @author tnine
  */
 @Singleton
-public class MvccLogEntrySerializationStrategyImpl implements MvccLogEntrySerializationStrategy, Migration {
+public class MvccLogEntrySerializationStrategyImpl implements MvccLogEntrySerializationStrategy {
 
     private static final Logger LOG = LoggerFactory.getLogger( MvccLogEntrySerializationStrategyImpl.class );
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2bd1c950/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/SerializationModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/SerializationModule.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/SerializationModule.java
index 8a4a9ac..eb9c374 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/SerializationModule.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/SerializationModule.java
@@ -21,9 +21,13 @@ package org.apache.usergrid.persistence.collection.serialization.impl;
 import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy;
 import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
 import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
+import org.apache.usergrid.persistence.core.guice.CurrentImpl;
+import org.apache.usergrid.persistence.core.guice.PreviousImpl;
+import org.apache.usergrid.persistence.core.guice.ProxyImpl;
 import org.apache.usergrid.persistence.core.migration.schema.Migration;
 
 import com.google.inject.AbstractModule;
+import com.google.inject.Key;
 import com.google.inject.multibindings.Multibinder;
 
 
@@ -37,14 +41,23 @@ public class SerializationModule extends AbstractModule {
 
 
         // bind the serialization strategies
-        bind( MvccEntitySerializationStrategy.class ).to( MvccEntitySerializationStrategyImpl.class );
+
+        //We've migrated this one, so we need to set up the previous, current, and proxy
+        bind( MvccEntitySerializationStrategy.class ).annotatedWith( PreviousImpl.class )
+                                                     .to( MvccEntitySerializationStrategyV1Impl.class );
+        bind( MvccEntitySerializationStrategy.class ).annotatedWith( CurrentImpl.class )
+                                                     .to( MvccEntitySerializationStrategyV2Impl.class );
+        bind( MvccEntitySerializationStrategy.class ).annotatedWith( ProxyImpl.class )
+                                                     .to( MvccEntitySerializationStrategyProxyImpl.class );
+
         bind( MvccLogEntrySerializationStrategy.class ).to( MvccLogEntrySerializationStrategyImpl.class );
         bind( UniqueValueSerializationStrategy.class ).to( UniqueValueSerializationStrategyImpl.class );
 
         //do multibindings for migrations
         Multibinder<Migration> uriBinder = Multibinder.newSetBinder( binder(), Migration.class );
-        uriBinder.addBinding().to( MvccEntitySerializationStrategyImpl.class );
-        uriBinder.addBinding().to( MvccLogEntrySerializationStrategyImpl.class );
-        uriBinder.addBinding().to( UniqueValueSerializationStrategyImpl.class );
+        uriBinder.addBinding().to( Key.get( MvccEntitySerializationStrategy.class, PreviousImpl.class ) );
+        uriBinder.addBinding().to( Key.get( MvccEntitySerializationStrategy.class, CurrentImpl.class ) );
+        uriBinder.addBinding().to( Key.get( MvccLogEntrySerializationStrategy.class ) );
+        uriBinder.addBinding().to( Key.get( UniqueValueSerializationStrategy.class ) );
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2bd1c950/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java
index b6b3cab..be95b08 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java
@@ -57,7 +57,7 @@ import com.netflix.astyanax.util.RangeBuilder;
 /**
  * Reads and writes to UniqueValues column family.
  */
-public class UniqueValueSerializationStrategyImpl implements UniqueValueSerializationStrategy, Migration {
+public class UniqueValueSerializationStrategyImpl implements UniqueValueSerializationStrategy {
 
     private static final Logger log = LoggerFactory.getLogger( UniqueValueSerializationStrategyImpl.class );
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2bd1c950/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerFactoryTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerFactoryTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerFactoryTest.java
index f2deb3e..0d44e98 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerFactoryTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerFactoryTest.java
@@ -6,7 +6,7 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 
-import org.apache.usergrid.persistence.collection.guice.MigrationManagerRule;
+import org.apache.usergrid.persistence.core.guice.MigrationManagerRule;
 import org.apache.usergrid.persistence.collection.guice.TestCollectionModule;
 import org.apache.usergrid.persistence.collection.impl.CollectionScopeImpl;
 import org.apache.usergrid.persistence.core.test.ITRunner;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2bd1c950/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java
index 66f162f..2d18675 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java
@@ -23,18 +23,19 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.UUID;
 
-import org.apache.usergrid.persistence.core.test.UseModules;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 
 import org.apache.usergrid.persistence.collection.exception.WriteUniqueVerifyException;
-import org.apache.usergrid.persistence.collection.guice.MigrationManagerRule;
+import org.apache.usergrid.persistence.core.guice.MigrationManagerRule;
 import org.apache.usergrid.persistence.collection.guice.TestCollectionModule;
 import org.apache.usergrid.persistence.collection.impl.CollectionScopeImpl;
 import org.apache.usergrid.persistence.collection.mvcc.entity.Stage;
 import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
 import org.apache.usergrid.persistence.core.test.ITRunner;
+import org.apache.usergrid.persistence.core.test.UseModules;
+import org.apache.usergrid.persistence.core.util.Health;
 import org.apache.usergrid.persistence.model.entity.Entity;
 import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.entity.SimpleId;
@@ -42,10 +43,10 @@ import org.apache.usergrid.persistence.model.field.BooleanField;
 import org.apache.usergrid.persistence.model.field.Field;
 import org.apache.usergrid.persistence.model.field.IntegerField;
 import org.apache.usergrid.persistence.model.field.StringField;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
 
 import com.fasterxml.uuid.UUIDComparator;
 import com.google.inject.Inject;
-import org.apache.usergrid.persistence.core.util.Health;
 
 import rx.Observable;
 
@@ -58,8 +59,8 @@ import static org.junit.Assert.fail;
 
 
 /** @author tnine */
-@RunWith(ITRunner.class)
-@UseModules(TestCollectionModule.class)
+@RunWith( ITRunner.class )
+@UseModules( TestCollectionModule.class )
 public class EntityCollectionManagerIT {
     @Inject
     private EntityCollectionManagerFactory factory;
@@ -533,7 +534,7 @@ public class EntityCollectionManagerIT {
     }
 
 
-    @Test(expected = IllegalArgumentException.class)
+    @Test( expected = IllegalArgumentException.class )
     public void readTooLarge() {
 
         final CollectionScope context =
@@ -630,8 +631,7 @@ public class EntityCollectionManagerIT {
         final UUID v2Version = v2Created.getVersion();
 
 
-        assertTrue( "Newer version in v2",
-                UUIDComparator.staticCompare( v2Version, v1Version) > 0 );
+        assertTrue( "Newer version in v2", UUIDComparator.staticCompare( v2Version, v1Version ) > 0 );
 
 
         final VersionSet resultsV2 = manager.getLatestVersion( Arrays.asList( v1Created.getId() ) ).toBlocking().last();
@@ -678,12 +678,9 @@ public class EntityCollectionManagerIT {
         final UUID v2Version = v2Created.getVersion();
 
 
-
-
         assertEquals( "Same entityId", v1Created.getId(), v2Created.getId() );
 
-        assertTrue( "Newer version in v2",
-                UUIDComparator.staticCompare( v2Version, v1Version ) > 0 );
+        assertTrue( "Newer version in v2", UUIDComparator.staticCompare( v2Version, v1Version ) > 0 );
 
 
         final VersionSet resultsV2 = manager.getLatestVersion( Arrays.asList( v1Created.getId() ) ).toBlocking().last();
@@ -700,11 +697,60 @@ public class EntityCollectionManagerIT {
     @Test
     public void healthTest() {
 
-        CollectionScope context = new CollectionScopeImpl( 
-            new SimpleId( "organization" ), new SimpleId( "test" ), "test" );
+        CollectionScope context =
+                new CollectionScopeImpl( new SimpleId( "organization" ), new SimpleId( "test" ), "test" );
 
         final EntityCollectionManager manager = factory.createCollectionManager( context );
 
         assertEquals( Health.GREEN, manager.getHealth() );
     }
+
+
+    /**
+     * Tests an entity with more than 65535 bytes' worth of data
+     */
+    @Test
+    public void largeEntityWriteRead() {
+        final int setSize = 65535 * 2;
+
+        int currentLength = 0;
+
+        final Entity entity = new Entity( new SimpleId( "test" ) );
+
+        //generate a really large string value
+        StringBuilder builder = new StringBuilder();
+
+        for ( int i = 0; i < 100; i++ ) {
+            builder.append( UUIDGenerator.newTimeUUID().toString() );
+        }
+
+        final String value = builder.toString();
+
+
+        //loop until our size is beyond the set size
+        for ( int i = 0; currentLength < setSize; i++ ) {
+            final String key = "newStringField" + i;
+
+            entity.setField( new StringField( key, value ) );
+
+            currentLength += key.length() + value.length();
+        }
+
+
+        //now we have one massive entity; save it and retrieve it.
+        CollectionScope context =
+                new CollectionScopeImpl( new SimpleId( "organization" ), new SimpleId( "test" ), "test" );
+
+        final EntityCollectionManager manager = factory.createCollectionManager( context );
+
+        final Entity saved = manager.write( entity ).toBlocking().last();
+
+
+        assertEquals( entity, saved );
+
+        //now load it
+        final Entity loaded = manager.load( entity.getId() ).toBlocking().last();
+
+        assertEquals( entity, loaded );
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2bd1c950/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerStressTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerStressTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerStressTest.java
index d4f4d81..11ad389 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerStressTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerStressTest.java
@@ -31,7 +31,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.commons.lang3.time.StopWatch;
 
-import org.apache.usergrid.persistence.collection.guice.MigrationManagerRule;
+import org.apache.usergrid.persistence.core.guice.MigrationManagerRule;
 import org.apache.usergrid.persistence.collection.guice.TestCollectionModule;
 import org.apache.usergrid.persistence.collection.impl.CollectionScopeImpl;
 import org.apache.usergrid.persistence.core.test.ITRunner;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2bd1c950/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerSyncIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerSyncIT.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerSyncIT.java
index ec35ed2..d4881de 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerSyncIT.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerSyncIT.java
@@ -6,7 +6,7 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 
-import org.apache.usergrid.persistence.collection.guice.MigrationManagerRule;
+import org.apache.usergrid.persistence.core.guice.MigrationManagerRule;
 import org.apache.usergrid.persistence.collection.guice.TestCollectionModule;
 import org.apache.usergrid.persistence.collection.impl.CollectionScopeImpl;
 import org.apache.usergrid.persistence.core.test.ITRunner;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2bd1c950/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/guice/MigrationManagerRule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/guice/MigrationManagerRule.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/guice/MigrationManagerRule.java
deleted file mode 100644
index 6b02b63..0000000
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/guice/MigrationManagerRule.java
+++ /dev/null
@@ -1,38 +0,0 @@
-package org.apache.usergrid.persistence.collection.guice;
-
-
-import org.junit.rules.ExternalResource;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.persistence.core.migration.schema.MigrationException;
-import org.apache.usergrid.persistence.core.migration.schema.MigrationManager;
-
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-
-
-/**
- */
-@Singleton
-public class MigrationManagerRule extends ExternalResource {
-    private static final Logger LOG = LoggerFactory.getLogger( MigrationManagerRule.class );
-
-    private MigrationManager migrationManager;
-
-
-    @Inject
-    public void setMigrationManager( MigrationManager migrationManager )  {
-        this.migrationManager = migrationManager;
-    }
-
-
-    @Override
-    protected void before() throws MigrationException {
-        LOG.info( "Starting migration" );
-
-        migrationManager.migrate();
-
-        LOG.info( "Migration complete" );
-    }
-}