You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2013/12/03 04:23:23 UTC
git commit: WIP. Need to finish creating pipelines etc before merging.
Updated Branches:
refs/heads/two-dot-o-async-hacks [created] b2833c39e
WIP. Need to finish creating pipelines etc before merging.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/b2833c39
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/b2833c39
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/b2833c39
Branch: refs/heads/two-dot-o-async-hacks
Commit: b2833c39ea46a2d4037671bda019d07ac69d9449
Parents: 04b9497
Author: Todd Nine <to...@apache.org>
Authored: Mon Dec 2 20:22:55 2013 -0700
Committer: Todd Nine <to...@apache.org>
Committed: Mon Dec 2 20:22:55 2013 -0700
----------------------------------------------------------------------
stack/corepersistence/collection/pom.xml | 2 +
.../collection/CollectionManagerImpl.java | 25 ++------
.../collection/guice/CollectionModule.java | 34 ++++++++--
.../collection/mvcc/stage/Commit.java | 22 -------
.../collection/mvcc/stage/StagePipeline.java | 35 +++++++++++
.../collection/mvcc/stage/Start.java | 44 -------------
.../collection/mvcc/stage/Write.java | 29 ---------
.../collection/mvcc/stage/WriteContext.java | 25 ++++++++
.../collection/mvcc/stage/WriteListener.java | 10 ---
.../collection/mvcc/stage/WriteStage.java | 7 +--
.../stage/impl/CollectionPipelineModule.java | 65 +++++++++++++++++++
.../collection/mvcc/stage/impl/Commit.java | 24 +++++++
.../mvcc/stage/impl/CreatePipeline.java | 20 ++++++
.../mvcc/stage/impl/DeletePipeline.java | 20 ++++++
.../mvcc/stage/impl/StagePipelineImpl.java | 59 +++++++++++++++++
.../collection/mvcc/stage/impl/Start.java | 66 ++++++++++++++++++++
.../mvcc/stage/impl/UpdatePipeline.java | 20 ++++++
.../collection/mvcc/stage/impl/Write.java | 27 ++++++++
.../collection/mvcc/stage/StartTest.java | 2 +-
.../persistence/index/stage/Complete.java | 9 +--
.../usergrid/persistence/index/stage/Start.java | 10 +--
.../usergrid/persistence/index/stage/Write.java | 10 +--
22 files changed, 416 insertions(+), 149 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b2833c39/stack/corepersistence/collection/pom.xml
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/pom.xml b/stack/corepersistence/collection/pom.xml
index 7e48ea4..1a0f358 100644
--- a/stack/corepersistence/collection/pom.xml
+++ b/stack/corepersistence/collection/pom.xml
@@ -91,6 +91,8 @@
<version>${guice.version}</version>
</dependency>
+
+
<!-- Google Guice Integration Test Injectors -->
<dependency>
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b2833c39/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/CollectionManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/CollectionManagerImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/CollectionManagerImpl.java
index 679a8cb..b5f0afb 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/CollectionManagerImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/CollectionManagerImpl.java
@@ -1,7 +1,6 @@
package org.apache.usergrid.persistence.collection;
-import java.lang.reflect.InvocationTargetException;
import java.util.UUID;
import org.slf4j.Logger;
@@ -10,15 +9,9 @@ import org.slf4j.LoggerFactory;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntityImpl;
-import org.apache.usergrid.persistence.collection.mvcc.stage.Commit;
-import org.apache.usergrid.persistence.collection.mvcc.stage.Start;
-import org.apache.usergrid.persistence.collection.mvcc.stage.Write;
import org.apache.usergrid.persistence.model.entity.Entity;
import org.apache.usergrid.persistence.model.util.UUIDGenerator;
-import com.google.inject.Inject;
-import com.netflix.astyanax.MutationBatch;
-
/**
* Simple implementation. Should perform
@@ -30,19 +23,12 @@ public class CollectionManagerImpl implements CollectionManager {
private final CollectionContext context;
private final TimeService timeService;
- private final Start startStage;
- private final Write writeStage;
- private final Commit commitStage;
- @Inject
- public CollectionManagerImpl( final CollectionContext context, final TimeService timeService, final Start startStage, final Write writeStage,
- final Commit commitStage ) {
+
+ public CollectionManagerImpl( final CollectionContext context, final TimeService timeService ) {
this.context = context;
this.timeService = timeService;
- this.startStage = startStage;
- this.writeStage = writeStage;
- this.commitStage = commitStage;
}
@@ -69,9 +55,7 @@ public class CollectionManagerImpl implements CollectionManager {
MvccEntityImpl mvccEntity = new MvccEntityImpl(context, entityId, version, entity );
- MutationBatch mutation = startStage.performStage( mvccEntity );
- writeStage.performStage( mvccEntity );
- commitStage.performStage( mvccEntity );
+
}
@@ -91,4 +75,7 @@ public class CollectionManagerImpl implements CollectionManager {
public Entity load( final UUID entityId ) {
return null; //To change body of implemented methods use File | Settings | File Templates.
}
+
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b2833c39/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
index c83fe4f..e675e88 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
@@ -5,12 +5,17 @@ import org.apache.usergrid.persistence.collection.astynax.AstynaxKeyspaceProvide
import org.apache.usergrid.persistence.collection.migration.Migration;
import org.apache.usergrid.persistence.collection.migration.MigrationManager;
import org.apache.usergrid.persistence.collection.migration.MigrationManagerImpl;
+import org.apache.usergrid.persistence.collection.mvcc.stage.StagePipeline;
+import org.apache.usergrid.persistence.collection.mvcc.stage.impl.UpdatePipeline;
+import org.apache.usergrid.persistence.collection.mvcc.stage.impl.CreatePipeline;
+import org.apache.usergrid.persistence.collection.mvcc.stage.impl.DeletePipeline;
import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategyImpl;
import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy;
import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategyImpl;
import com.google.inject.AbstractModule;
+import com.google.inject.Provides;
import com.google.inject.multibindings.Multibinder;
import com.google.inject.name.Names;
import com.netflix.astyanax.Keyspace;
@@ -23,9 +28,7 @@ import com.netflix.astyanax.Keyspace;
*/
public class CollectionModule extends AbstractModule {
- /**
- * The location of the properties file
- */
+ /** The location of the properties file */
private static final String CASS_PROPS = "cassandra.properties";
@@ -44,7 +47,7 @@ public class CollectionModule extends AbstractModule {
bind( Keyspace.class ).toProvider( AstynaxKeyspaceProvider.class );
//bind our migration manager
- bind(MigrationManager.class).to( MigrationManagerImpl.class );
+ bind( MigrationManager.class ).to( MigrationManagerImpl.class );
//bind the serialization strategies
@@ -61,4 +64,27 @@ public class CollectionModule extends AbstractModule {
uriBinder.addBinding().to( MvccEntitySerializationStrategyImpl.class );
uriBinder.addBinding().to( MvccLogEntrySerializationStrategyImpl.class );
}
+
+
+ /** Wire the pipeline of operations for create. This should create a new
+ * instance every time, since StagePipeline objects are mutable */
+ @Provides
+ @CreatePipeline
+ public StagePipeline createWritePipeline() {
+ return null; //To change body of created methods use File | Settings | File Templates.
+ }
+
+
+ @Provides
+ @DeletePipeline
+ public StagePipeline deletePipeline() {
+ return null; //To change body of created methods use File | Settings | File Templates.
+ }
+
+
+ @Provides
+ @UpdatePipeline
+ public StagePipeline updatePipeline() {
+ return null; //To change body of created methods use File | Settings | File Templates.
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b2833c39/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/Commit.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/Commit.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/Commit.java
deleted file mode 100644
index 019c497..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/Commit.java
+++ /dev/null
@@ -1,22 +0,0 @@
-package org.apache.usergrid.persistence.collection.mvcc.stage;
-
-
-import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
-
-import com.google.common.util.concurrent.ListenableFuture;
-import com.netflix.astyanax.MutationBatch;
-import com.netflix.astyanax.connectionpool.OperationResult;
-
-
-/**
- * This phase should invoke any finalization, and mark the entity as committed in the data store before returning
- */
-public class Commit implements WriteStage {
-
-
-
- @Override
- public MutationBatch performStage( final MvccEntity entity ) {
- return null;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b2833c39/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/StagePipeline.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/StagePipeline.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/StagePipeline.java
new file mode 100644
index 0000000..30b274c
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/StagePipeline.java
@@ -0,0 +1,35 @@
+package org.apache.usergrid.persistence.collection.mvcc.stage;
+
+
+/**
+ * A pipeline representing the collection of stages to execute for a write.
+ * Implementations are mutable, so a single instance should not be
+ * reused across threads.
+ *
+ * @author tnine */
+public interface StagePipeline {
+
+
+    /**
+     * Returns the next stage in this pipeline, or null when there are no more stages to execute.
+     */
+    WriteStage next();
+
+    /**
+     * Inserts a new stage directly after the current stage.  This can be used
+     * to add additional validation during write phases depending on the mvcc entity.
+     *
+     * @param stage the stage to insert after the current one
+     */
+    void insert( WriteStage stage );
+
+
+    /**
+     * Appends a new stage to the end of the pipeline.
+     * @param stage the stage to append
+     */
+    void addLast( WriteStage stage );
+
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b2833c39/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/Start.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/Start.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/Start.java
deleted file mode 100644
index e66eb52..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/Start.java
+++ /dev/null
@@ -1,44 +0,0 @@
-package org.apache.usergrid.persistence.collection.mvcc.stage;
-
-
-import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
-import org.apache.usergrid.persistence.collection.mvcc.entity.MvccLogEntry;
-import org.apache.usergrid.persistence.collection.mvcc.entity.MvccLogEntryImpl;
-import org.apache.usergrid.persistence.collection.mvcc.entity.Stage;
-import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy;
-
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-import com.netflix.astyanax.MutationBatch;
-import com.netflix.astyanax.connectionpool.OperationResult;
-import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
-
-
-/**
- * This is the first stage and should be invoked immediately when a write is started. It should persist the start of a
- * new write in the data store for a checkpoint and recovery
- */
-@Singleton
-public class Start implements WriteStage {
-
- private final MvccLogEntrySerializationStrategy logStrategy;
- /**
- * Create a new stage with the current context
- * @param logStrategy
- */
- @Inject
- protected Start( final MvccLogEntrySerializationStrategy logStrategy ){
- this.logStrategy = logStrategy;
- }
-
-
- @Override
- public MutationBatch performStage( final MvccEntity entity ) {
- final MvccLogEntry startEntry = new MvccLogEntryImpl(entity.getContext(), entity.getUuid(), entity.getVersion(),Stage.ACTIVE);
-
- MutationBatch write = logStrategy.write( startEntry );
-
- return write;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b2833c39/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/Write.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/Write.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/Write.java
deleted file mode 100644
index 690416f..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/Write.java
+++ /dev/null
@@ -1,29 +0,0 @@
-package org.apache.usergrid.persistence.collection.mvcc.stage;
-
-
-import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
-
-import com.google.common.util.concurrent.ListenableFuture;
-import com.netflix.astyanax.MutationBatch;
-import com.netflix.astyanax.connectionpool.OperationResult;
-
-
-/**
- * This phase should execute the serialization to the data store.
- */
-public class Write implements WriteStage {
-
- /**
- * Create a new stage with the current context
- */
- protected Write( ){
- }
-
-
- @Override
- public MutationBatch performStage( final MvccEntity entity ) {
-
-
- return null;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b2833c39/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/WriteContext.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/WriteContext.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/WriteContext.java
new file mode 100644
index 0000000..38aa9d4
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/WriteContext.java
@@ -0,0 +1,25 @@
+package org.apache.usergrid.persistence.collection.mvcc.stage;
+
+
+import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
+
+
+/**
+ * Context of a single write operation.
+ * @author tnine */
+public interface WriteContext {
+
+ /**
+ * Get the stage pipeline for this write context
+ * @return the pipeline of stages associated with this context
+ */
+ StagePipeline getStagePipeline();
+
+ /**
+ * Perform the write in the context with the specified entity
+ * @param entity the entity to write
+ */
+ void performWrite(MvccEntity entity);
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b2833c39/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/WriteListener.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/WriteListener.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/WriteListener.java
deleted file mode 100644
index 4a78226..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/WriteListener.java
+++ /dev/null
@@ -1,10 +0,0 @@
-package org.apache.usergrid.persistence.collection.mvcc.stage;
-
-
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.netflix.astyanax.connectionpool.OperationResult;
-
-
-/** @author tnine */
-public interface WriteListener extends ListenableFuture<OperationResult<Void>> {}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b2833c39/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/WriteStage.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/WriteStage.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/WriteStage.java
index 1960fdb..a45df6b 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/WriteStage.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/WriteStage.java
@@ -3,9 +3,6 @@ package org.apache.usergrid.persistence.collection.mvcc.stage;
import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.netflix.astyanax.MutationBatch;
-import com.netflix.astyanax.connectionpool.OperationResult;
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
@@ -17,10 +14,12 @@ public interface WriteStage {
/**
* Run this stage. This will return the MvccEntity that should be returned or passed to the next stage
*
+ *
+ * @param context The context of the current write operation
* @param entity The entity to use in this stage
*
* @return The asynchronous listener to signal success
*
*/
- public MutationBatch performStage( MvccEntity entity ) throws ConnectionException;
+ public void performStage( WriteContext context, MvccEntity entity ) throws ConnectionException;
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b2833c39/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/CollectionPipelineModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/CollectionPipelineModule.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/CollectionPipelineModule.java
new file mode 100644
index 0000000..67ca516
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/CollectionPipelineModule.java
@@ -0,0 +1,65 @@
+package org.apache.usergrid.persistence.collection.mvcc.stage.impl;
+
+
+import org.apache.usergrid.persistence.collection.astynax.AstynaxKeyspaceProvider;
+import org.apache.usergrid.persistence.collection.guice.PropertyUtils;
+import org.apache.usergrid.persistence.collection.migration.Migration;
+import org.apache.usergrid.persistence.collection.migration.MigrationManager;
+import org.apache.usergrid.persistence.collection.migration.MigrationManagerImpl;
+import org.apache.usergrid.persistence.collection.mvcc.stage.StagePipeline;
+import org.apache.usergrid.persistence.collection.mvcc.stage.WriteStage;
+import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
+import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategyImpl;
+import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy;
+import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategyImpl;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Inject;
+import com.google.inject.Provides;
+import com.google.inject.multibindings.Multibinder;
+import com.google.inject.name.Names;
+import com.netflix.astyanax.Keyspace;
+
+
+/**
+ * Simple module for wiring our pipelines
+ *
+ * @author tnine
+ */
+public class CollectionPipelineModule extends AbstractModule {
+ // NOTE(review): Guice does not normally inject Module instances it did not create - confirm these fields are set before the @Provides methods run
+ @Inject
+ private MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy;
+ // Not read anywhere in this class yet
+ @Inject
+ private MvccEntitySerializationStrategy mvccEntitySerializationStrategy;
+
+
+ /** Wire the pipeline of operations for create. This should create a new
+ * instance every time, since StagePipeline objects are mutable */
+ @Provides
+ @CreatePipeline
+ public StagePipeline createWritePipeline() {
+ return StagePipelineImpl.fromStages( new Start( mvccLogEntrySerializationStrategy ), new Write(), new Commit() );
+ }
+
+ /** Pipeline for deletes. Not implemented yet, so this currently provides null */
+ @Provides
+ @DeletePipeline
+ public StagePipeline deletePipeline() {
+ return null; //To change body of created methods use File | Settings | File Templates.
+ }
+
+ /** Pipeline for updates. Delegates to createWritePipeline(), so it is built from the same Start, Write and Commit stages */
+ @Provides
+ @UpdatePipeline
+ public StagePipeline updatePipeline() {
+ return createWritePipeline();
+ }
+
+
+ @Override
+ protected void configure() {
+ //no op, we get our values from the provides above
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b2833c39/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Commit.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Commit.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Commit.java
new file mode 100644
index 0000000..378e649
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Commit.java
@@ -0,0 +1,24 @@
+package org.apache.usergrid.persistence.collection.mvcc.stage.impl;
+
+
+import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
+import org.apache.usergrid.persistence.collection.mvcc.stage.WriteContext;
+import org.apache.usergrid.persistence.collection.mvcc.stage.WriteStage;
+
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+
+/**
+ * Final write stage: should invoke any finalization and mark the entity as committed in the data store
+ * before returning.  Currently a placeholder that performs no work.
+ */
+public class Commit implements WriteStage {
+
+    /** Creates the stage; it holds no state. */
+    public Commit() {}
+
+    @Override
+    public void performStage( final WriteContext context, final MvccEntity entity ) throws ConnectionException {
+        // intentionally empty until the commit logic is written
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b2833c39/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/CreatePipeline.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/CreatePipeline.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/CreatePipeline.java
new file mode 100644
index 0000000..fb7e7c1
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/CreatePipeline.java
@@ -0,0 +1,20 @@
+package org.apache.usergrid.persistence.collection.mvcc.stage.impl;
+
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+
+import com.google.inject.BindingAnnotation;
+
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+
+
+/**
+ * Binding annotation that marks the create pipeline.
+ * @author tnine */
+@BindingAnnotation
+@Retention(RUNTIME) @Target({ FIELD, PARAMETER, METHOD })
+public @interface CreatePipeline {}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b2833c39/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/DeletePipeline.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/DeletePipeline.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/DeletePipeline.java
new file mode 100644
index 0000000..e531a7f
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/DeletePipeline.java
@@ -0,0 +1,20 @@
+package org.apache.usergrid.persistence.collection.mvcc.stage.impl;
+
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+
+import com.google.inject.BindingAnnotation;
+
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+
+/**
+ * Binding annotation that marks the delete pipeline.
+ * @author tnine */
+@BindingAnnotation
+@Retention(RUNTIME) @Target({ FIELD, PARAMETER, METHOD })
+public @interface DeletePipeline {}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b2833c39/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/StagePipelineImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/StagePipelineImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/StagePipelineImpl.java
new file mode 100644
index 0000000..5de58c5
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/StagePipelineImpl.java
@@ -0,0 +1,59 @@
+package org.apache.usergrid.persistence.collection.mvcc.stage.impl;
+
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import org.apache.usergrid.persistence.collection.mvcc.entity.Stage;
+import org.apache.usergrid.persistence.collection.mvcc.stage.StagePipeline;
+import org.apache.usergrid.persistence.collection.mvcc.stage.WriteStage;
+
+
+/**
+ * List-backed {@link StagePipeline} that hands out its stages in order.  Mutable and not thread safe.
+ * @author tnine */
+public class StagePipelineImpl implements StagePipeline {
+
+    /** Index of the stage that the next call to {@link #next()} returns. */
+    private int currentIndex = 0;
+    private final List<WriteStage> stages;
+
+    /** Copies the stages into a growable list; Arrays.asList alone is fixed-size and would make addLast throw. */
+    protected StagePipelineImpl( WriteStage[] stages ) {
+        this.stages = new ArrayList<WriteStage>( Arrays.asList( stages ) );
+    }
+
+    /** Returns the stage at the current position and advances, or null once every stage has been returned. */
+    @Override
+    public WriteStage next() {
+        if ( currentIndex < stages.size() ) {
+            //get our current stage and increment
+            return stages.get( currentIndex++ );
+        }
+        return null;
+    }
+
+    /** Not implemented yet; always throws {@link UnsupportedOperationException}. */
+    @Override
+    public void insert( final WriteStage stage ) {
+        throw new UnsupportedOperationException("This needs written");
+    }
+
+    /** Appends the stage to the end of the pipeline. */
+    @Override
+    public void addLast( final WriteStage stage ) {
+        stages.add( stage );
+    }
+
+    /**
+     * Factory to create a new instance.
+     * @param stages the stages to run, in execution order
+     * @return a new pipeline positioned before the first stage
+     */
+    public static StagePipelineImpl fromStages( WriteStage... stages ) {
+        return new StagePipelineImpl( stages );
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b2833c39/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Start.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Start.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Start.java
new file mode 100644
index 0000000..6d4b63c
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Start.java
@@ -0,0 +1,66 @@
+package org.apache.usergrid.persistence.collection.mvcc.stage.impl;
+
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
+import org.apache.usergrid.persistence.collection.mvcc.entity.MvccLogEntry;
+import org.apache.usergrid.persistence.collection.mvcc.entity.MvccLogEntryImpl;
+import org.apache.usergrid.persistence.collection.mvcc.entity.Stage;
+import org.apache.usergrid.persistence.collection.mvcc.stage.WriteContext;
+import org.apache.usergrid.persistence.collection.mvcc.stage.WriteStage;
+import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.OperationResult;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+
+
+/**
+ * This is the first stage and should be invoked immediately when a write is started. It should persist the start of a
+ * new write in the data store for a checkpoint and recovery
+ */
+@Singleton
+public class Start implements WriteStage {
+
+ private static final Logger LOG = LoggerFactory.getLogger( Start.class );
+
+ private final MvccLogEntrySerializationStrategy logStrategy;
+ /**
+ * Create a new stage with the current context
+ * @param logStrategy
+ */
+ @Inject
+ public Start( final MvccLogEntrySerializationStrategy logStrategy ){
+ this.logStrategy = logStrategy;
+ }
+
+
+ @Override
+ public void performStage(final WriteContext context, final MvccEntity entity ) {
+ final MvccLogEntry startEntry = new MvccLogEntryImpl(entity.getContext(), entity.getUuid(), entity.getVersion(),Stage.ACTIVE);
+
+ MutationBatch write = logStrategy.write( startEntry );
+
+ ListenableFuture<OperationResult<Void>> future;
+
+ try {
+ future = write.executeAsync();
+ }
+ catch ( ConnectionException e ) {
+ LOG.error( "Failed to execute write asynchronously ", e );
+ throw new RuntimeException( "Failed to execute write asynchronously ", e );
+ }
+
+ //todo next stage invocation
+ //Futures.addCallback();
+
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b2833c39/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/UpdatePipeline.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/UpdatePipeline.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/UpdatePipeline.java
new file mode 100644
index 0000000..556e92c
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/UpdatePipeline.java
@@ -0,0 +1,20 @@
+package org.apache.usergrid.persistence.collection.mvcc.stage.impl;
+
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+
+import com.google.inject.BindingAnnotation;
+
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+
+/**
+ * Guice binding annotation that marks the update pipeline; applicable to fields, parameters and methods, retained at runtime
+ * @author tnine */
+@BindingAnnotation
+@Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
+public @interface UpdatePipeline {}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b2833c39/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Write.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Write.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Write.java
new file mode 100644
index 0000000..b45ccd5
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Write.java
@@ -0,0 +1,27 @@
+package org.apache.usergrid.persistence.collection.mvcc.stage.impl;
+
+
+import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
+import org.apache.usergrid.persistence.collection.mvcc.stage.WriteContext;
+import org.apache.usergrid.persistence.collection.mvcc.stage.WriteStage;
+
+
+/**
+ * This phase should execute the serialization to the data store. Not implemented yet: performStage currently has an empty body.
+ */
+public class Write implements WriteStage {
+
+ /**
+ * Create a new write stage; the constructor takes no arguments
+ */
+ public Write( ){
+ }
+
+
+ @Override
+ public void performStage(WriteContext context, final MvccEntity entity ) {
+
+
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b2833c39/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/StartTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/StartTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/StartTest.java
index 76d6326..aa79a70 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/StartTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/StartTest.java
@@ -5,9 +5,9 @@ import java.util.UUID;
import org.junit.Test;
-import org.apache.usergrid.persistence.collection.CollectionContext;
import org.apache.usergrid.persistence.collection.CollectionContextImpl;
import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntityImpl;
+import org.apache.usergrid.persistence.collection.mvcc.stage.impl.Start;
import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy;
import org.apache.usergrid.persistence.model.entity.Entity;
import org.apache.usergrid.persistence.model.util.UUIDGenerator;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b2833c39/stack/corepersistence/index/src/main/java/org/apache/usergrid/persistence/index/stage/Complete.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/index/src/main/java/org/apache/usergrid/persistence/index/stage/Complete.java b/stack/corepersistence/index/src/main/java/org/apache/usergrid/persistence/index/stage/Complete.java
index 17903fe..cc0bc35 100644
--- a/stack/corepersistence/index/src/main/java/org/apache/usergrid/persistence/index/stage/Complete.java
+++ b/stack/corepersistence/index/src/main/java/org/apache/usergrid/persistence/index/stage/Complete.java
@@ -2,12 +2,9 @@ package org.apache.usergrid.persistence.index.stage;
import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
+import org.apache.usergrid.persistence.collection.mvcc.stage.WriteContext;
import org.apache.usergrid.persistence.collection.mvcc.stage.WriteStage;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.netflix.astyanax.MutationBatch;
-import com.netflix.astyanax.connectionpool.OperationResult;
-
/**
*
@@ -18,8 +15,8 @@ public class Complete implements WriteStage
{
@Override
- public MutationBatch performStage( final MvccEntity entity ) {
- return null; //To change body of implemented methods use File | Settings | File Templates.
+ public void performStage( WriteContext context, final MvccEntity entity ) {
+
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b2833c39/stack/corepersistence/index/src/main/java/org/apache/usergrid/persistence/index/stage/Start.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/index/src/main/java/org/apache/usergrid/persistence/index/stage/Start.java b/stack/corepersistence/index/src/main/java/org/apache/usergrid/persistence/index/stage/Start.java
index 4270629..d97b81c 100644
--- a/stack/corepersistence/index/src/main/java/org/apache/usergrid/persistence/index/stage/Start.java
+++ b/stack/corepersistence/index/src/main/java/org/apache/usergrid/persistence/index/stage/Start.java
@@ -2,19 +2,19 @@ package org.apache.usergrid.persistence.index.stage;
import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
+import org.apache.usergrid.persistence.collection.mvcc.stage.WriteContext;
import org.apache.usergrid.persistence.collection.mvcc.stage.WriteStage;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.netflix.astyanax.MutationBatch;
-import com.netflix.astyanax.connectionpool.OperationResult;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
/** This state should signal an index update has started */
public class Start implements WriteStage
{
+
@Override
- public MutationBatch performStage( final MvccEntity entity ) {
- return null; //To change body of implemented methods use File | Settings | File Templates.
+ public void performStage( final WriteContext context, final MvccEntity entity ) throws ConnectionException {
+ //TODO: not implemented yet; this stage currently does nothing.
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b2833c39/stack/corepersistence/index/src/main/java/org/apache/usergrid/persistence/index/stage/Write.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/index/src/main/java/org/apache/usergrid/persistence/index/stage/Write.java b/stack/corepersistence/index/src/main/java/org/apache/usergrid/persistence/index/stage/Write.java
index c5d857b..e143a6b 100644
--- a/stack/corepersistence/index/src/main/java/org/apache/usergrid/persistence/index/stage/Write.java
+++ b/stack/corepersistence/index/src/main/java/org/apache/usergrid/persistence/index/stage/Write.java
@@ -2,20 +2,20 @@ package org.apache.usergrid.persistence.index.stage;
import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
+import org.apache.usergrid.persistence.collection.mvcc.stage.WriteContext;
import org.apache.usergrid.persistence.collection.mvcc.stage.WriteStage;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.netflix.astyanax.MutationBatch;
-import com.netflix.astyanax.connectionpool.OperationResult;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
/** This state should perform an update of the index. */
public class Write implements WriteStage
{
+
@Override
- public MutationBatch performStage( final MvccEntity entity ) {
- return null; //To change body of implemented methods use File | Settings | File Templates.
+ public void performStage( final WriteContext context, final MvccEntity entity ) throws ConnectionException {
+ //TODO: not implemented yet; this stage currently does nothing.
}
}