You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2013/12/03 04:23:23 UTC

git commit: WIP. Need to finish creating pipelines etc before merging.

Updated Branches:
  refs/heads/two-dot-o-async-hacks [created] b2833c39e


WIP.  Need to finish creating pipelines etc before merging.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/b2833c39
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/b2833c39
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/b2833c39

Branch: refs/heads/two-dot-o-async-hacks
Commit: b2833c39ea46a2d4037671bda019d07ac69d9449
Parents: 04b9497
Author: Todd Nine <to...@apache.org>
Authored: Mon Dec 2 20:22:55 2013 -0700
Committer: Todd Nine <to...@apache.org>
Committed: Mon Dec 2 20:22:55 2013 -0700

----------------------------------------------------------------------
 stack/corepersistence/collection/pom.xml        |  2 +
 .../collection/CollectionManagerImpl.java       | 25 ++------
 .../collection/guice/CollectionModule.java      | 34 ++++++++--
 .../collection/mvcc/stage/Commit.java           | 22 -------
 .../collection/mvcc/stage/StagePipeline.java    | 35 +++++++++++
 .../collection/mvcc/stage/Start.java            | 44 -------------
 .../collection/mvcc/stage/Write.java            | 29 ---------
 .../collection/mvcc/stage/WriteContext.java     | 25 ++++++++
 .../collection/mvcc/stage/WriteListener.java    | 10 ---
 .../collection/mvcc/stage/WriteStage.java       |  7 +--
 .../stage/impl/CollectionPipelineModule.java    | 65 +++++++++++++++++++
 .../collection/mvcc/stage/impl/Commit.java      | 24 +++++++
 .../mvcc/stage/impl/CreatePipeline.java         | 20 ++++++
 .../mvcc/stage/impl/DeletePipeline.java         | 20 ++++++
 .../mvcc/stage/impl/StagePipelineImpl.java      | 59 +++++++++++++++++
 .../collection/mvcc/stage/impl/Start.java       | 66 ++++++++++++++++++++
 .../mvcc/stage/impl/UpdatePipeline.java         | 20 ++++++
 .../collection/mvcc/stage/impl/Write.java       | 27 ++++++++
 .../collection/mvcc/stage/StartTest.java        |  2 +-
 .../persistence/index/stage/Complete.java       |  9 +--
 .../usergrid/persistence/index/stage/Start.java | 10 +--
 .../usergrid/persistence/index/stage/Write.java | 10 +--
 22 files changed, 416 insertions(+), 149 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b2833c39/stack/corepersistence/collection/pom.xml
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/pom.xml b/stack/corepersistence/collection/pom.xml
index 7e48ea4..1a0f358 100644
--- a/stack/corepersistence/collection/pom.xml
+++ b/stack/corepersistence/collection/pom.xml
@@ -91,6 +91,8 @@
       <version>${guice.version}</version>
     </dependency>
 
+
+
     <!-- Google Guice Integration Test Injectors -->
 
     <dependency>

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b2833c39/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/CollectionManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/CollectionManagerImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/CollectionManagerImpl.java
index 679a8cb..b5f0afb 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/CollectionManagerImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/CollectionManagerImpl.java
@@ -1,7 +1,6 @@
 package org.apache.usergrid.persistence.collection;
 
 
-import java.lang.reflect.InvocationTargetException;
 import java.util.UUID;
 
 import org.slf4j.Logger;
@@ -10,15 +9,9 @@ import org.slf4j.LoggerFactory;
 import org.apache.commons.beanutils.BeanUtils;
 
 import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntityImpl;
-import org.apache.usergrid.persistence.collection.mvcc.stage.Commit;
-import org.apache.usergrid.persistence.collection.mvcc.stage.Start;
-import org.apache.usergrid.persistence.collection.mvcc.stage.Write;
 import org.apache.usergrid.persistence.model.entity.Entity;
 import org.apache.usergrid.persistence.model.util.UUIDGenerator;
 
