You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2013/12/05 01:55:32 UTC

[6/6] git commit: Commit after refactor. Need to evaluate an event bus in guava instead of using coupled stages

Commit after refactor.  Need to evaluate an event bus in guava instead of using coupled stages


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/44072d59
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/44072d59
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/44072d59

Branch: refs/heads/two-dot-o
Commit: 44072d594f76f9f4fd0e6bc828e6b4d5ba7b8d81
Parents: 43aba65
Author: Todd Nine <to...@apache.org>
Authored: Wed Dec 4 17:42:24 2013 -0700
Committer: Todd Nine <to...@apache.org>
Committed: Wed Dec 4 17:52:04 2013 -0700

----------------------------------------------------------------------
 .../collection/impl/CollectionManagerImpl.java  |  14 +-
 .../collection/mvcc/stage/ExecutionStage.java   |  15 ++
 .../collection/mvcc/stage/Stage.java            |  15 --
 .../collection/mvcc/stage/StagePipeline.java    |   8 +-
 .../collection/mvcc/stage/impl/Clear.java       |  91 ---------
 .../stage/impl/CollectionPipelineModule.java    |  33 +++-
 .../collection/mvcc/stage/impl/Commit.java      |  90 ---------
 .../collection/mvcc/stage/impl/Create.java      |  85 ---------
 .../mvcc/stage/impl/CreatePipeline.java         |  23 ---
 .../mvcc/stage/impl/DeletePipeline.java         |  23 ---
 .../mvcc/stage/impl/ExecutionContextImpl.java   |   6 +-
 .../collection/mvcc/stage/impl/Load.java        |  82 --------
 .../mvcc/stage/impl/LoadPipeline.java           |  23 ---
 .../mvcc/stage/impl/StagePipelineImpl.java      |  32 ++--
 .../collection/mvcc/stage/impl/StartDelete.java |  97 ----------
 .../collection/mvcc/stage/impl/StartWrite.java  |  92 ---------
 .../collection/mvcc/stage/impl/Update.java      |  66 -------
 .../mvcc/stage/impl/UpdatePipeline.java         |  23 ---
 .../collection/mvcc/stage/impl/Verify.java      |  25 ---
 .../mvcc/stage/impl/WriteContextCallback.java   |  59 ------
 .../mvcc/stage/impl/delete/Delete.java          |  91 +++++++++
 .../mvcc/stage/impl/delete/DeletePipeline.java  |  23 +++
 .../mvcc/stage/impl/delete/StartDelete.java     |  96 ++++++++++
 .../collection/mvcc/stage/impl/read/Load.java   |  82 ++++++++
 .../mvcc/stage/impl/read/PipelineLoad.java      |  23 +++
 .../mvcc/stage/impl/write/Commit.java           |  90 +++++++++
 .../mvcc/stage/impl/write/Create.java           |  85 +++++++++
 .../mvcc/stage/impl/write/PipelineCreate.java   |  23 +++
 .../mvcc/stage/impl/write/PipelineUpdate.java   |  23 +++
 .../mvcc/stage/impl/write/StartWrite.java       |  92 +++++++++
 .../mvcc/stage/impl/write/Update.java           |  70 +++++++
 .../mvcc/stage/impl/write/Verify.java           |  25 +++
 .../stage/impl/write/WriteContextCallback.java  |  59 ++++++
 .../MvccEntitySerializationStrategy.java        |   2 +-
 .../collection/CollectionManagerTest.java       |  13 +-
 .../mvcc/stage/ExecutionContextTest.java        |  24 +--
 .../mvcc/stage/StagePipelineTest.java           |  22 +--
 .../collection/mvcc/stage/impl/CreateTest.java  | 152 ---------------
 .../mvcc/stage/impl/StartWriteTest.java         | 187 ------------------
 .../mvcc/stage/impl/write/CreateTest.java       | 153 +++++++++++++++
 .../mvcc/stage/impl/write/StartWriteTest.java   | 188 ++++++++++++++++++
 .../mvcc/stage/impl/write/UpdateTest.java       | 189 +++++++++++++++++++
 .../persistence/index/stage/Complete.java       |   4 +-
 .../usergrid/persistence/index/stage/Start.java |   4 +-
 .../usergrid/persistence/index/stage/Write.java |   4 +-
 45 files changed, 1420 insertions(+), 1206 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/44072d59/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/CollectionManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/CollectionManagerImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/CollectionManagerImpl.java
index c20d2c3..e20ba62 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/CollectionManagerImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/CollectionManagerImpl.java
@@ -11,11 +11,11 @@ import org.apache.usergrid.persistence.collection.CollectionManager;
 import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
 import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionContext;
 import org.apache.usergrid.persistence.collection.mvcc.stage.StagePipeline;
-import org.apache.usergrid.persistence.collection.mvcc.stage.impl.CreatePipeline;
-import org.apache.usergrid.persistence.collection.mvcc.stage.impl.DeletePipeline;
+import org.apache.usergrid.persistence.collection.mvcc.stage.impl.read.PipelineLoad;
+import org.apache.usergrid.persistence.collection.mvcc.stage.impl.write.PipelineCreate;
+import org.apache.usergrid.persistence.collection.mvcc.stage.impl.delete.DeletePipeline;
 import org.apache.usergrid.persistence.collection.mvcc.stage.impl.ExecutionContextImpl;
-import org.apache.usergrid.persistence.collection.mvcc.stage.impl.LoadPipeline;
-import org.apache.usergrid.persistence.collection.mvcc.stage.impl.UpdatePipeline;
+import org.apache.usergrid.persistence.collection.mvcc.stage.impl.write.PipelineUpdate;
 import org.apache.usergrid.persistence.model.entity.Entity;
 
 import com.google.inject.Inject;
@@ -39,10 +39,10 @@ public class CollectionManagerImpl implements CollectionManager {
 
 
     @Inject
-    public CollectionManagerImpl( @CreatePipeline final StagePipeline createPipeline,
-                                  @UpdatePipeline final StagePipeline updatePipeline,
+    public CollectionManagerImpl( @PipelineCreate final StagePipeline createPipeline,
+                                  @PipelineUpdate final StagePipeline updatePipeline,
                                   @DeletePipeline final StagePipeline deletePipeline,
-                                  @LoadPipeline final StagePipeline loadPipeline,
+                                  @PipelineLoad final StagePipeline loadPipeline,
                                   @Assisted final CollectionContext context ) {
 
         this.createPipeline = createPipeline;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/44072d59/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/ExecutionStage.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/ExecutionStage.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/ExecutionStage.java
new file mode 100644
index 0000000..a98c813
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/ExecutionStage.java
@@ -0,0 +1,15 @@
+package org.apache.usergrid.persistence.collection.mvcc.stage;
+
+
+/** A single stage in a StagePipeline.  Stages back the create, update, delete and load pipelines. */
+public interface ExecutionStage {
+
+    /**
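+     * Run this stage.  The method returns nothing; the stage implementations in this commit take their
+     * input from context.getMessage( ... ) and hand control onward by calling context.proceed().
+     *
+     * @param context The context of the current operation (create, update, delete or load), through
+     *                which the stage exchanges messages and advances the pipeline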
+     */
+    public void performStage( ExecutionContext context );
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/44072d59/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/Stage.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/Stage.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/Stage.java
deleted file mode 100644
index 538a546..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/Stage.java
+++ /dev/null
@@ -1,15 +0,0 @@
-package org.apache.usergrid.persistence.collection.mvcc.stage;
-
-
-/** The possible stages in our write flow. */
-public interface Stage {
-
-    /**
-     * Run this stage.  This will return the MvccEntity that should be returned or passed to the next stage
-     *
-     * @param context The context of the current write operation
-     *
-     * @return The asynchronous listener to signal success
-     */
-    public void performStage( ExecutionContext context );
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/44072d59/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/StagePipeline.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/StagePipeline.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/StagePipeline.java
index 0b032d9..9d68e10 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/StagePipeline.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/StagePipeline.java
@@ -13,14 +13,14 @@ public interface StagePipeline {
     /**
      * Get the first stage in this pipeline.
      */
-    Stage first();
+    ExecutionStage first();
 
 
     /**
-     * get the next stage after the stage specified
-     * @param stage The stage to seek in our pipeline
+     * Get the stage that follows the specified stage in this pipeline.
+     * @param executionStage The stage to seek in our pipeline
      */
-    Stage nextStage(Stage stage);
+    ExecutionStage nextStage(ExecutionStage executionStage );
 
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/44072d59/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Clear.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Clear.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Clear.java
deleted file mode 100644
index 854ec24..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Clear.java
+++ /dev/null
@@ -1,91 +0,0 @@
-package org.apache.usergrid.persistence.collection.mvcc.stage.impl;
-
-
-import java.util.UUID;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.persistence.collection.CollectionContext;
-import org.apache.usergrid.persistence.collection.exception.CollectionRuntimeException;
-import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
-import org.apache.usergrid.persistence.collection.mvcc.entity.MvccLogEntry;
-import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccLogEntryImpl;
-import org.apache.usergrid.persistence.collection.mvcc.stage.Stage;
-import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionContext;
-import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
-import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy;
-
-import com.google.common.base.Preconditions;
-import com.google.inject.Inject;
-import com.netflix.astyanax.MutationBatch;
-import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
-
-
-/** This phase should invoke any finalization, and mark the entity as committed in the data store before returning */
-public class Clear implements Stage {
-
-
-    private static final Logger LOG = LoggerFactory.getLogger( Clear.class );
-
-    private final MvccLogEntrySerializationStrategy logEntrySerializationStrategy;
-    private final MvccEntitySerializationStrategy entitySerializationStrategy;
-
-
-    @Inject
-    public Clear( final MvccLogEntrySerializationStrategy logEntrySerializationStrategy,
-                  final MvccEntitySerializationStrategy entitySerializationStrategy ) {
-
-        Preconditions.checkNotNull( logEntrySerializationStrategy, "logEntrySerializationStrategy is required" );
-              Preconditions.checkNotNull( entitySerializationStrategy, "entitySerializationStrategy is required" );
-
-
-        this.logEntrySerializationStrategy = logEntrySerializationStrategy;
-        this.entitySerializationStrategy = entitySerializationStrategy;
-    }
-
-
-    @Override
-    public void performStage( final ExecutionContext executionContext ) {
-        final MvccEntity entity = executionContext.getMessage( MvccEntity.class );
-
-        Preconditions.checkNotNull( entity, "Entity is required in the new stage of the mvcc write" );
-
-        final UUID entityId = entity.getUuid();
-        final UUID version = entity.getVersion();
-
-        Preconditions.checkNotNull( entityId, "Entity id is required in this stage" );
-        Preconditions.checkNotNull( version, "Entity version is required in this stage" );
-
-
-        final CollectionContext collectionContext = executionContext.getCollectionContext();
-
-
-        final MvccLogEntry startEntry = new MvccLogEntryImpl( entityId, version, org.apache.usergrid.persistence
-                .collection.mvcc.entity.Stage.COMMITTED );
-
-        MutationBatch logMutation = logEntrySerializationStrategy.write( collectionContext, startEntry );
-
-        //insert a "cleared" value into the versions.  Post processing should actually delete
-        MutationBatch entityMutation = entitySerializationStrategy.clear( collectionContext, entityId, version );
-
-        //merge the 2 into 1 mutation
-        logMutation.mergeShallow( entityMutation );
-
-
-        try {
-            logMutation.execute();
-        }
-        catch ( ConnectionException e ) {
-            LOG.error( "Failed to execute write asynchronously ", e );
-            throw new CollectionRuntimeException( "Failed to execute write asynchronously ", e );
-        }
-
-        /**
-         * We're done executing.
-         */
-        executionContext.proceed();
-
-        //TODO connect to post processors via listener
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/44072d59/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/CollectionPipelineModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/CollectionPipelineModule.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/CollectionPipelineModule.java
index 8390e7b..68dab72 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/CollectionPipelineModule.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/CollectionPipelineModule.java
@@ -1,8 +1,20 @@
 package org.apache.usergrid.persistence.collection.mvcc.stage.impl;
 
 
-import org.apache.usergrid.persistence.collection.mvcc.stage.Stage;
+import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionStage;
 import org.apache.usergrid.persistence.collection.mvcc.stage.StagePipeline;
+import org.apache.usergrid.persistence.collection.mvcc.stage.impl.delete.Delete;
+import org.apache.usergrid.persistence.collection.mvcc.stage.impl.delete.DeletePipeline;
+import org.apache.usergrid.persistence.collection.mvcc.stage.impl.delete.StartDelete;
+import org.apache.usergrid.persistence.collection.mvcc.stage.impl.read.Load;
+import org.apache.usergrid.persistence.collection.mvcc.stage.impl.read.PipelineLoad;
+import org.apache.usergrid.persistence.collection.mvcc.stage.impl.write.Commit;
+import org.apache.usergrid.persistence.collection.mvcc.stage.impl.write.Create;
+import org.apache.usergrid.persistence.collection.mvcc.stage.impl.write.PipelineCreate;
+import org.apache.usergrid.persistence.collection.mvcc.stage.impl.write.PipelineUpdate;
+import org.apache.usergrid.persistence.collection.mvcc.stage.impl.write.StartWrite;
+import org.apache.usergrid.persistence.collection.mvcc.stage.impl.write.Update;
+import org.apache.usergrid.persistence.collection.mvcc.stage.impl.write.Verify;
 
 import com.google.inject.AbstractModule;
 import com.google.inject.Inject;
@@ -24,7 +36,7 @@ public class CollectionPipelineModule extends AbstractModule {
      * objects are mutable
      */
     @Provides
-    @CreatePipeline
+    @PipelineCreate
     @Inject
     @Singleton
     public StagePipeline createPipeline( final Create create, final StartWrite startWrite, final Verify write,
@@ -34,7 +46,7 @@ public class CollectionPipelineModule extends AbstractModule {
 
 
     @Provides
-    @UpdatePipeline
+    @PipelineUpdate
     @Inject
     @Singleton
     public StagePipeline updatePipeline( final Update update, final StartWrite startWrite, final Verify write,
@@ -47,13 +59,13 @@ public class CollectionPipelineModule extends AbstractModule {
     @DeletePipeline
     @Inject
     @Singleton
-    public StagePipeline deletePipeline(final StartDelete startDelete,  final Clear delete ) {
+    public StagePipeline deletePipeline(final StartDelete startDelete,  final Delete delete ) {
         return StagePipelineImpl.fromStages(startDelete, delete );
     }
 
 
     @Provides
-    @LoadPipeline
+    @PipelineLoad
     @Inject
     @Singleton
     public StagePipeline deletePipeline( final Load load ) {
@@ -67,17 +79,20 @@ public class CollectionPipelineModule extends AbstractModule {
         /**
          * Configure all stages here
          */
-        Multibinder<Stage> stageBinder = Multibinder.newSetBinder( binder(), Stage.class );
+        Multibinder<ExecutionStage> stageBinder = Multibinder.newSetBinder( binder(), ExecutionStage.class );
 
 
 
         //creation stages
+        stageBinder.addBinding().to( Commit.class );
         stageBinder.addBinding().to( Create.class );
-        stageBinder.addBinding().to( Update.class );
         stageBinder.addBinding().to( StartWrite.class );
+        stageBinder.addBinding().to( Update.class );
         stageBinder.addBinding().to( Verify.class );
-        stageBinder.addBinding().to( Commit.class );
-        stageBinder.addBinding().to( Clear.class );
+
+        //delete stages
+        stageBinder.addBinding().to( Delete.class );
+        stageBinder.addBinding().to( StartDelete.class );
 
         //loading stages
         stageBinder.addBinding().to(Load.class);

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/44072d59/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Commit.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Commit.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Commit.java
deleted file mode 100644
index f29bf93..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Commit.java
+++ /dev/null
@@ -1,90 +0,0 @@
-package org.apache.usergrid.persistence.collection.mvcc.stage.impl;
-
-
-import java.util.UUID;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.persistence.collection.CollectionContext;
-import org.apache.usergrid.persistence.collection.exception.CollectionRuntimeException;
-import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
-import org.apache.usergrid.persistence.collection.mvcc.entity.MvccLogEntry;
-import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccLogEntryImpl;
-import org.apache.usergrid.persistence.collection.mvcc.stage.Stage;
-import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionContext;
-import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
-import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy;
-
-import com.google.common.base.Preconditions;
-import com.google.inject.Inject;
-import com.netflix.astyanax.MutationBatch;
-import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
-
-
-/** This phase should invoke any finalization, and mark the entity as committed in the data store before returning */
-public class Commit implements Stage {
-
-
-    private static final Logger LOG = LoggerFactory.getLogger( Commit.class );
-
-    private final MvccLogEntrySerializationStrategy logEntrySerializationStrategy;
-    private final MvccEntitySerializationStrategy entitySerializationStrategy;
-
-
-    @Inject
-    public Commit( final MvccLogEntrySerializationStrategy logEntrySerializationStrategy,
-                   final MvccEntitySerializationStrategy entitySerializationStrategy ) {
-        Preconditions.checkNotNull( logEntrySerializationStrategy, "logEntrySerializationStrategy is required" );
-                      Preconditions.checkNotNull( entitySerializationStrategy, "entitySerializationStrategy is required" );
-
-
-        this.logEntrySerializationStrategy = logEntrySerializationStrategy;
-        this.entitySerializationStrategy = entitySerializationStrategy;
-    }
-
-
-    @Override
-    public void performStage( final ExecutionContext executionContext ) {
-        final MvccEntity entity = executionContext.getMessage( MvccEntity.class );
-
-        Preconditions.checkNotNull( entity, "Entity is required in the new stage of the mvcc write" );
-
-        final UUID entityId = entity.getUuid();
-        final UUID version = entity.getVersion();
-
-        Preconditions.checkNotNull( entityId, "Entity id is required in this stage" );
-        Preconditions.checkNotNull( version, "Entity version is required in this stage" );
-
-
-        final CollectionContext collectionContext = executionContext.getCollectionContext();
-
-
-        final MvccLogEntry startEntry = new MvccLogEntryImpl( entityId, version, org.apache.usergrid.persistence
-                .collection.mvcc.entity.Stage.COMMITTED );
-
-        MutationBatch logMutation = logEntrySerializationStrategy.write( collectionContext, startEntry );
-
-        //now get our actual insert into the entity data
-        MutationBatch entityMutation = entitySerializationStrategy.write( collectionContext, entity );
-
-        //merge the 2 into 1 mutation
-        logMutation.mergeShallow( entityMutation );
-
-
-        try {
-            logMutation.execute();
-        }
-        catch ( ConnectionException e ) {
-            LOG.error( "Failed to execute write asynchronously ", e );
-            throw new CollectionRuntimeException( "Failed to execute write asynchronously ", e );
-        }
-
-        /**
-         * We're done executing.
-         */
-        executionContext.proceed();
-
-        //TODO connect to post processors via listener
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/44072d59/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Create.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Create.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Create.java
deleted file mode 100644
index 2026274..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Create.java
+++ /dev/null
@@ -1,85 +0,0 @@
-package org.apache.usergrid.persistence.collection.mvcc.stage.impl;
-
-
-import java.util.UUID;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.commons.lang3.reflect.FieldUtils;
-
-import org.apache.usergrid.persistence.collection.exception.CollectionRuntimeException;
-import org.apache.usergrid.persistence.collection.mvcc.stage.Stage;
-import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionContext;
-import org.apache.usergrid.persistence.collection.service.TimeService;
-import org.apache.usergrid.persistence.collection.service.UUIDService;
-import org.apache.usergrid.persistence.collection.util.Verify;
-import org.apache.usergrid.persistence.model.entity.Entity;
-
-import com.google.common.base.Preconditions;
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-
-
-/**
- * This is the first stage and should be invoked immediately when a new entity create is started. No UUIDs should be
- * present, and this should set the entityId, version, created, and updated dates
- */
-@Singleton
-public class Create implements Stage {
-
-    private static final Logger LOG = LoggerFactory.getLogger( Create.class );
-
-
-    private final TimeService timeService;
-    private final UUIDService uuidService;
-
-
-    @Inject
-    public Create( final TimeService timeService, final UUIDService uuidService ) {
-        Preconditions.checkNotNull( timeService, "timeService is required" );
-        Preconditions.checkNotNull( uuidService, "uuidService is required" );
-
-
-        this.timeService = timeService;
-        this.uuidService = uuidService;
-    }
-
-
-    /**
-     * Create the entity Id  and inject it, as well as set the timestamp versions
-     *
-     * @param executionContext The context of the current write operation
-     */
-    @Override
-    public void performStage( final ExecutionContext executionContext ) {
-
-        final Entity entity = executionContext.getMessage( Entity.class );
-
-        Preconditions.checkNotNull( entity, "Entity is required in the new stage of the mvcc write" );
-
-        Verify.isNull( entity.getUuid(), "A new entity should not have an id set.  This is an update operation" );
-
-
-        final UUID entityId = uuidService.newTimeUUID();
-        final UUID version = entityId;
-        final long created = timeService.getTime();
-
-
-        try {
-            FieldUtils.writeDeclaredField( entity, "uuid", entityId, true );
-        }
-        catch ( Throwable t ) {
-            LOG.error( "Unable to set uuid.  See nested exception", t );
-            throw new CollectionRuntimeException( "Unable to set uuid.  See nested exception", t );
-        }
-
-        entity.setVersion( version );
-        entity.setCreated( created );
-        entity.setUpdated( created );
-
-        //set the updated entity for the next stage
-        executionContext.setMessage( entity );
-        executionContext.proceed();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/44072d59/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/CreatePipeline.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/CreatePipeline.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/CreatePipeline.java
deleted file mode 100644
index efe50c8..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/CreatePipeline.java
+++ /dev/null
@@ -1,23 +0,0 @@
-package org.apache.usergrid.persistence.collection.mvcc.stage.impl;
-
-
-import java.lang.annotation.Retention;
-import java.lang.annotation.Target;
-
-import com.google.inject.BindingAnnotation;
-
-import static java.lang.annotation.ElementType.FIELD;
-import static java.lang.annotation.ElementType.METHOD;
-import static java.lang.annotation.ElementType.PARAMETER;
-import static java.lang.annotation.RetentionPolicy.RUNTIME;
-
-
-/**
- * Marks the create pipeline
- *
- * @author tnine
- */
-@BindingAnnotation
-@Target({ FIELD, PARAMETER, METHOD })
-@Retention(RUNTIME)
-public @interface CreatePipeline {}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/44072d59/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/DeletePipeline.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/DeletePipeline.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/DeletePipeline.java
deleted file mode 100644
index 3d95ddb..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/DeletePipeline.java
+++ /dev/null
@@ -1,23 +0,0 @@
-package org.apache.usergrid.persistence.collection.mvcc.stage.impl;
-
-
-import java.lang.annotation.Retention;
-import java.lang.annotation.Target;
-
-import com.google.inject.BindingAnnotation;
-
-import static java.lang.annotation.ElementType.FIELD;
-import static java.lang.annotation.ElementType.METHOD;
-import static java.lang.annotation.ElementType.PARAMETER;
-import static java.lang.annotation.RetentionPolicy.RUNTIME;
-
-
-/**
- * Marks the delete pipeline
- *
- * @author tnine
- */
-@BindingAnnotation
-@Target({ FIELD, PARAMETER, METHOD })
-@Retention(RUNTIME)
-public @interface DeletePipeline {}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/44072d59/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/ExecutionContextImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/ExecutionContextImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/ExecutionContextImpl.java
index 8139f3d..805d1e3 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/ExecutionContextImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/ExecutionContextImpl.java
@@ -3,7 +3,7 @@ package org.apache.usergrid.persistence.collection.mvcc.stage.impl;
 
 import org.apache.usergrid.persistence.collection.CollectionContext;
 import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionContext;
-import org.apache.usergrid.persistence.collection.mvcc.stage.Stage;
+import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionStage;
 import org.apache.usergrid.persistence.collection.mvcc.stage.StagePipeline;
 
 import com.google.common.base.Preconditions;
@@ -17,7 +17,7 @@ public class ExecutionContextImpl implements ExecutionContext {
     private final CollectionContext context;
 
     private Object message;
-    private Stage current;
+    private ExecutionStage current;
 
 
     @Inject
@@ -72,7 +72,7 @@ public class ExecutionContextImpl implements ExecutionContext {
 
     @Override
     public void proceed() {
-        Stage next = this.pipeline.nextStage( current );
+        ExecutionStage next = this.pipeline.nextStage( current );
 
         //Nothing to do
         if ( next == null ) {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/44072d59/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Load.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Load.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Load.java
deleted file mode 100644
index b0ac251..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Load.java
+++ /dev/null
@@ -1,82 +0,0 @@
-package org.apache.usergrid.persistence.collection.mvcc.stage.impl;
-
-
-import java.util.List;
-import java.util.UUID;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.persistence.collection.CollectionContext;
-import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
-import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionContext;
-import org.apache.usergrid.persistence.collection.mvcc.stage.Stage;
-import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
-import org.apache.usergrid.persistence.collection.service.UUIDService;
-import org.apache.usergrid.persistence.model.entity.Entity;
-
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.inject.Inject;
-
-
-/**
- * This stage is a load stage to load a single entity
- */
-public class Load implements Stage {
-
-
-    private static final Logger LOG = LoggerFactory.getLogger( Load.class );
-
-    private final UUIDService uuidService;
-    private final MvccEntitySerializationStrategy entitySerializationStrategy;
-
-
-    @Inject
-    public Load( final UUIDService uuidService, final MvccEntitySerializationStrategy entitySerializationStrategy ) {
-        Preconditions.checkNotNull( entitySerializationStrategy, "entitySerializationStrategy is required" );
-        Preconditions.checkNotNull( uuidService, "uuidService is required" );
-
-
-        this.uuidService = uuidService;
-        this.entitySerializationStrategy = entitySerializationStrategy;
-    }
-
-
-    @Override
-    public void performStage( final ExecutionContext executionContext ) {
-        final UUID entityId = executionContext.getMessage( UUID.class );
-
-        Preconditions.checkNotNull( entityId, "Entity id required in the read stage" );
-
-
-        final CollectionContext collectionContext = executionContext.getCollectionContext();
-
-        //generate  a version that represents now
-        final UUID versionMax = uuidService.newTimeUUID();
-
-        List<MvccEntity> results = entitySerializationStrategy.load( collectionContext, entityId, versionMax, 1 );
-
-        //nothing to do, we didn't get a result back
-        if(results.size() != 1){
-            executionContext.setMessage( null );
-            executionContext.proceed();
-            return;
-        }
-
-        final Optional<Entity> targetVersion = results.get(0).getEntity();
-
-        //this entity has been marked as cleared.  The version exists, but does not have entity data
-        if(!targetVersion.isPresent()){
-
-            //TODO, a lazy async repair/cleanup here?
-
-            executionContext.setMessage( null );
-            executionContext.proceed();
-            return;
-        }
-
-        executionContext.setMessage( targetVersion.get() );
-        executionContext.proceed();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/44072d59/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/LoadPipeline.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/LoadPipeline.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/LoadPipeline.java
deleted file mode 100644
index f712b2c..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/LoadPipeline.java
+++ /dev/null
@@ -1,23 +0,0 @@
-package org.apache.usergrid.persistence.collection.mvcc.stage.impl;
-
-
-import java.lang.annotation.Retention;
-import java.lang.annotation.Target;
-
-import com.google.inject.BindingAnnotation;
-
-import static java.lang.annotation.ElementType.FIELD;
-import static java.lang.annotation.ElementType.METHOD;
-import static java.lang.annotation.ElementType.PARAMETER;
-import static java.lang.annotation.RetentionPolicy.RUNTIME;
-
-
-/**
- * Marks the delete pipeline
- *
- * @author tnine
- */
-@BindingAnnotation
-@Target({ FIELD, PARAMETER, METHOD })
-@Retention(RUNTIME)
-public @interface LoadPipeline {}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/44072d59/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/StagePipelineImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/StagePipelineImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/StagePipelineImpl.java
index 74dc003..1f3a0fe 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/StagePipelineImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/StagePipelineImpl.java
@@ -4,7 +4,7 @@ package org.apache.usergrid.persistence.collection.mvcc.stage.impl;
 import java.util.Arrays;
 import java.util.List;
 
-import org.apache.usergrid.persistence.collection.mvcc.stage.Stage;
+import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionStage;
 import org.apache.usergrid.persistence.collection.mvcc.stage.StagePipeline;
 
 import com.google.common.base.Preconditions;
@@ -13,25 +13,25 @@ import com.google.common.base.Preconditions;
 /** @author tnine */
 public class StagePipelineImpl implements StagePipeline {
 
-    private final List<Stage> stages;
+    private final List<ExecutionStage> executionStages;
 
 
-    protected StagePipelineImpl( List<Stage> stages ) {
-        Preconditions.checkNotNull(stages, "stages is required");
-        Preconditions.checkArgument(  stages.size() > 0, "stages must have more than 1 element" );
+    protected StagePipelineImpl( List<ExecutionStage> executionStages ) {
+        Preconditions.checkNotNull( executionStages, "executionStages is required");
+        Preconditions.checkArgument(  executionStages.size() > 0, "executionStages must have more than 1 element" );
 
-        this.stages = stages;
+        this.executionStages = executionStages;
     }
 
 
     @Override
-    public Stage first() {
+    public ExecutionStage first() {
 
-        if ( stages.size() == 0 ) {
+        if ( executionStages.size() == 0 ) {
             return null;
         }
 
-        return stages.get( 0 );
+        return executionStages.get( 0 );
     }
 
 
@@ -39,23 +39,23 @@ public class StagePipelineImpl implements StagePipeline {
 
 
     @Override
-    public Stage nextStage( final Stage stage ) {
+    public ExecutionStage nextStage( final ExecutionStage executionStage ) {
 
-        Preconditions.checkNotNull( stage, "Stage cannot be null" );
+        Preconditions.checkNotNull( executionStage, "ExecutionStage cannot be null" );
 
-        int index = stages.indexOf( stage );
+        int index = executionStages.indexOf( executionStage );
 
         //we're done, do nothing
-        if ( index == stages.size() -1  ) {
+        if ( index == executionStages.size() -1  ) {
             return null;
         }
 
-        return  stages.get( index + 1 );
+        return  executionStages.get( index + 1 );
     }
 
 
     /** Factory to create a new instance. */
-    public static StagePipelineImpl fromStages( Stage... stages ) {
-        return new StagePipelineImpl(Arrays.asList(  stages ));
+    public static StagePipelineImpl fromStages( ExecutionStage... executionStages ) {
+        return new StagePipelineImpl(Arrays.asList( executionStages ));
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/44072d59/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/StartDelete.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/StartDelete.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/StartDelete.java
deleted file mode 100644
index 383ee9e..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/StartDelete.java
+++ /dev/null
@@ -1,97 +0,0 @@
-package org.apache.usergrid.persistence.collection.mvcc.stage.impl;
-
-
-import java.util.UUID;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.persistence.collection.CollectionContext;
-import org.apache.usergrid.persistence.collection.exception.CollectionRuntimeException;
-import org.apache.usergrid.persistence.collection.mvcc.entity.MvccLogEntry;
-import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccEntityImpl;
-import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccLogEntryImpl;
-import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionContext;
-import org.apache.usergrid.persistence.collection.mvcc.stage.Stage;
-import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy;
-import org.apache.usergrid.persistence.collection.service.UUIDService;
-import org.apache.usergrid.persistence.model.entity.Entity;
-import org.apache.usergrid.persistence.model.util.UUIDGenerator;
-
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-import com.netflix.astyanax.MutationBatch;
-import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
-
-
-/**
- * This is the first stage and should be invoked immediately when a write is started.  It should persist the start of a
- * new write in the data store for a checkpoint and recovery
- */
-@Singleton
-public class StartDelete implements Stage {
-
-    private static final Logger LOG = LoggerFactory.getLogger( StartDelete.class );
-
-    private final MvccLogEntrySerializationStrategy logStrategy;
-    private final UUIDService uuidService;
-
-
-    /** Create a new stage with the current context */
-    @Inject
-    public StartDelete( final MvccLogEntrySerializationStrategy logStrategy, final UUIDService uuidService ) {
-
-        Preconditions.checkNotNull( logStrategy, "logStrategy is required" );
-        Preconditions.checkNotNull( uuidService, "uuidService is required" );
-
-
-        this.logStrategy = logStrategy;
-        this.uuidService = uuidService;
-    }
-
-
-    /**
-     * Create the entity Id  and inject it, as well as set the timestamp versions
-     *
-     * @param executionContext The context of the current write operation
-     */
-    @Override
-    public void performStage( final ExecutionContext executionContext ) {
-
-        final UUID entityId = executionContext.getMessage( UUID.class );
-
-
-        final UUID version = uuidService.newTimeUUID();
-
-        Preconditions.checkNotNull( entityId, "Entity id is required in this stage" );
-        Preconditions.checkNotNull( version, "Entity version is required in this stage" );
-
-
-
-        final CollectionContext collectionContext = executionContext.getCollectionContext();
-
-
-        final MvccLogEntry startEntry = new MvccLogEntryImpl( entityId, version, org.apache.usergrid.persistence
-                .collection.mvcc.entity.Stage.ACTIVE );
-
-        MutationBatch write = logStrategy.write( collectionContext, startEntry );
-
-
-        try {
-            write.execute();
-        }
-        catch ( ConnectionException e ) {
-            LOG.error( "Failed to execute write asynchronously ", e );
-            throw new CollectionRuntimeException( "Failed to execute write asynchronously ", e );
-        }
-
-
-        //create the mvcc entity for the next stage
-        final MvccEntityImpl nextStage = new MvccEntityImpl( entityId, version, Optional.<Entity>absent() );
-
-        executionContext.setMessage( nextStage );
-        executionContext.proceed();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/44072d59/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/StartWrite.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/StartWrite.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/StartWrite.java
deleted file mode 100644
index 4c55ae6..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/StartWrite.java
+++ /dev/null
@@ -1,92 +0,0 @@
-package org.apache.usergrid.persistence.collection.mvcc.stage.impl;
-
-
-import java.util.UUID;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.persistence.collection.CollectionContext;
-import org.apache.usergrid.persistence.collection.exception.CollectionRuntimeException;
-import org.apache.usergrid.persistence.collection.mvcc.entity.MvccLogEntry;
-import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccEntityImpl;
-import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccLogEntryImpl;
-import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionContext;
-import org.apache.usergrid.persistence.collection.mvcc.stage.Stage;
-import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy;
-import org.apache.usergrid.persistence.model.entity.Entity;
-
-import com.google.common.base.Preconditions;
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-import com.netflix.astyanax.MutationBatch;
-import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
-
-
-/**
- * This is the first stage and should be invoked immediately when a write is started.  It should persist the start of a
- * new write in the data store for a checkpoint and recovery
- */
-@Singleton
-public class StartWrite implements Stage {
-
-    private static final Logger LOG = LoggerFactory.getLogger( StartWrite.class );
-
-    private final MvccLogEntrySerializationStrategy logStrategy;
-
-
-    /** Create a new stage with the current context */
-    @Inject
-    public StartWrite( final MvccLogEntrySerializationStrategy logStrategy ) {
-        Preconditions.checkNotNull( logStrategy, "logStrategy is required" );
-
-
-        this.logStrategy = logStrategy;
-    }
-
-
-    /**
-     * Create the entity Id  and inject it, as well as set the timestamp versions
-     *
-     * @param executionContext The context of the current write operation
-     */
-    @Override
-    public void performStage( final ExecutionContext executionContext ) {
-
-        final Entity entity = executionContext.getMessage( Entity.class );
-
-        Preconditions.checkNotNull( entity, "Entity is required in the new stage of the mvcc write" );
-
-        final UUID entityId = entity.getUuid();
-        final UUID version = entity.getVersion();
-
-        Preconditions.checkNotNull( entityId, "Entity id is required in this stage" );
-        Preconditions.checkNotNull( version, "Entity version is required in this stage" );
-
-
-
-        final CollectionContext collectionContext = executionContext.getCollectionContext();
-
-
-        final MvccLogEntry startEntry = new MvccLogEntryImpl( entityId, version, org.apache.usergrid.persistence
-                .collection.mvcc.entity.Stage.ACTIVE );
-
-        MutationBatch write = logStrategy.write( collectionContext, startEntry );
-
-
-        try {
-            write.execute();
-        }
-        catch ( ConnectionException e ) {
-            LOG.error( "Failed to execute write asynchronously ", e );
-            throw new CollectionRuntimeException( "Failed to execute write asynchronously ", e );
-        }
-
-
-        //create the mvcc entity for the next stage
-        final MvccEntityImpl nextStage = new MvccEntityImpl( entityId, version, entity );
-
-        executionContext.setMessage( nextStage );
-        executionContext.proceed();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/44072d59/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Update.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Update.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Update.java
deleted file mode 100644
index ce1be76..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Update.java
+++ /dev/null
@@ -1,66 +0,0 @@
-package org.apache.usergrid.persistence.collection.mvcc.stage.impl;
-
-
-import java.util.UUID;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionContext;
-import org.apache.usergrid.persistence.collection.mvcc.stage.Stage;
-import org.apache.usergrid.persistence.collection.service.TimeService;
-import org.apache.usergrid.persistence.collection.service.UUIDService;
-import org.apache.usergrid.persistence.model.entity.Entity;
-
-import com.google.common.base.Preconditions;
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-
-
-/**
- * This stage performs the initial commit log and write of an entity.  It assumes the entity id and created has already
- * been set correctly
- */
-@Singleton
-public class Update implements Stage {
-
-    private static final Logger LOG = LoggerFactory.getLogger( Update.class );
-
-    private final TimeService timeService;
-    private final UUIDService uuidService;
-
-
-    @Inject
-    public Update( final TimeService timeService, final UUIDService uuidService ) {
-        Preconditions.checkNotNull( timeService, "timeService is required" );
-        Preconditions.checkNotNull( uuidService, "uuidService is required" );
-
-        this.timeService = timeService;
-        this.uuidService = uuidService;
-    }
-
-
-    /**
-     * Create the entity Id  and inject it, as well as set the timestamp versions
-     *
-     * @param executionContext The context of the current write operation
-     */
-    @Override
-    public void performStage( final ExecutionContext executionContext ) {
-
-        final Entity entity = executionContext.getMessage( Entity.class );
-
-        Preconditions.checkNotNull( entity, "Entity is required in the new stage of the mvcc write" );
-
-
-        final UUID version = uuidService.newTimeUUID();
-        final long updated = timeService.getTime();
-
-
-        entity.setVersion( version );
-        entity.setUpdated( updated );
-
-        executionContext.setMessage( entity );
-        executionContext.proceed();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/44072d59/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/UpdatePipeline.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/UpdatePipeline.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/UpdatePipeline.java
deleted file mode 100644
index abc6e15..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/UpdatePipeline.java
+++ /dev/null
@@ -1,23 +0,0 @@
-package org.apache.usergrid.persistence.collection.mvcc.stage.impl;
-
-
-import java.lang.annotation.Retention;
-import java.lang.annotation.Target;
-
-import com.google.inject.BindingAnnotation;
-
-import static java.lang.annotation.ElementType.FIELD;
-import static java.lang.annotation.ElementType.METHOD;
-import static java.lang.annotation.ElementType.PARAMETER;
-import static java.lang.annotation.RetentionPolicy.RUNTIME;
-
-
-/**
- * Marks the create pipeline
- *
- * @author tnine
- */
-@BindingAnnotation
-@Target( { FIELD, PARAMETER, METHOD } )
-@Retention( RUNTIME )
-public @interface UpdatePipeline {}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/44072d59/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Verify.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Verify.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Verify.java
deleted file mode 100644
index cc53000..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Verify.java
+++ /dev/null
@@ -1,25 +0,0 @@
-package org.apache.usergrid.persistence.collection.mvcc.stage.impl;
-
-
-import org.apache.usergrid.persistence.collection.mvcc.stage.Stage;
-import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionContext;
-
-import com.google.inject.Singleton;
-
-
-/** This phase should execute any verification on the MvccEntity */
-@Singleton
-public class Verify implements Stage {
-
-
-    public Verify() {
-    }
-
-
-    @Override
-    public void performStage( final ExecutionContext executionContext ) {
-        //TODO no op for now, just continue to the next stage.  Verification logic goes in here
-
-        executionContext.proceed();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/44072d59/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/WriteContextCallback.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/WriteContextCallback.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/WriteContextCallback.java
deleted file mode 100644
index 2a34e95..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/WriteContextCallback.java
+++ /dev/null
@@ -1,59 +0,0 @@
-package org.apache.usergrid.persistence.collection.mvcc.stage.impl;
-
-
-import org.apache.usergrid.persistence.collection.exception.CollectionRuntimeException;
-import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionContext;
-
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.netflix.astyanax.connectionpool.OperationResult;
-
-
-/**
- * Helper class to cause the async execution to continue
- * Not used ATM, just here for demonstration purposes with async astynax invocation on phase proceed
- *
- * @author tnine
- */
-public class WriteContextCallback implements FutureCallback<OperationResult<Void>> {
-
-    private final ExecutionContext context;
-
-
-    /** Create a new callback.  The data will be passed to the next stage */
-    private WriteContextCallback( final ExecutionContext context ) {
-        this.context = context;
-    }
-
-
-    public void onSuccess( final OperationResult<Void> result ) {
-
-        /**
-         * Proceed to the next stage
-         */
-        context.proceed();
-    }
-
-
-    @Override
-    public void onFailure( final Throwable t ) {
-//        context.stop();
-        throw new CollectionRuntimeException( "Failed to execute write", t );
-    }
-
-
-    /**
-     * This encapsulated type of Void in the listenable future is intentional.  If you're not returning void in your
-     * future, you shouldn't be using this callback, you should be using a callback that will set the Response value
-     * into the next stage and invoke it
-     *
-     * @param future The listenable future returned by the Astyanax async op
-     * @param context The context to signal to continue in the callback
-     */
-    public static void createCallback( final ListenableFuture<OperationResult<Void>> future,
-                                       final ExecutionContext context ) {
-
-        Futures.addCallback( future, new WriteContextCallback( context ) );
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/44072d59/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/delete/Delete.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/delete/Delete.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/delete/Delete.java
new file mode 100644
index 0000000..7810beb
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/delete/Delete.java
@@ -0,0 +1,91 @@
+package org.apache.usergrid.persistence.collection.mvcc.stage.impl.delete;
+
+
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.persistence.collection.CollectionContext;
+import org.apache.usergrid.persistence.collection.exception.CollectionRuntimeException;
+import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
+import org.apache.usergrid.persistence.collection.mvcc.entity.MvccLogEntry;
+import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccLogEntryImpl;
+import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionStage;
+import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionContext;
+import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
+import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy;
+
+import com.google.common.base.Preconditions;
+import com.google.inject.Inject;
+import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+
+/** This phase logs the delete as committed and writes a "cleared" entity version; post processing does the actual delete */
+public class Delete implements ExecutionStage {
+
+
+    private static final Logger LOG = LoggerFactory.getLogger( Delete.class );
+
+    private final MvccLogEntrySerializationStrategy logEntrySerializationStrategy;
+    private final MvccEntitySerializationStrategy entitySerializationStrategy;
+
+
+    @Inject
+    public Delete( final MvccLogEntrySerializationStrategy logEntrySerializationStrategy,
+                   final MvccEntitySerializationStrategy entitySerializationStrategy ) {
+
+        Preconditions.checkNotNull( logEntrySerializationStrategy, "logEntrySerializationStrategy is required" );
+              Preconditions.checkNotNull( entitySerializationStrategy, "entitySerializationStrategy is required" );
+
+
+        this.logEntrySerializationStrategy = logEntrySerializationStrategy;
+        this.entitySerializationStrategy = entitySerializationStrategy;
+    }
+
+
+    @Override
+    public void performStage( final ExecutionContext executionContext ) {
+        final MvccEntity entity = executionContext.getMessage( MvccEntity.class );
+
+        Preconditions.checkNotNull( entity, "Entity is required in the new stage of the mvcc write" );
+
+        final UUID entityId = entity.getUuid();
+        final UUID version = entity.getVersion();
+
+        Preconditions.checkNotNull( entityId, "Entity id is required in this stage" );
+        Preconditions.checkNotNull( version, "Entity version is required in this stage" );
+
+
+        final CollectionContext collectionContext = executionContext.getCollectionContext();
+
+
+        final MvccLogEntry startEntry = new MvccLogEntryImpl( entityId, version, org.apache.usergrid.persistence
+                .collection.mvcc.entity.Stage.COMMITTED );
+
+        MutationBatch logMutation = logEntrySerializationStrategy.write( collectionContext, startEntry );
+
+        //insert a "cleared" value into the versions.  Post processing should actually delete
+        MutationBatch entityMutation = entitySerializationStrategy.clear( collectionContext, entityId, version );
+
+        //merge the 2 into 1 mutation
+        logMutation.mergeShallow( entityMutation );
+
+
+        try {
+            logMutation.execute();
+        }
+        catch ( ConnectionException e ) {
+            LOG.error( "Failed to execute write asynchronously ", e );
+            throw new CollectionRuntimeException( "Failed to execute write asynchronously ", e );
+        }
+
+        /**
+         * We're done executing.
+         */
+        executionContext.proceed();
+
+        //TODO connect to post processors via listener
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/44072d59/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/delete/DeletePipeline.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/delete/DeletePipeline.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/delete/DeletePipeline.java
new file mode 100644
index 0000000..52fe4b9
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/delete/DeletePipeline.java
@@ -0,0 +1,23 @@
+package org.apache.usergrid.persistence.collection.mvcc.stage.impl.delete;
+
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+
+import com.google.inject.BindingAnnotation;
+
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+
+/**
+ * Marks the delete pipeline
+ *
+ * @author tnine
+ */
+@BindingAnnotation
+@Target({ FIELD, PARAMETER, METHOD })
+@Retention(RUNTIME)
+public @interface DeletePipeline {}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/44072d59/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/delete/StartDelete.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/delete/StartDelete.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/delete/StartDelete.java
new file mode 100644
index 0000000..4208662
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/delete/StartDelete.java
@@ -0,0 +1,96 @@
+package org.apache.usergrid.persistence.collection.mvcc.stage.impl.delete;
+
+
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.persistence.collection.CollectionContext;
+import org.apache.usergrid.persistence.collection.exception.CollectionRuntimeException;
+import org.apache.usergrid.persistence.collection.mvcc.entity.MvccLogEntry;
+import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccEntityImpl;
+import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccLogEntryImpl;
+import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionContext;
+import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionStage;
+import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy;
+import org.apache.usergrid.persistence.collection.service.UUIDService;
+import org.apache.usergrid.persistence.model.entity.Entity;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+
+/**
+ * This is the first stage of a delete and should be invoked immediately when a delete is started.  It persists the
+ * start of the delete (an ACTIVE log entry under a new version) in the data store for checkpoint and recovery
+ */
+@Singleton
+public class StartDelete implements ExecutionStage {
+
+    private static final Logger LOG = LoggerFactory.getLogger( StartDelete.class );
+
+    private final MvccLogEntrySerializationStrategy logStrategy;
+    private final UUIDService uuidService;
+
+
+    /** Create a new stage with the log entry serialization strategy and UUID service it depends on */
+    @Inject
+    public StartDelete( final MvccLogEntrySerializationStrategy logStrategy, final UUIDService uuidService ) {
+
+        Preconditions.checkNotNull( logStrategy, "logStrategy is required" );
+        Preconditions.checkNotNull( uuidService, "uuidService is required" );
+
+
+        this.logStrategy = logStrategy;
+        this.uuidService = uuidService;
+    }
+
+
+    /**
+     * Record the start of the delete as an ACTIVE log entry under a new version, then pass on an entity with no data
+     *
+     * @param executionContext The context of the current delete operation
+     */
+    @Override
+    public void performStage( final ExecutionContext executionContext ) {
+
+        final UUID entityId = executionContext.getMessage( UUID.class );
+
+
+        final UUID version = uuidService.newTimeUUID();
+
+        Preconditions.checkNotNull( entityId, "Entity id is required in this stage" );
+        Preconditions.checkNotNull( version, "Entity version is required in this stage" );
+
+
+
+        final CollectionContext collectionContext = executionContext.getCollectionContext();
+
+
+        final MvccLogEntry startEntry = new MvccLogEntryImpl( entityId, version, org.apache.usergrid.persistence
+                .collection.mvcc.entity.Stage.ACTIVE );
+
+        MutationBatch write = logStrategy.write( collectionContext, startEntry );
+
+
+        try {
+            write.execute();
+        }
+        catch ( ConnectionException e ) {
+            LOG.error( "Failed to execute write asynchronously ", e );
+            throw new CollectionRuntimeException( "Failed to execute write asynchronously ", e );
+        }
+
+
+        //create the mvcc entity for the next stage
+        final MvccEntityImpl nextStage = new MvccEntityImpl( entityId, version, Optional.<Entity>absent() );
+
+        executionContext.setMessage( nextStage );
+        executionContext.proceed();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/44072d59/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/read/Load.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/read/Load.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/read/Load.java
new file mode 100644
index 0000000..00a2d43
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/read/Load.java
@@ -0,0 +1,82 @@
+package org.apache.usergrid.persistence.collection.mvcc.stage.impl.read;
+
+
+import java.util.List;
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.persistence.collection.CollectionContext;
+import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
+import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionContext;
+import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionStage;
+import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
+import org.apache.usergrid.persistence.collection.service.UUIDService;
+import org.apache.usergrid.persistence.model.entity.Entity;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.inject.Inject;
+
+
+/**
+ * This stage is a load stage to load a single entity
+ */
+public class Load implements ExecutionStage {
+
+
+    private static final Logger LOG = LoggerFactory.getLogger( Load.class );
+
+    private final UUIDService uuidService;
+    private final MvccEntitySerializationStrategy entitySerializationStrategy;
+
+
+    @Inject
+    public Load( final UUIDService uuidService, final MvccEntitySerializationStrategy entitySerializationStrategy ) {
+        Preconditions.checkNotNull( entitySerializationStrategy, "entitySerializationStrategy is required" );
+        Preconditions.checkNotNull( uuidService, "uuidService is required" );
+
+
+        this.uuidService = uuidService;
+        this.entitySerializationStrategy = entitySerializationStrategy;
+    }
+
+
+    @Override
+    public void performStage( final ExecutionContext executionContext ) {
+        final UUID entityId = executionContext.getMessage( UUID.class );
+
+        Preconditions.checkNotNull( entityId, "Entity id required in the read stage" );
+
+
+        final CollectionContext collectionContext = executionContext.getCollectionContext();
+
+        //generate  a version that represents now
+        final UUID versionMax = uuidService.newTimeUUID();
+
+        List<MvccEntity> results = entitySerializationStrategy.load( collectionContext, entityId, versionMax, 1 );
+
+        //nothing to do, we didn't get a result back
+        if(results.size() != 1){
+            executionContext.setMessage( null );
+            executionContext.proceed();
+            return;
+        }
+
+        final Optional<Entity> targetVersion = results.get(0).getEntity();
+
+        //this entity has been marked as cleared.  The version exists, but does not have entity data
+        if(!targetVersion.isPresent()){
+
+            //TODO, a lazy async repair/cleanup here?
+
+            executionContext.setMessage( null );
+            executionContext.proceed();
+            return;
+        }
+
+        executionContext.setMessage( targetVersion.get() );
+        executionContext.proceed();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/44072d59/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/read/PipelineLoad.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/read/PipelineLoad.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/read/PipelineLoad.java
new file mode 100644
index 0000000..0d24b27
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/read/PipelineLoad.java
@@ -0,0 +1,23 @@
+package org.apache.usergrid.persistence.collection.mvcc.stage.impl.read;
+
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+
+import com.google.inject.BindingAnnotation;
+
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+
+/**
+ * Marks the load pipeline
+ *
+ * @author tnine
+ */
+@BindingAnnotation
+@Target({ FIELD, PARAMETER, METHOD })
+@Retention(RUNTIME)
+public @interface PipelineLoad {}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/44072d59/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/Commit.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/Commit.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/Commit.java
new file mode 100644
index 0000000..4780ff1
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/Commit.java
@@ -0,0 +1,90 @@
+package org.apache.usergrid.persistence.collection.mvcc.stage.impl.write;
+
+
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.persistence.collection.CollectionContext;
+import org.apache.usergrid.persistence.collection.exception.CollectionRuntimeException;
+import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
+import org.apache.usergrid.persistence.collection.mvcc.entity.MvccLogEntry;
+import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccLogEntryImpl;
+import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionStage;
+import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionContext;
+import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
+import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy;
+
+import com.google.common.base.Preconditions;
+import com.google.inject.Inject;
+import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+
+/** This phase should invoke any finalization, and mark the entity as committed in the data store before returning */
+public class Commit implements ExecutionStage {
+
+
+    private static final Logger LOG = LoggerFactory.getLogger( Commit.class );
+
+    private final MvccLogEntrySerializationStrategy logEntrySerializationStrategy;
+    private final MvccEntitySerializationStrategy entitySerializationStrategy;
+
+
+    @Inject
+    public Commit( final MvccLogEntrySerializationStrategy logEntrySerializationStrategy,
+                   final MvccEntitySerializationStrategy entitySerializationStrategy ) {
+        Preconditions.checkNotNull( logEntrySerializationStrategy, "logEntrySerializationStrategy is required" );
+                      Preconditions.checkNotNull( entitySerializationStrategy, "entitySerializationStrategy is required" );
+
+
+        this.logEntrySerializationStrategy = logEntrySerializationStrategy;
+        this.entitySerializationStrategy = entitySerializationStrategy;
+    }
+
+
+    @Override
+    public void performStage( final ExecutionContext executionContext ) {
+        final MvccEntity entity = executionContext.getMessage( MvccEntity.class );
+
+        Preconditions.checkNotNull( entity, "Entity is required in the new stage of the mvcc write" );
+
+        final UUID entityId = entity.getUuid();
+        final UUID version = entity.getVersion();
+
+        Preconditions.checkNotNull( entityId, "Entity id is required in this stage" );
+        Preconditions.checkNotNull( version, "Entity version is required in this stage" );
+
+
+        final CollectionContext collectionContext = executionContext.getCollectionContext();
+
+
+        final MvccLogEntry startEntry = new MvccLogEntryImpl( entityId, version, org.apache.usergrid.persistence
+                .collection.mvcc.entity.Stage.COMMITTED );
+
+        MutationBatch logMutation = logEntrySerializationStrategy.write( collectionContext, startEntry );
+
+        //now get our actual insert into the entity data
+        MutationBatch entityMutation = entitySerializationStrategy.write( collectionContext, entity );
+
+        //merge the 2 into 1 mutation
+        logMutation.mergeShallow( entityMutation );
+
+
+        try {
+            logMutation.execute();
+        }
+        catch ( ConnectionException e ) {
+            LOG.error( "Failed to execute write asynchronously ", e );
+            throw new CollectionRuntimeException( "Failed to execute write asynchronously ", e );
+        }
+
+        /**
+         * We're done executing.
+         */
+        executionContext.proceed();
+
+        //TODO connect to post processors via listener
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/44072d59/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/Create.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/Create.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/Create.java
new file mode 100644
index 0000000..cbd7d9b
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/Create.java
@@ -0,0 +1,85 @@
+package org.apache.usergrid.persistence.collection.mvcc.stage.impl.write;
+
+
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.lang3.reflect.FieldUtils;
+
+import org.apache.usergrid.persistence.collection.exception.CollectionRuntimeException;
+import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionStage;
+import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionContext;
+import org.apache.usergrid.persistence.collection.service.TimeService;
+import org.apache.usergrid.persistence.collection.service.UUIDService;
+import org.apache.usergrid.persistence.collection.util.Verify;
+import org.apache.usergrid.persistence.model.entity.Entity;
+
+import com.google.common.base.Preconditions;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+
+/**
+ * This is the first stage and should be invoked immediately when a new entity create is started. No UUIDs should be
+ * present, and this should set the entityId, version, created, and updated dates
+ */
+@Singleton
+public class Create implements ExecutionStage {
+
+    private static final Logger LOG = LoggerFactory.getLogger( Create.class );
+
+
+    private final TimeService timeService;
+    private final UUIDService uuidService;
+
+
+    @Inject
+    public Create( final TimeService timeService, final UUIDService uuidService ) {
+        Preconditions.checkNotNull( timeService, "timeService is required" );
+        Preconditions.checkNotNull( uuidService, "uuidService is required" );
+
+
+        this.timeService = timeService;
+        this.uuidService = uuidService;
+    }
+
+
+    /**
+     * Create the entity Id  and inject it, as well as set the timestamp versions
+     *
+     * @param executionContext The context of the current write operation
+     */
+    @Override
+    public void performStage( final ExecutionContext executionContext ) {
+
+        final Entity entity = executionContext.getMessage( Entity.class );
+
+        Preconditions.checkNotNull( entity, "Entity is required in the new stage of the mvcc write" );
+
+        Verify.isNull( entity.getUuid(), "A new entity should not have an id set.  This is an update operation" );
+
+
+        final UUID entityId = uuidService.newTimeUUID();
+        final UUID version = entityId;
+        final long created = timeService.getTime();
+
+
+        try {
+            FieldUtils.writeDeclaredField( entity, "uuid", entityId, true );
+        }
+        catch ( Throwable t ) {
+            LOG.error( "Unable to set uuid.  See nested exception", t );
+            throw new CollectionRuntimeException( "Unable to set uuid.  See nested exception", t );
+        }
+
+        entity.setVersion( version );
+        entity.setCreated( created );
+        entity.setUpdated( created );
+
+        //set the updated entity for the next stage
+        executionContext.setMessage( entity );
+        executionContext.proceed();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/44072d59/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/PipelineCreate.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/PipelineCreate.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/PipelineCreate.java
new file mode 100644
index 0000000..f3af972
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/PipelineCreate.java
@@ -0,0 +1,23 @@
+package org.apache.usergrid.persistence.collection.mvcc.stage.impl.write;
+
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+
+import com.google.inject.BindingAnnotation;
+
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+
+/**
+ * Marks the create pipeline
+ *
+ * @author tnine
+ */
+@BindingAnnotation
+@Target({ FIELD, PARAMETER, METHOD })
+@Retention(RUNTIME)
+public @interface PipelineCreate {}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/44072d59/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/PipelineUpdate.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/PipelineUpdate.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/PipelineUpdate.java
new file mode 100644
index 0000000..85bc56d
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/PipelineUpdate.java
@@ -0,0 +1,23 @@
+package org.apache.usergrid.persistence.collection.mvcc.stage.impl.write;
+
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+
+import com.google.inject.BindingAnnotation;
+
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+
+/**
+ * Marks the update pipeline
+ *
+ * @author tnine
+ */
+@BindingAnnotation
+@Target( { FIELD, PARAMETER, METHOD } )
+@Retention( RUNTIME )
+public @interface PipelineUpdate {}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/44072d59/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/StartWrite.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/StartWrite.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/StartWrite.java
new file mode 100644
index 0000000..d6e7d49
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/StartWrite.java
@@ -0,0 +1,92 @@
+package org.apache.usergrid.persistence.collection.mvcc.stage.impl.write;
+
+
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.persistence.collection.CollectionContext;
+import org.apache.usergrid.persistence.collection.exception.CollectionRuntimeException;
+import org.apache.usergrid.persistence.collection.mvcc.entity.MvccLogEntry;
+import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccEntityImpl;
+import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccLogEntryImpl;
+import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionContext;
+import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionStage;
+import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy;
+import org.apache.usergrid.persistence.model.entity.Entity;
+
+import com.google.common.base.Preconditions;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+
+/**
+ * This is the first stage and should be invoked immediately when a write is started.  It should persist the start of a
+ * new write in the data store for a checkpoint and recovery
+ */
+@Singleton
+public class StartWrite implements ExecutionStage {
+
+    private static final Logger LOG = LoggerFactory.getLogger( StartWrite.class );
+
+    private final MvccLogEntrySerializationStrategy logStrategy;
+
+
+    /** Create a new stage with the log entry serialization strategy it depends on */
+    @Inject
+    public StartWrite( final MvccLogEntrySerializationStrategy logStrategy ) {
+        Preconditions.checkNotNull( logStrategy, "logStrategy is required" );
+
+
+        this.logStrategy = logStrategy;
+    }
+
+
+    /**
+     * Record the start of the write as an ACTIVE log entry, then pass the entity wrapped as an MvccEntityImpl onward
+     *
+     * @param executionContext The context of the current write operation
+     */
+    @Override
+    public void performStage( final ExecutionContext executionContext ) {
+
+        final Entity entity = executionContext.getMessage( Entity.class );
+
+        Preconditions.checkNotNull( entity, "Entity is required in the new stage of the mvcc write" );
+
+        final UUID entityId = entity.getUuid();
+        final UUID version = entity.getVersion();
+
+        Preconditions.checkNotNull( entityId, "Entity id is required in this stage" );
+        Preconditions.checkNotNull( version, "Entity version is required in this stage" );
+
+
+
+        final CollectionContext collectionContext = executionContext.getCollectionContext();
+
+
+        final MvccLogEntry startEntry = new MvccLogEntryImpl( entityId, version, org.apache.usergrid.persistence
+                .collection.mvcc.entity.Stage.ACTIVE );
+
+        MutationBatch write = logStrategy.write( collectionContext, startEntry );
+
+
+        try {
+            write.execute();
+        }
+        catch ( ConnectionException e ) {
+            LOG.error( "Failed to execute write asynchronously ", e );
+            throw new CollectionRuntimeException( "Failed to execute write asynchronously ", e );
+        }
+
+
+        //create the mvcc entity for the next stage
+        final MvccEntityImpl nextStage = new MvccEntityImpl( entityId, version, entity );
+
+        executionContext.setMessage( nextStage );
+        executionContext.proceed();
+    }
+}