-import com.google.inject.Inject;
-import com.netflix.astyanax.MutationBatch;
-
 
 /**
  * Simple implementation.  Should perform
@@ -30,19 +23,12 @@ public class CollectionManagerImpl implements CollectionManager {
 
     private final CollectionContext context;
     private final TimeService timeService;
-    private final Start startStage;
-    private final Write writeStage;
-    private final Commit commitStage;
 
 
-    @Inject
-    public CollectionManagerImpl( final CollectionContext context, final TimeService timeService, final Start startStage, final Write writeStage,
-                                  final Commit commitStage ) {
+
+    public CollectionManagerImpl( final CollectionContext context, final TimeService timeService ) {
         this.context = context;
         this.timeService = timeService;
-        this.startStage = startStage;
-        this.writeStage = writeStage;
-        this.commitStage = commitStage;
     }
 
 
@@ -69,9 +55,7 @@ public class CollectionManagerImpl implements CollectionManager {
         MvccEntityImpl mvccEntity = new MvccEntityImpl(context, entityId, version, entity   );
 
 
-        MutationBatch mutation = startStage.performStage(  mvccEntity );
-        writeStage.performStage( mvccEntity );
-        commitStage.performStage( mvccEntity );
+
     }
 
 
@@ -91,4 +75,7 @@ public class CollectionManagerImpl implements CollectionManager {
     public Entity load( final UUID entityId ) {
         return null;  //To change body of implemented methods use File | Settings | File Templates.
     }
+
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b2833c39/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
index c83fe4f..e675e88 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
@@ -5,12 +5,17 @@ import org.apache.usergrid.persistence.collection.astynax.AstynaxKeyspaceProvide
 import org.apache.usergrid.persistence.collection.migration.Migration;
 import org.apache.usergrid.persistence.collection.migration.MigrationManager;
 import org.apache.usergrid.persistence.collection.migration.MigrationManagerImpl;
+import org.apache.usergrid.persistence.collection.mvcc.stage.StagePipeline;
+import org.apache.usergrid.persistence.collection.mvcc.stage.impl.UpdatePipeline;
+import org.apache.usergrid.persistence.collection.mvcc.stage.impl.CreatePipeline;
+import org.apache.usergrid.persistence.collection.mvcc.stage.impl.DeletePipeline;
 import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
 import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategyImpl;
 import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy;
 import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategyImpl;
 
 import com.google.inject.AbstractModule;
+import com.google.inject.Provides;
 import com.google.inject.multibindings.Multibinder;
 import com.google.inject.name.Names;
 import com.netflix.astyanax.Keyspace;
@@ -23,9 +28,7 @@ import com.netflix.astyanax.Keyspace;
  */
 public class CollectionModule extends AbstractModule {
 
-    /**
-     * The location of the properties file
-     */
+    /** The location of the properties file */
     private static final String CASS_PROPS = "cassandra.properties";
 
 
@@ -44,7 +47,7 @@ public class CollectionModule extends AbstractModule {
         bind( Keyspace.class ).toProvider( AstynaxKeyspaceProvider.class );
 
         //bind our migration manager
-        bind(MigrationManager.class).to( MigrationManagerImpl.class );
+        bind( MigrationManager.class ).to( MigrationManagerImpl.class );
 
 
         //bind the serialization strategies
@@ -61,4 +64,27 @@ public class CollectionModule extends AbstractModule {
         uriBinder.addBinding().to( MvccEntitySerializationStrategyImpl.class );
         uriBinder.addBinding().to( MvccLogEntrySerializationStrategyImpl.class );
     }
+
+
+    /** Wire the pipeline of operations for create.  This should create a new
+     * instance every time, since StagePipeline objects are mutable */
+    @Provides
+    @CreatePipeline
+    public StagePipeline createWritePipeline() {
+        return null;  //To change body of created methods use File | Settings | File Templates.
+    }
+
+
+    @Provides
+    @DeletePipeline
+    public StagePipeline deletePipeline() {
+        return null;  //To change body of created methods use File | Settings | File Templates.
+    }
+
+
+    @Provides
+    @UpdatePipeline
+    public StagePipeline updatePipeline() {
+        return null;  //To change body of created methods use File | Settings | File Templates.
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b2833c39/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/Commit.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/Commit.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/Commit.java
deleted file mode 100644
index 019c497..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/Commit.java
+++ /dev/null
@@ -1,22 +0,0 @@
-package org.apache.usergrid.persistence.collection.mvcc.stage;
-
-
-import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
-
-import com.google.common.util.concurrent.ListenableFuture;
-import com.netflix.astyanax.MutationBatch;
-import com.netflix.astyanax.connectionpool.OperationResult;
-
-
-/**
- * This phase should invoke any finalization, and mark the entity as committed in the data store before returning
- */
-public class Commit implements WriteStage {
-
-
-
-    @Override
-    public MutationBatch performStage( final MvccEntity entity ) {
-        return null;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b2833c39/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/StagePipeline.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/StagePipeline.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/StagePipeline.java
new file mode 100644
index 0000000..30b274c
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/StagePipeline.java
@@ -0,0 +1,35 @@
+package org.apache.usergrid.persistence.collection.mvcc.stage;
+
+
+/**
+ * Pipeline that represents a collection of stages to execute.
+ * An implementation is mutable, so an instance should not be
+ * reused across threads.
+ *
+ * @author tnine */
+public interface StagePipeline {
+
+
+    /**
+     * Get the next stage in this pipeline.  Will return null if there are no more stages to execute
+     */
+    public WriteStage next();
+
+    /**
+     * Insert a new stage directly after the current stage.  This can be used
+     * to add additional validation during write phases depending on the mvcc entity
+     *
+     * @param stage
+     */
+    public void insert(WriteStage stage);
+
+
+    /**
+     * Add a new stage to the end of the pipeline
+     * @param stage
+     */
+    public void addLast(WriteStage stage);
+
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b2833c39/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/Start.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/Start.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/Start.java
deleted file mode 100644
index e66eb52..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/Start.java
+++ /dev/null
@@ -1,44 +0,0 @@
-package org.apache.usergrid.persistence.collection.mvcc.stage;
-
-
-import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
-import org.apache.usergrid.persistence.collection.mvcc.entity.MvccLogEntry;
-import org.apache.usergrid.persistence.collection.mvcc.entity.MvccLogEntryImpl;
-import org.apache.usergrid.persistence.collection.mvcc.entity.Stage;
-import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy;
-
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-import com.netflix.astyanax.MutationBatch;
-import com.netflix.astyanax.connectionpool.OperationResult;
-import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
-
-
-/**
- * This is the first stage and should be invoked immediately when a write is started.  It should persist the start of a
- * new write in the data store for a checkpoint and recovery
- */
-@Singleton
-public class Start implements WriteStage {
-
-    private final MvccLogEntrySerializationStrategy logStrategy;
-    /**
-     * Create a new stage with the current context
-     * @param logStrategy
-     */
-    @Inject
-    protected Start( final MvccLogEntrySerializationStrategy logStrategy ){
-        this.logStrategy = logStrategy;
-    }
-
-
-    @Override
-    public MutationBatch performStage( final MvccEntity entity )  {
-        final MvccLogEntry startEntry = new MvccLogEntryImpl(entity.getContext(), entity.getUuid(), entity.getVersion(),Stage.ACTIVE);
-
-        MutationBatch write = logStrategy.write( startEntry );
-
-        return write;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b2833c39/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/Write.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/Write.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/Write.java
deleted file mode 100644
index 690416f..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/Write.java
+++ /dev/null
@@ -1,29 +0,0 @@
-package org.apache.usergrid.persistence.collection.mvcc.stage;
-
-
-import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
-
-import com.google.common.util.concurrent.ListenableFuture;
-import com.netflix.astyanax.MutationBatch;
-import com.netflix.astyanax.connectionpool.OperationResult;
-
-
-/**
- * This phase should execute the serialization to the data store.
- */
-public class Write implements WriteStage {
-
-    /**
-     * Create a new stage with the current context
-     */
-    protected Write( ){
-    }
-
-
-    @Override
-    public MutationBatch performStage( final MvccEntity entity ) {
-
-
-        return null;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b2833c39/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/WriteContext.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/WriteContext.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/WriteContext.java
new file mode 100644
index 0000000..38aa9d4
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/WriteContext.java
@@ -0,0 +1,25 @@
+package org.apache.usergrid.persistence.collection.mvcc.stage;
+
+
+import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
+
+
+/** @author tnine */
+public interface WriteContext {
+
+
+    /**
+     * Get the stage pipeline for this write context
+     * @return
+     */
+    StagePipeline getStagePipeline();
+
+    /**
+     * Perform the write in the context with the specified entity
+     * @param entity
+     */
+    void performWrite(MvccEntity entity);
+
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b2833c39/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/WriteListener.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/WriteListener.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/WriteListener.java
deleted file mode 100644
index 4a78226..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/WriteListener.java
+++ /dev/null
@@ -1,10 +0,0 @@
-package org.apache.usergrid.persistence.collection.mvcc.stage;
-
-
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.netflix.astyanax.connectionpool.OperationResult;
-
-
-/** @author tnine */
-public interface WriteListener extends ListenableFuture<OperationResult<Void>> {}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b2833c39/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/WriteStage.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/WriteStage.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/WriteStage.java
index 1960fdb..a45df6b 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/WriteStage.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/WriteStage.java
@@ -3,9 +3,6 @@ package org.apache.usergrid.persistence.collection.mvcc.stage;
 
 import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
 
-import com.google.common.util.concurrent.ListenableFuture;
-import com.netflix.astyanax.MutationBatch;
-import com.netflix.astyanax.connectionpool.OperationResult;
 import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 
 
@@ -17,10 +14,12 @@ public interface WriteStage {
     /**
      * Run this stage.  This will return the MvccEntity that should be returned or passed to the next stage
      *
+     *
+     * @param context The context of the current write operation
      * @param entity The entity to use in this stage
      *
      * @return The asynchronous listener to signal success
      *
      */
-    public MutationBatch performStage( MvccEntity entity ) throws ConnectionException;
+    public void performStage( WriteContext context, MvccEntity entity ) throws ConnectionException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b2833c39/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/CollectionPipelineModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/CollectionPipelineModule.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/CollectionPipelineModule.java
new file mode 100644
index 0000000..67ca516
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/CollectionPipelineModule.java
@@ -0,0 +1,65 @@
+package org.apache.usergrid.persistence.collection.mvcc.stage.impl;
+
+
+import org.apache.usergrid.persistence.collection.astynax.AstynaxKeyspaceProvider;
+import org.apache.usergrid.persistence.collection.guice.PropertyUtils;
+import org.apache.usergrid.persistence.collection.migration.Migration;
+import org.apache.usergrid.persistence.collection.migration.MigrationManager;
+import org.apache.usergrid.persistence.collection.migration.MigrationManagerImpl;
+import org.apache.usergrid.persistence.collection.mvcc.stage.StagePipeline;
+import org.apache.usergrid.persistence.collection.mvcc.stage.WriteStage;
+import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
+import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategyImpl;
+import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy;
+import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategyImpl;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Inject;
+import com.google.inject.Provides;
+import com.google.inject.multibindings.Multibinder;
+import com.google.inject.name.Names;
+import com.netflix.astyanax.Keyspace;
+
+
+/**
+ * Simple module for wiring our pipelines
+ *
+ * @author tnine
+ */
+public class CollectionPipelineModule extends AbstractModule {
+
+    @Inject
+    private MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy;
+
+    @Inject
+    private MvccEntitySerializationStrategy mvccEntitySerializationStrategy;
+
+
+    /** Wire the pipeline of operations for create.  This should create a new
+     * instance every time, since StagePipeline objects are mutable */
+    @Provides
+    @CreatePipeline
+    public StagePipeline createWritePipeline() {
+        return StagePipelineImpl.fromStages( new Start( mvccLogEntrySerializationStrategy ), new Write(), new Commit() );
+    }
+
+
+    @Provides
+    @DeletePipeline
+    public StagePipeline deletePipeline() {
+        return null;  //To change body of created methods use File | Settings | File Templates.
+    }
+
+
+    @Provides
+    @UpdatePipeline
+    public StagePipeline updatePipeline() {
+        return createWritePipeline();
+    }
+
+
+    @Override
+    protected void configure() {
+        //no op, we get our values from the provides above
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b2833c39/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Commit.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Commit.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Commit.java
new file mode 100644
index 0000000..378e649
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Commit.java
@@ -0,0 +1,24 @@
+package org.apache.usergrid.persistence.collection.mvcc.stage.impl;
+
+
+import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
+import org.apache.usergrid.persistence.collection.mvcc.stage.WriteContext;
+import org.apache.usergrid.persistence.collection.mvcc.stage.WriteStage;
+
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+
+/**
+ * This phase should invoke any finalization, and mark the entity as committed in the data store before returning
+ */
+public class Commit implements WriteStage {
+
+    public Commit(){
+
+    }
+
+    @Override
+    public void performStage( final WriteContext context, final MvccEntity entity ) throws ConnectionException {
+        //To change body of implemented methods use File | Settings | File Templates.
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b2833c39/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/CreatePipeline.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/CreatePipeline.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/CreatePipeline.java
new file mode 100644
index 0000000..fb7e7c1
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/CreatePipeline.java
@@ -0,0 +1,20 @@
+package org.apache.usergrid.persistence.collection.mvcc.stage.impl;
+
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+
+import com.google.inject.BindingAnnotation;
+
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+
+
+/**
+ * Marks the create pipeline
+ * @author tnine */
+@BindingAnnotation
+@Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
+public @interface CreatePipeline {}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b2833c39/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/DeletePipeline.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/DeletePipeline.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/DeletePipeline.java
new file mode 100644
index 0000000..e531a7f
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/DeletePipeline.java
@@ -0,0 +1,20 @@
+package org.apache.usergrid.persistence.collection.mvcc.stage.impl;
+
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+
+import com.google.inject.BindingAnnotation;
+
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+
+/**
+ * Marks the delete pipeline
+ * @author tnine */
+@BindingAnnotation
+@Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
+public @interface DeletePipeline {}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b2833c39/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/StagePipelineImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/StagePipelineImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/StagePipelineImpl.java
new file mode 100644
index 0000000..5de58c5
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/StagePipelineImpl.java
@@ -0,0 +1,59 @@
+package org.apache.usergrid.persistence.collection.mvcc.stage.impl;
+
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import org.apache.usergrid.persistence.collection.mvcc.entity.Stage;
+import org.apache.usergrid.persistence.collection.mvcc.stage.StagePipeline;
+import org.apache.usergrid.persistence.collection.mvcc.stage.WriteStage;
+
+
+/**
+ * List-backed pipeline with a forward-only cursor.  Mutable and unsynchronized.
+ * @author tnine */
+public class StagePipelineImpl implements StagePipeline {
+
+    private int currentIndex = 0;
+    private final List<WriteStage> stages;
+
+    /** Copy into an ArrayList: Arrays.asList is a fixed-size view, so addLast would throw */
+    protected StagePipelineImpl(WriteStage[] stages){
+        this.stages = new ArrayList<WriteStage>( Arrays.asList( stages ) );
+    }
+
+    /** Return the stage at the cursor and advance it, or null once every stage was returned */
+    @Override
+    public WriteStage next() {
+        if(currentIndex < stages.size()){
+            //get our current stage and increment
+            return stages.get( currentIndex ++);
+        }
+
+       return null;
+    }
+
+    /** Not implemented yet, always throws */
+    @Override
+    public void insert( final WriteStage stage ) {
+        throw new UnsupportedOperationException("This needs written");
+    }
+
+    /** Append the stage after every stage already in the pipeline */
+    @Override
+    public void addLast( final WriteStage stage ) {
+       stages.add( stage );
+    }
+
+    /**
+     * Factory to create a new pipeline that runs the given stages in order.
+     * @param stages The stages to run, first to last
+     * @return A new pipeline whose cursor is on the first stage
+     */
+    public static StagePipelineImpl fromStages(WriteStage... stages){
+        return new StagePipelineImpl( stages );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b2833c39/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Start.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Start.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Start.java
new file mode 100644
index 0000000..6d4b63c
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Start.java
@@ -0,0 +1,66 @@
+package org.apache.usergrid.persistence.collection.mvcc.stage.impl;
+
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
+import org.apache.usergrid.persistence.collection.mvcc.entity.MvccLogEntry;
+import org.apache.usergrid.persistence.collection.mvcc.entity.MvccLogEntryImpl;
+import org.apache.usergrid.persistence.collection.mvcc.entity.Stage;
+import org.apache.usergrid.persistence.collection.mvcc.stage.WriteContext;
+import org.apache.usergrid.persistence.collection.mvcc.stage.WriteStage;
+import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.OperationResult;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+
+
+/**
+ * This is the first stage and should be invoked immediately when a write is started.  It submits a log entry
+ * in the {@link Stage#ACTIVE} stage for the entity version, as a checkpoint for recovery.
+ */
+@Singleton
+public class Start implements WriteStage {
+
+    private static final Logger LOG = LoggerFactory.getLogger( Start.class );
+
+    /** Builds the mutation batch that persists a log entry. */
+    private final MvccLogEntrySerializationStrategy logStrategy;
+
+    /**
+     * Create a new stage
+     * @param logStrategy strategy used to build the log entry write
+     */
+    @Inject
+    public Start( final MvccLogEntrySerializationStrategy logStrategy ){
+        this.logStrategy = logStrategy;
+    }
+
+    /**
+     * Submits the ACTIVE log entry asynchronously.  The context is unused and the returned future is not
+     * inspected yet.  A ConnectionException on submit is logged and rethrown inside a RuntimeException.
+     */
+    @Override
+    public void performStage(final WriteContext context, final MvccEntity entity )  {
+        final MvccLogEntry activeEntry =
+                new MvccLogEntryImpl( entity.getContext(), entity.getUuid(), entity.getVersion(), Stage.ACTIVE );
+        final MutationBatch batch = logStrategy.write( activeEntry );
+        final ListenableFuture<OperationResult<Void>> pending;
+        try {
+            pending = batch.executeAsync();
+        }
+        catch ( ConnectionException e ) {
+          LOG.error( "Failed to execute write asynchronously ", e );
+          throw new RuntimeException( "Failed to execute write asynchronously ", e );
+        }
+
+        //todo next stage invocation
+        //Futures.addCallback();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b2833c39/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/UpdatePipeline.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/UpdatePipeline.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/UpdatePipeline.java
new file mode 100644
index 0000000..556e92c
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/UpdatePipeline.java
@@ -0,0 +1,20 @@
+package org.apache.usergrid.persistence.collection.mvcc.stage.impl;
+
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+
+import com.google.inject.BindingAnnotation;
+
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+
+/**
+ * Guice binding annotation that marks the update pipeline; retained at runtime for fields, parameters and methods.
+ * @author tnine */
+@BindingAnnotation
+@Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
+public @interface UpdatePipeline {}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b2833c39/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Write.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Write.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Write.java
new file mode 100644
index 0000000..b45ccd5
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Write.java
@@ -0,0 +1,27 @@
+package org.apache.usergrid.persistence.collection.mvcc.stage.impl;
+
+
+import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
+import org.apache.usergrid.persistence.collection.mvcc.stage.WriteContext;
+import org.apache.usergrid.persistence.collection.mvcc.stage.WriteStage;
+
+
+/**
+ * This phase should execute the serialization to the data store.  Not implemented yet: performStage is empty.
+ */
+public class Write implements WriteStage {
+
+    /**
+     * Create a new stage; takes no arguments and holds no state.
+     */
+    public Write( ){
+    }
+
+
+    @Override
+    public void performStage(WriteContext context, final MvccEntity entity ) {
+
+        //todo placeholder: nothing is written yet, context and entity are ignored
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b2833c39/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/StartTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/StartTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/StartTest.java
index 76d6326..aa79a70 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/StartTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/StartTest.java
@@ -5,9 +5,9 @@ import java.util.UUID;
 
 import org.junit.Test;
 
-import org.apache.usergrid.persistence.collection.CollectionContext;
 import org.apache.usergrid.persistence.collection.CollectionContextImpl;
 import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntityImpl;
+import org.apache.usergrid.persistence.collection.mvcc.stage.impl.Start;
 import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy;
 import org.apache.usergrid.persistence.model.entity.Entity;
 import org.apache.usergrid.persistence.model.util.UUIDGenerator;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b2833c39/stack/corepersistence/index/src/main/java/org/apache/usergrid/persistence/index/stage/Complete.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/index/src/main/java/org/apache/usergrid/persistence/index/stage/Complete.java b/stack/corepersistence/index/src/main/java/org/apache/usergrid/persistence/index/stage/Complete.java
index 17903fe..cc0bc35 100644
--- a/stack/corepersistence/index/src/main/java/org/apache/usergrid/persistence/index/stage/Complete.java
+++ b/stack/corepersistence/index/src/main/java/org/apache/usergrid/persistence/index/stage/Complete.java
@@ -2,12 +2,9 @@ package org.apache.usergrid.persistence.index.stage;
 
 
 import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
+import org.apache.usergrid.persistence.collection.mvcc.stage.WriteContext;
 import org.apache.usergrid.persistence.collection.mvcc.stage.WriteStage;
 
-import com.google.common.util.concurrent.ListenableFuture;
-import com.netflix.astyanax.MutationBatch;
-import com.netflix.astyanax.connectionpool.OperationResult;
-
 
 /**
  *
@@ -18,8 +15,8 @@ public class Complete implements WriteStage
 {
 
     @Override
-    public MutationBatch performStage( final MvccEntity entity ) {
-        return null;  //To change body of implemented methods use File | Settings | File Templates.
+    public void performStage( WriteContext context, final MvccEntity entity ) {
+
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b2833c39/stack/corepersistence/index/src/main/java/org/apache/usergrid/persistence/index/stage/Start.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/index/src/main/java/org/apache/usergrid/persistence/index/stage/Start.java b/stack/corepersistence/index/src/main/java/org/apache/usergrid/persistence/index/stage/Start.java
index 4270629..d97b81c 100644
--- a/stack/corepersistence/index/src/main/java/org/apache/usergrid/persistence/index/stage/Start.java
+++ b/stack/corepersistence/index/src/main/java/org/apache/usergrid/persistence/index/stage/Start.java
@@ -2,19 +2,19 @@ package org.apache.usergrid.persistence.index.stage;
 
 
 import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
+import org.apache.usergrid.persistence.collection.mvcc.stage.WriteContext;
 import org.apache.usergrid.persistence.collection.mvcc.stage.WriteStage;
 
-import com.google.common.util.concurrent.ListenableFuture;
-import com.netflix.astyanax.MutationBatch;
-import com.netflix.astyanax.connectionpool.OperationResult;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 
 
 /** This state should signal an index update has started */
 public class Start implements WriteStage
 {
 
+
     @Override
-    public MutationBatch performStage( final MvccEntity entity ) {
-        return null;  //To change body of implemented methods use File | Settings | File Templates.
+    public void performStage( final WriteContext context, final MvccEntity entity ) throws ConnectionException {
+        //todo not implemented yet: this stage currently does nothing
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b2833c39/stack/corepersistence/index/src/main/java/org/apache/usergrid/persistence/index/stage/Write.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/index/src/main/java/org/apache/usergrid/persistence/index/stage/Write.java b/stack/corepersistence/index/src/main/java/org/apache/usergrid/persistence/index/stage/Write.java
index c5d857b..e143a6b 100644
--- a/stack/corepersistence/index/src/main/java/org/apache/usergrid/persistence/index/stage/Write.java
+++ b/stack/corepersistence/index/src/main/java/org/apache/usergrid/persistence/index/stage/Write.java
@@ -2,20 +2,20 @@ package org.apache.usergrid.persistence.index.stage;
 
 
 import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
+import org.apache.usergrid.persistence.collection.mvcc.stage.WriteContext;
 import org.apache.usergrid.persistence.collection.mvcc.stage.WriteStage;
 
-import com.google.common.util.concurrent.ListenableFuture;
-import com.netflix.astyanax.MutationBatch;
-import com.netflix.astyanax.connectionpool.OperationResult;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 
 
 /** This state should perform an update of the index. */
 public class Write implements WriteStage
 {
 
+
     @Override
-    public MutationBatch performStage( final MvccEntity entity ) {
-        return null;  //To change body of implemented methods use File | Settings | File Templates.
+    public void performStage( final WriteContext context, final MvccEntity entity ) throws ConnectionException {
+        //todo not implemented yet: this stage currently does nothing
     }
 }