You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2013/12/05 01:55:30 UTC
[4/6] git commit: Working deletes. Need to add more test cases
Working deletes. Need to add more test cases
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/43aba652
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/43aba652
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/43aba652
Branch: refs/heads/two-dot-o
Commit: 43aba652475ea989071664017f2de8ffd872b5aa
Parents: 0e0c8ff
Author: Todd Nine <to...@apache.org>
Authored: Wed Dec 4 16:45:19 2013 -0700
Committer: Todd Nine <to...@apache.org>
Committed: Wed Dec 4 17:16:37 2013 -0700
----------------------------------------------------------------------
.../collection/impl/CollectionManagerImpl.java | 42 ++--
.../collection/mvcc/stage/ExecutionContext.java | 50 +++++
.../collection/mvcc/stage/Stage.java | 15 ++
.../collection/mvcc/stage/StagePipeline.java | 15 +-
.../collection/mvcc/stage/WriteContext.java | 50 -----
.../collection/mvcc/stage/WriteStage.java | 26 ---
.../collection/mvcc/stage/impl/Clear.java | 18 +-
.../stage/impl/CollectionPipelineModule.java | 32 ++-
.../collection/mvcc/stage/impl/Commit.java | 18 +-
.../collection/mvcc/stage/impl/Create.java | 16 +-
.../mvcc/stage/impl/ExecutionContextImpl.java | 92 ++++++++
.../collection/mvcc/stage/impl/Load.java | 82 +++++++
.../mvcc/stage/impl/LoadPipeline.java | 23 ++
.../mvcc/stage/impl/StagePipelineImpl.java | 36 +--
.../collection/mvcc/stage/impl/Start.java | 92 --------
.../collection/mvcc/stage/impl/StartDelete.java | 97 +++++++++
.../collection/mvcc/stage/impl/StartWrite.java | 92 ++++++++
.../collection/mvcc/stage/impl/Update.java | 16 +-
.../collection/mvcc/stage/impl/Verify.java | 10 +-
.../mvcc/stage/impl/WriteContextCallback.java | 8 +-
.../mvcc/stage/impl/WriteContextImpl.java | 93 --------
.../collection/CollectionManagerIT.java | 104 ++++++++-
.../collection/CollectionManagerTest.java | 15 +-
.../mvcc/stage/ExecutionContextTest.java | 217 +++++++++++++++++++
.../mvcc/stage/StagePipelineTest.java | 89 ++++++++
.../collection/mvcc/stage/WriteContextTest.java | 217 -------------------
.../collection/mvcc/stage/impl/CreateTest.java | 26 +--
.../collection/mvcc/stage/impl/StartTest.java | 187 ----------------
.../mvcc/stage/impl/StartWriteTest.java | 187 ++++++++++++++++
.../persistence/index/stage/Complete.java | 8 +-
.../usergrid/persistence/index/stage/Start.java | 8 +-
.../usergrid/persistence/index/stage/Write.java | 8 +-
stack/corepersistence/perftest/Readme.md | 4 +-
33 files changed, 1191 insertions(+), 802 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/43aba652/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/CollectionManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/CollectionManagerImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/CollectionManagerImpl.java
index ae16b3a..c20d2c3 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/CollectionManagerImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/CollectionManagerImpl.java
@@ -8,12 +8,14 @@ import org.slf4j.LoggerFactory;
import org.apache.usergrid.persistence.collection.CollectionContext;
import org.apache.usergrid.persistence.collection.CollectionManager;
+import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
+import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionContext;
import org.apache.usergrid.persistence.collection.mvcc.stage.StagePipeline;
-import org.apache.usergrid.persistence.collection.mvcc.stage.WriteContext;
import org.apache.usergrid.persistence.collection.mvcc.stage.impl.CreatePipeline;
import org.apache.usergrid.persistence.collection.mvcc.stage.impl.DeletePipeline;
+import org.apache.usergrid.persistence.collection.mvcc.stage.impl.ExecutionContextImpl;
+import org.apache.usergrid.persistence.collection.mvcc.stage.impl.LoadPipeline;
import org.apache.usergrid.persistence.collection.mvcc.stage.impl.UpdatePipeline;
-import org.apache.usergrid.persistence.collection.mvcc.stage.impl.WriteContextImpl;
import org.apache.usergrid.persistence.model.entity.Entity;
import com.google.inject.Inject;
@@ -33,55 +35,67 @@ public class CollectionManagerImpl implements CollectionManager {
private final StagePipeline createPipeline;
private final StagePipeline updatePipeline;
private final StagePipeline deletePipeline;
+ private final StagePipeline loadPipeline;
@Inject
public CollectionManagerImpl( @CreatePipeline final StagePipeline createPipeline,
@UpdatePipeline final StagePipeline updatePipeline,
@DeletePipeline final StagePipeline deletePipeline,
- @Assisted final CollectionContext context) {
- this.context = context;
+ @LoadPipeline final StagePipeline loadPipeline,
+ @Assisted final CollectionContext context ) {
+
this.createPipeline = createPipeline;
this.updatePipeline = updatePipeline;
this.deletePipeline = deletePipeline;
+ this.loadPipeline = loadPipeline;
+ this.context = context;
}
@Override
public Entity create( final Entity entity ) {
// Create a new context for the write
- WriteContext writeContext = new WriteContextImpl( createPipeline, context );
+ ExecutionContext executionContext = new ExecutionContextImpl( createPipeline, context );
//perform the write
- writeContext.performWrite( entity );
+ executionContext.execute( entity );
- return writeContext.getMessage( Entity.class );
+ MvccEntity result = executionContext.getMessage( MvccEntity.class );
+
+ return result.getEntity().get();
}
@Override
public Entity update( final Entity entity ) {
// Create a new context for the write
- WriteContext writeContext = new WriteContextImpl( updatePipeline, context );
+ ExecutionContext executionContext = new ExecutionContextImpl( updatePipeline, context );
//perform the write
- writeContext.performWrite( entity );
+ executionContext.execute( entity );
+
+ MvccEntity result = executionContext.getMessage( MvccEntity.class );
- return writeContext.getMessage( Entity.class );
+ return result.getEntity().get();
}
@Override
public void delete( final UUID entityId ) {
- WriteContext deleteContext = new WriteContextImpl( deletePipeline, context );
-
- deleteContext.performWrite( entityId );
+ ExecutionContext deleteContext = new ExecutionContextImpl( deletePipeline, context );
+ deleteContext.execute( entityId );
}
@Override
public Entity load( final UUID entityId ) {
- return null; //To change body of implemented methods use File | Settings | File Templates.
+ ExecutionContext loadContext = new ExecutionContextImpl( loadPipeline, context );
+
+ loadContext.execute( entityId );
+
+ return loadContext.getMessage( Entity.class );
+
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/43aba652/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/ExecutionContext.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/ExecutionContext.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/ExecutionContext.java
new file mode 100644
index 0000000..f124dcf
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/ExecutionContext.java
@@ -0,0 +1,50 @@
+package org.apache.usergrid.persistence.collection.mvcc.stage;
+
+
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.usergrid.persistence.collection.CollectionContext;
+import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
+import org.apache.usergrid.persistence.collection.mvcc.event.PostProcessListener;
+
+
+/** @author tnine */
+public interface ExecutionContext {
+
+ /**
+ * Execute the pipeline in this context with the specified input
+ * @param inputData The data to use to begin the execution
+ */
+ void execute( Object inputData );
+
+
+ /**
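+ * Get the current message. If the message is not an instance of the requested type at runtime, a ClassCastException will be thrown
+ * @return The current message cast to the requested type, or null if no message has been set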
+ */
+ <T> T getMessage(Class<T> clazz);
+
+ /**
+ * Set the message into the execution context
+ * @return The message that was previously set, or null if there was none
+ */
+ Object setMessage(Object object);
+
+
+ /**
+ * Signal that the next stage in the write should proceed
+ */
+ void proceed();
+
+
+
+ /**
+ * Return the current collection context
+ * @return
+ */
+ CollectionContext getCollectionContext();
+
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/43aba652/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/Stage.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/Stage.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/Stage.java
new file mode 100644
index 0000000..538a546
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/Stage.java
@@ -0,0 +1,15 @@
+package org.apache.usergrid.persistence.collection.mvcc.stage;
+
+
+/** The possible stages in our write flow. */
+public interface Stage {
+
+ /**
+ * Run this stage. The stage reads its input from the context's message and may set a new message for the next
+ * stage. Nothing is returned; a stage calls {@link ExecutionContext#proceed()} to continue the pipeline.
+ *
+ * @param context The context of the current operation
+ *
+ */
+ public void performStage( ExecutionContext context );
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/43aba652/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/StagePipeline.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/StagePipeline.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/StagePipeline.java
index 7bf31ea..0b032d9 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/StagePipeline.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/StagePipeline.java
@@ -11,23 +11,18 @@ public interface StagePipeline {
/**
- * Get the first stage in this pipeline. Will return null if there are no more stages to execute
+ * Get the first stage in this pipeline.
*/
- WriteStage first();
+ Stage first();
/**
- * get the next stage after this one
- * @param stage
+ * get the next stage after the stage specified
+ * @param stage The stage to seek in our pipeline
*/
- WriteStage nextStage(WriteStage stage);
+ Stage nextStage(Stage stage);
- /**
- * Get the current stage in the pipeline
- * @return
- */
- WriteStage current();
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/43aba652/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/WriteContext.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/WriteContext.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/WriteContext.java
deleted file mode 100644
index 98e0742..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/WriteContext.java
+++ /dev/null
@@ -1,50 +0,0 @@
-package org.apache.usergrid.persistence.collection.mvcc.stage;
-
-
-import java.util.Collection;
-import java.util.List;
-
-import org.apache.usergrid.persistence.collection.CollectionContext;
-import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
-import org.apache.usergrid.persistence.collection.mvcc.event.PostProcessListener;
-
-
-/** @author tnine */
-public interface WriteContext {
-
- /**
- * Perform the write in the context with the specified entity
- * @param inputData The data to use to being the write
- */
- void performWrite(Object inputData);
-
-
- /**
- * Get the current message. If the message is not the right type at runtime, an assertion exception will be thrown
- * @return
- */
- <T> T getMessage(Class<T> clazz);
-
- /**
- * Set the message into the write context
- * @return
- */
- Object setMessage(Object object);
-
-
- /**
- * Signal that the next stage in the write should proceed
- */
- void proceed();
-
-
-
- /**
- * Return the current collection context
- * @return
- */
- CollectionContext getCollectionContext();
-
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/43aba652/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/WriteStage.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/WriteStage.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/WriteStage.java
deleted file mode 100644
index 3f80348..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/WriteStage.java
+++ /dev/null
@@ -1,26 +0,0 @@
-package org.apache.usergrid.persistence.collection.mvcc.stage;
-
-
-import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
-
-import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
-
-
-/**
- * The possible stages in our write flow.
- */
-public interface WriteStage{
-
- /**
- * Run this stage. This will return the MvccEntity that should be returned or passed to the next stage
- *
- *
- * @param context The context of the current write operation
- *
- * @return The asynchronous listener to signal success
- *
- */
- public void performStage( WriteContext context);
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/43aba652/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Clear.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Clear.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Clear.java
index 57a5e41..854ec24 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Clear.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Clear.java
@@ -10,10 +10,9 @@ import org.apache.usergrid.persistence.collection.CollectionContext;
import org.apache.usergrid.persistence.collection.exception.CollectionRuntimeException;
import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
import org.apache.usergrid.persistence.collection.mvcc.entity.MvccLogEntry;
-import org.apache.usergrid.persistence.collection.mvcc.entity.Stage;
import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccLogEntryImpl;
-import org.apache.usergrid.persistence.collection.mvcc.stage.WriteContext;
-import org.apache.usergrid.persistence.collection.mvcc.stage.WriteStage;
+import org.apache.usergrid.persistence.collection.mvcc.stage.Stage;
+import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionContext;
import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy;
@@ -24,7 +23,7 @@ import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
/** This phase should invoke any finalization, and mark the entity as committed in the data store before returning */
-public class Clear implements WriteStage {
+public class Clear implements Stage {
private static final Logger LOG = LoggerFactory.getLogger( Clear.class );
@@ -47,8 +46,8 @@ public class Clear implements WriteStage {
@Override
- public void performStage( final WriteContext writeContext ) {
- final MvccEntity entity = writeContext.getMessage( MvccEntity.class );
+ public void performStage( final ExecutionContext executionContext ) {
+ final MvccEntity entity = executionContext.getMessage( MvccEntity.class );
Preconditions.checkNotNull( entity, "Entity is required in the new stage of the mvcc write" );
@@ -59,10 +58,11 @@ public class Clear implements WriteStage {
Preconditions.checkNotNull( version, "Entity version is required in this stage" );
- final CollectionContext collectionContext = writeContext.getCollectionContext();
+ final CollectionContext collectionContext = executionContext.getCollectionContext();
- final MvccLogEntry startEntry = new MvccLogEntryImpl( entityId, version, Stage.COMMITTED );
+ final MvccLogEntry startEntry = new MvccLogEntryImpl( entityId, version, org.apache.usergrid.persistence
+ .collection.mvcc.entity.Stage.COMMITTED );
MutationBatch logMutation = logEntrySerializationStrategy.write( collectionContext, startEntry );
@@ -84,7 +84,7 @@ public class Clear implements WriteStage {
/**
* We're done executing.
*/
- writeContext.proceed();
+ executionContext.proceed();
//TODO connect to post processors via listener
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/43aba652/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/CollectionPipelineModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/CollectionPipelineModule.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/CollectionPipelineModule.java
index 2bd4596..8390e7b 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/CollectionPipelineModule.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/CollectionPipelineModule.java
@@ -1,8 +1,8 @@
package org.apache.usergrid.persistence.collection.mvcc.stage.impl;
+import org.apache.usergrid.persistence.collection.mvcc.stage.Stage;
import org.apache.usergrid.persistence.collection.mvcc.stage.StagePipeline;
-import org.apache.usergrid.persistence.collection.mvcc.stage.WriteStage;
import com.google.inject.AbstractModule;
import com.google.inject.Inject;
@@ -27,9 +27,9 @@ public class CollectionPipelineModule extends AbstractModule {
@CreatePipeline
@Inject
@Singleton
- public StagePipeline createPipeline( final Create create, final Start start, final Verify write,
+ public StagePipeline createPipeline( final Create create, final StartWrite startWrite, final Verify write,
final Commit commit ) {
- return StagePipelineImpl.fromStages( create, start, write, commit );
+ return StagePipelineImpl.fromStages( create, startWrite, write, commit );
}
@@ -37,9 +37,9 @@ public class CollectionPipelineModule extends AbstractModule {
@UpdatePipeline
@Inject
@Singleton
- public StagePipeline updatePipeline( final Update update, final Start start, final Verify write,
+ public StagePipeline updatePipeline( final Update update, final StartWrite startWrite, final Verify write,
final Commit commit ) {
- return StagePipelineImpl.fromStages( update, start, write, commit );
+ return StagePipelineImpl.fromStages( update, startWrite, write, commit );
}
@@ -47,8 +47,17 @@ public class CollectionPipelineModule extends AbstractModule {
@DeletePipeline
@Inject
@Singleton
- public StagePipeline deletePipeline( final Update update, final Start start, final Clear delete ) {
- return StagePipelineImpl.fromStages( update, start, delete );
+ public StagePipeline deletePipeline(final StartDelete startDelete, final Clear delete ) {
+ return StagePipelineImpl.fromStages(startDelete, delete );
+ }
+
+
+ @Provides
+ @LoadPipeline
+ @Inject
+ @Singleton
+ public StagePipeline deletePipeline( final Load load ) {
+ return StagePipelineImpl.fromStages( load );
}
@@ -58,16 +67,19 @@ public class CollectionPipelineModule extends AbstractModule {
/**
* Configure all stages here
*/
- Multibinder<WriteStage> stageBinder = Multibinder.newSetBinder( binder(), WriteStage.class );
+ Multibinder<Stage> stageBinder = Multibinder.newSetBinder( binder(), Stage.class );
+
+ //creation stages
stageBinder.addBinding().to( Create.class );
stageBinder.addBinding().to( Update.class );
- stageBinder.addBinding().to( Start.class );
+ stageBinder.addBinding().to( StartWrite.class );
stageBinder.addBinding().to( Verify.class );
stageBinder.addBinding().to( Commit.class );
stageBinder.addBinding().to( Clear.class );
-
+ //loading stages
+ stageBinder.addBinding().to(Load.class);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/43aba652/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Commit.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Commit.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Commit.java
index a2d7606..f29bf93 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Commit.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Commit.java
@@ -10,10 +10,9 @@ import org.apache.usergrid.persistence.collection.CollectionContext;
import org.apache.usergrid.persistence.collection.exception.CollectionRuntimeException;
import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
import org.apache.usergrid.persistence.collection.mvcc.entity.MvccLogEntry;
-import org.apache.usergrid.persistence.collection.mvcc.entity.Stage;
import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccLogEntryImpl;
-import org.apache.usergrid.persistence.collection.mvcc.stage.WriteContext;
-import org.apache.usergrid.persistence.collection.mvcc.stage.WriteStage;
+import org.apache.usergrid.persistence.collection.mvcc.stage.Stage;
+import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionContext;
import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy;
@@ -24,7 +23,7 @@ import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
/** This phase should invoke any finalization, and mark the entity as committed in the data store before returning */
-public class Commit implements WriteStage {
+public class Commit implements Stage {
private static final Logger LOG = LoggerFactory.getLogger( Commit.class );
@@ -46,8 +45,8 @@ public class Commit implements WriteStage {
@Override
- public void performStage( final WriteContext writeContext ) {
- final MvccEntity entity = writeContext.getMessage( MvccEntity.class );
+ public void performStage( final ExecutionContext executionContext ) {
+ final MvccEntity entity = executionContext.getMessage( MvccEntity.class );
Preconditions.checkNotNull( entity, "Entity is required in the new stage of the mvcc write" );
@@ -58,10 +57,11 @@ public class Commit implements WriteStage {
Preconditions.checkNotNull( version, "Entity version is required in this stage" );
- final CollectionContext collectionContext = writeContext.getCollectionContext();
+ final CollectionContext collectionContext = executionContext.getCollectionContext();
- final MvccLogEntry startEntry = new MvccLogEntryImpl( entityId, version, Stage.COMMITTED );
+ final MvccLogEntry startEntry = new MvccLogEntryImpl( entityId, version, org.apache.usergrid.persistence
+ .collection.mvcc.entity.Stage.COMMITTED );
MutationBatch logMutation = logEntrySerializationStrategy.write( collectionContext, startEntry );
@@ -83,7 +83,7 @@ public class Commit implements WriteStage {
/**
* We're done executing.
*/
- writeContext.proceed();
+ executionContext.proceed();
//TODO connect to post processors via listener
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/43aba652/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Create.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Create.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Create.java
index c096748..2026274 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Create.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Create.java
@@ -9,8 +9,8 @@ import org.slf4j.LoggerFactory;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.usergrid.persistence.collection.exception.CollectionRuntimeException;
-import org.apache.usergrid.persistence.collection.mvcc.stage.WriteContext;
-import org.apache.usergrid.persistence.collection.mvcc.stage.WriteStage;
+import org.apache.usergrid.persistence.collection.mvcc.stage.Stage;
+import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionContext;
import org.apache.usergrid.persistence.collection.service.TimeService;
import org.apache.usergrid.persistence.collection.service.UUIDService;
import org.apache.usergrid.persistence.collection.util.Verify;
@@ -26,7 +26,7 @@ import com.google.inject.Singleton;
* present, and this should set the entityId, version, created, and updated dates
*/
@Singleton
-public class Create implements WriteStage {
+public class Create implements Stage {
private static final Logger LOG = LoggerFactory.getLogger( Create.class );
@@ -49,12 +49,12 @@ public class Create implements WriteStage {
/**
* Create the entity Id and inject it, as well as set the timestamp versions
*
- * @param writeContext The context of the current write operation
+ * @param executionContext The context of the current write operation
*/
@Override
- public void performStage( final WriteContext writeContext ) {
+ public void performStage( final ExecutionContext executionContext ) {
- final Entity entity = writeContext.getMessage( Entity.class );
+ final Entity entity = executionContext.getMessage( Entity.class );
Preconditions.checkNotNull( entity, "Entity is required in the new stage of the mvcc write" );
@@ -79,7 +79,7 @@ public class Create implements WriteStage {
entity.setUpdated( created );
//set the updated entity for the next stage
- writeContext.setMessage( entity );
- writeContext.proceed();
+ executionContext.setMessage( entity );
+ executionContext.proceed();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/43aba652/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/ExecutionContextImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/ExecutionContextImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/ExecutionContextImpl.java
new file mode 100644
index 0000000..8139f3d
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/ExecutionContextImpl.java
@@ -0,0 +1,92 @@
+package org.apache.usergrid.persistence.collection.mvcc.stage.impl;
+
+
+import org.apache.usergrid.persistence.collection.CollectionContext;
+import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionContext;
+import org.apache.usergrid.persistence.collection.mvcc.stage.Stage;
+import org.apache.usergrid.persistence.collection.mvcc.stage.StagePipeline;
+
+import com.google.common.base.Preconditions;
+import com.google.inject.Inject;
+
+
+/** @author tnine */
+public class ExecutionContextImpl implements ExecutionContext {
+
+ private final StagePipeline pipeline;
+ private final CollectionContext context;
+
+ private Object message;
+ private Stage current;
+
+
+ @Inject
+ public ExecutionContextImpl( final StagePipeline pipeline, final CollectionContext context ) {
+ Preconditions.checkNotNull( pipeline, "pipeline cannot be null" );
+ Preconditions.checkNotNull( context, "context cannot be null" );
+
+ this.pipeline = pipeline;
+ this.context = context;
+ }
+
+
+ @Override
+ public void execute( Object input ) {
+
+ current = this.pipeline.first();
+
+ setMessage( input );
+
+ current.performStage( this );
+ }
+
+
+ @Override
+ public <T> T getMessage( final Class<T> clazz ) {
+ Preconditions.checkNotNull( clazz, "Class must be specified" );
+
+ if ( message == null ) {
+ return null;
+ }
+
+ if ( !clazz.isInstance( message ) ) {
+ throw new ClassCastException(
+ "Message must be an instance of class " + clazz + ". However it was of type '" + message.getClass()
+ + "'" );
+ }
+
+
+ return ( T ) message;
+ }
+
+
+ @Override
+ public Object setMessage( final Object object ) {
+ Object original = message;
+
+ this.message = object;
+
+ return original;
+ }
+
+
+ @Override
+ public void proceed() {
+ Stage next = this.pipeline.nextStage( current );
+
+ //Nothing to do
+ if ( next == null ) {
+ return;
+ }
+
+ current = next;
+ current.performStage( this );
+ }
+
+
+
+ @Override
+ public CollectionContext getCollectionContext() {
+ return this.context;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/43aba652/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Load.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Load.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Load.java
new file mode 100644
index 0000000..b0ac251
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Load.java
@@ -0,0 +1,82 @@
+package org.apache.usergrid.persistence.collection.mvcc.stage.impl;
+
+
+import java.util.List;
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.persistence.collection.CollectionContext;
+import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
+import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionContext;
+import org.apache.usergrid.persistence.collection.mvcc.stage.Stage;
+import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
+import org.apache.usergrid.persistence.collection.service.UUIDService;
+import org.apache.usergrid.persistence.model.entity.Entity;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.inject.Inject;
+
+
+/**
+ * Load stage for a single entity. It reads the entity id from the execution context, asks the
+ * serialization strategy for at most one version bounded by a freshly generated time UUID, and
+ * passes the resulting entity (or null when there is none) on to the next stage.
+ */
+public class Load implements Stage {
+
+
+    private static final Logger LOG = LoggerFactory.getLogger( Load.class );
+
+    private final UUIDService uuidService;
+    private final MvccEntitySerializationStrategy entitySerializationStrategy;
+
+
+    /**
+     * @param uuidService used to generate the upper version bound for the load; must not be null
+     * @param entitySerializationStrategy used to read the stored entity versions; must not be null
+     */
+    @Inject
+    public Load( final UUIDService uuidService, final MvccEntitySerializationStrategy entitySerializationStrategy ) {
+        //checkNotNull returns its argument; the strategy is validated first, as before
+        this.entitySerializationStrategy =
+                Preconditions.checkNotNull( entitySerializationStrategy, "entitySerializationStrategy is required" );
+        this.uuidService = Preconditions.checkNotNull( uuidService, "uuidService is required" );
+    }
+
+
+    /**
+     * Replaces the entity id message with the loaded entity, or with null when nothing was found
+     * or the found version carries no entity data, then proceeds to the next stage.
+     *
+     * @param executionContext the context carrying the entity id ({@link UUID}) as its message
+     */
+    @Override
+    public void performStage( final ExecutionContext executionContext ) {
+        final UUID id = executionContext.getMessage( UUID.class );
+
+        Preconditions.checkNotNull( id, "Entity id required in the read stage" );
+
+        final CollectionContext collection = executionContext.getCollectionContext();
+
+        //a version that represents now, used as the upper bound of the load
+        final UUID now = uuidService.newTimeUUID();
+
+        final List<MvccEntity> loaded = entitySerializationStrategy.load( collection, id, now, 1 );
+
+        //stays null when we didn't get exactly one result back
+        Entity entity = null;
+
+        if ( loaded.size() == 1 ) {
+            final Optional<Entity> targetVersion = loaded.get( 0 ).getEntity();
+
+            //an absent entity means the version exists but has been marked as cleared
+            //TODO, a lazy async repair/cleanup here?
+            entity = targetVersion.orNull();
+        }
+
+        executionContext.setMessage( entity );
+        executionContext.proceed();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/43aba652/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/LoadPipeline.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/LoadPipeline.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/LoadPipeline.java
new file mode 100644
index 0000000..f712b2c
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/LoadPipeline.java
@@ -0,0 +1,23 @@
+package org.apache.usergrid.persistence.collection.mvcc.stage.impl;
+
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+
+import com.google.inject.BindingAnnotation;
+
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+
+/**
+ * Guice binding annotation that marks the load pipeline
+ *
+ * @author tnine
+ */
+@BindingAnnotation
+@Target({ FIELD, PARAMETER, METHOD })
+@Retention(RUNTIME)
+public @interface LoadPipeline {}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/43aba652/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/StagePipelineImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/StagePipelineImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/StagePipelineImpl.java
index 271e5e5..74dc003 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/StagePipelineImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/StagePipelineImpl.java
@@ -4,24 +4,28 @@ package org.apache.usergrid.persistence.collection.mvcc.stage.impl;
import java.util.Arrays;
import java.util.List;
+import org.apache.usergrid.persistence.collection.mvcc.stage.Stage;
import org.apache.usergrid.persistence.collection.mvcc.stage.StagePipeline;
-import org.apache.usergrid.persistence.collection.mvcc.stage.WriteStage;
+
+import com.google.common.base.Preconditions;
/** @author tnine */
public class StagePipelineImpl implements StagePipeline {
- private final List<WriteStage> stages;
- private WriteStage current;
+ private final List<Stage> stages;
+
+ /**
+ * Creates a pipeline over the given stages.
+ *
+ * @param stages the stages of this pipeline; must be non-null and contain at least one stage
+ */
+ protected StagePipelineImpl( List<Stage> stages ) {
+ Preconditions.checkNotNull(stages, "stages is required");
+ //the check admits a single-stage pipeline, so the message says "at least 1"
+ Preconditions.checkArgument( stages.size() > 0, "stages must have at least 1 element" );
- protected StagePipelineImpl( WriteStage[] stages ) {
- this.stages = Arrays.asList( stages );
+ this.stages = stages;
}
@Override
- public WriteStage first() {
+ public Stage first() {
if ( stages.size() == 0 ) {
return null;
@@ -31,29 +35,27 @@ public class StagePipelineImpl implements StagePipeline {
}
- @Override
- public WriteStage current() {
- return current;
- }
+
@Override
- public WriteStage nextStage( final WriteStage stage ) {
+ public Stage nextStage( final Stage stage ) {
+
+ Preconditions.checkNotNull( stage, "Stage cannot be null" );
+
int index = stages.indexOf( stage );
+
+ //indexOf returns -1 for a stage that is not in this pipeline; without this check
+ //index + 1 would be 0 and the first stage would be handed back as the "next" one
+ Preconditions.checkArgument( index >= 0, "Stage is not part of this pipeline" );
+
//we're done, do nothing
- if ( index == stages.size() ) {
+ if ( index == stages.size() -1 ) {
return null;
}
- current = stages.get( index + 1 );
-
- return current;
+ return stages.get( index + 1 );
}
/** Factory to create a new instance. */
- public static StagePipelineImpl fromStages( WriteStage... stages ) {
- return new StagePipelineImpl( stages );
+ public static StagePipelineImpl fromStages( Stage... stages ) {
+ return new StagePipelineImpl(Arrays.asList( stages ));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/43aba652/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Start.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Start.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Start.java
deleted file mode 100644
index bf58e06..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Start.java
+++ /dev/null
@@ -1,92 +0,0 @@
-package org.apache.usergrid.persistence.collection.mvcc.stage.impl;
-
-
-import java.util.UUID;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.persistence.collection.CollectionContext;
-import org.apache.usergrid.persistence.collection.exception.CollectionRuntimeException;
-import org.apache.usergrid.persistence.collection.mvcc.entity.MvccLogEntry;
-import org.apache.usergrid.persistence.collection.mvcc.entity.Stage;
-import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccEntityImpl;
-import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccLogEntryImpl;
-import org.apache.usergrid.persistence.collection.mvcc.stage.WriteContext;
-import org.apache.usergrid.persistence.collection.mvcc.stage.WriteStage;
-import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy;
-import org.apache.usergrid.persistence.model.entity.Entity;
-
-import com.google.common.base.Preconditions;
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-import com.netflix.astyanax.MutationBatch;
-import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
-
-
-/**
- * This is the first stage and should be invoked immediately when a write is started. It should persist the start of a
- * new write in the data store for a checkpoint and recovery
- */
-@Singleton
-public class Start implements WriteStage {
-
- private static final Logger LOG = LoggerFactory.getLogger( Start.class );
-
- private final MvccLogEntrySerializationStrategy logStrategy;
-
-
- /** Create a new stage with the current context */
- @Inject
- public Start( final MvccLogEntrySerializationStrategy logStrategy ) {
- Preconditions.checkNotNull( logStrategy, "logStrategy is required" );
-
-
- this.logStrategy = logStrategy;
- }
-
-
- /**
- * Create the entity Id and inject it, as well as set the timestamp versions
- *
- * @param writeContext The context of the current write operation
- */
- @Override
- public void performStage( final WriteContext writeContext ) {
-
- final Entity entity = writeContext.getMessage( Entity.class );
-
- Preconditions.checkNotNull( entity, "Entity is required in the new stage of the mvcc write" );
-
- final UUID entityId = entity.getUuid();
- final UUID version = entity.getVersion();
-
- Preconditions.checkNotNull( entityId, "Entity id is required in this stage" );
- Preconditions.checkNotNull( version, "Entity version is required in this stage" );
-
-
-
- final CollectionContext collectionContext = writeContext.getCollectionContext();
-
-
- final MvccLogEntry startEntry = new MvccLogEntryImpl( entityId, version, Stage.ACTIVE );
-
- MutationBatch write = logStrategy.write( collectionContext, startEntry );
-
-
- try {
- write.execute();
- }
- catch ( ConnectionException e ) {
- LOG.error( "Failed to execute write asynchronously ", e );
- throw new CollectionRuntimeException( "Failed to execute write asynchronously ", e );
- }
-
-
- //create the mvcc entity for the next stage
- final MvccEntityImpl nextStage = new MvccEntityImpl( entityId, version, entity );
-
- writeContext.setMessage( nextStage );
- writeContext.proceed();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/43aba652/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/StartDelete.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/StartDelete.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/StartDelete.java
new file mode 100644
index 0000000..383ee9e
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/StartDelete.java
@@ -0,0 +1,97 @@
+package org.apache.usergrid.persistence.collection.mvcc.stage.impl;
+
+
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.persistence.collection.CollectionContext;
+import org.apache.usergrid.persistence.collection.exception.CollectionRuntimeException;
+import org.apache.usergrid.persistence.collection.mvcc.entity.MvccLogEntry;
+import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccEntityImpl;
+import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccLogEntryImpl;
+import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionContext;
+import org.apache.usergrid.persistence.collection.mvcc.stage.Stage;
+import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy;
+import org.apache.usergrid.persistence.collection.service.UUIDService;
+import org.apache.usergrid.persistence.model.entity.Entity;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+
+/**
+ * Opening stage of a delete. It writes a log entry in the ACTIVE stage for a newly generated
+ * version of the entity, then hands an {@link MvccEntityImpl} without entity data to the next stage.
+ */
+@Singleton
+public class StartDelete implements Stage {
+
+    private static final Logger LOG = LoggerFactory.getLogger( StartDelete.class );
+
+    private final MvccLogEntrySerializationStrategy logStrategy;
+    private final UUIDService uuidService;
+
+
+    /**
+     * @param logStrategy used to write the log entry that records the start of the delete; must not be null
+     * @param uuidService used to generate the version of the delete; must not be null
+     */
+    @Inject
+    public StartDelete( final MvccLogEntrySerializationStrategy logStrategy, final UUIDService uuidService ) {
+        //checkNotNull returns its argument, so validation and assignment happen in one step
+        this.logStrategy = Preconditions.checkNotNull( logStrategy, "logStrategy is required" );
+        this.uuidService = Preconditions.checkNotNull( uuidService, "uuidService is required" );
+    }
+
+
+    /**
+     * Reads the entity id from the context, generates a new version, logs the version as ACTIVE and
+     * proceeds with an entity-less {@link MvccEntityImpl} as the message.
+     *
+     * @param executionContext the context carrying the entity id ({@link UUID}) as its message
+     */
+    @Override
+    public void performStage( final ExecutionContext executionContext ) {
+
+        final UUID id = executionContext.getMessage( UUID.class );
+
+        //the delete gets its own, newly generated version
+        final UUID deleteVersion = uuidService.newTimeUUID();
+
+        Preconditions.checkNotNull( id, "Entity id is required in this stage" );
+        Preconditions.checkNotNull( deleteVersion, "Entity version is required in this stage" );
+
+        final CollectionContext collection = executionContext.getCollectionContext();
+
+        final MvccLogEntry logEntry = new MvccLogEntryImpl( id, deleteVersion,
+                org.apache.usergrid.persistence.collection.mvcc.entity.Stage.ACTIVE );
+
+        executeWrite( logStrategy.write( collection, logEntry ) );
+
+        //the mvcc entity for the next stage carries the id and version but no entity data
+        executionContext.setMessage( new MvccEntityImpl( id, deleteVersion, Optional.<Entity>absent() ) );
+        executionContext.proceed();
+    }
+
+
+    /** Executes the batch, logging and rethrowing a connection failure as a CollectionRuntimeException. */
+    private void executeWrite( final MutationBatch batch ) {
+        try {
+            batch.execute();
+        }
+        catch ( ConnectionException e ) {
+            LOG.error( "Failed to execute write asynchronously ", e );
+            throw new CollectionRuntimeException( "Failed to execute write asynchronously ", e );
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/43aba652/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/StartWrite.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/StartWrite.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/StartWrite.java
new file mode 100644
index 0000000..4c55ae6
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/StartWrite.java
@@ -0,0 +1,92 @@
+package org.apache.usergrid.persistence.collection.mvcc.stage.impl;
+
+
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.persistence.collection.CollectionContext;
+import org.apache.usergrid.persistence.collection.exception.CollectionRuntimeException;
+import org.apache.usergrid.persistence.collection.mvcc.entity.MvccLogEntry;
+import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccEntityImpl;
+import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccLogEntryImpl;
+import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionContext;
+import org.apache.usergrid.persistence.collection.mvcc.stage.Stage;
+import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy;
+import org.apache.usergrid.persistence.model.entity.Entity;
+
+import com.google.common.base.Preconditions;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+
+/**
+ * Opening stage of a write. It writes a log entry in the ACTIVE stage for the entity's id and
+ * version, then hands an {@link MvccEntityImpl} wrapping the entity to the next stage.
+ */
+@Singleton
+public class StartWrite implements Stage {
+
+    private static final Logger LOG = LoggerFactory.getLogger( StartWrite.class );
+
+    private final MvccLogEntrySerializationStrategy logStrategy;
+
+
+    /**
+     * @param logStrategy used to write the log entry that records the start of the write; must not be null
+     */
+    @Inject
+    public StartWrite( final MvccLogEntrySerializationStrategy logStrategy ) {
+        //checkNotNull returns its argument, so validation and assignment happen in one step
+        this.logStrategy = Preconditions.checkNotNull( logStrategy, "logStrategy is required" );
+    }
+
+
+    /**
+     * Reads the entity from the context, requires that its id and version are already set, logs the
+     * version as ACTIVE and proceeds with an {@link MvccEntityImpl} for that entity as the message.
+     *
+     * @param executionContext the context carrying the {@link Entity} to write as its message
+     */
+    @Override
+    public void performStage( final ExecutionContext executionContext ) {
+
+        final Entity toWrite = executionContext.getMessage( Entity.class );
+
+        Preconditions.checkNotNull( toWrite, "Entity is required in the new stage of the mvcc write" );
+
+        final UUID id = toWrite.getUuid();
+        final UUID writeVersion = toWrite.getVersion();
+
+        Preconditions.checkNotNull( id, "Entity id is required in this stage" );
+        Preconditions.checkNotNull( writeVersion, "Entity version is required in this stage" );
+
+        final CollectionContext collection = executionContext.getCollectionContext();
+
+        final MvccLogEntry logEntry = new MvccLogEntryImpl( id, writeVersion,
+                org.apache.usergrid.persistence.collection.mvcc.entity.Stage.ACTIVE );
+
+        executeWrite( logStrategy.write( collection, logEntry ) );
+
+        //the mvcc entity for the next stage carries the id, the version and the entity itself
+        executionContext.setMessage( new MvccEntityImpl( id, writeVersion, toWrite ) );
+        executionContext.proceed();
+    }
+
+
+    /** Executes the batch, logging and rethrowing a connection failure as a CollectionRuntimeException. */
+    private void executeWrite( final MutationBatch batch ) {
+        try {
+            batch.execute();
+        }
+        catch ( ConnectionException e ) {
+            LOG.error( "Failed to execute write asynchronously ", e );
+            throw new CollectionRuntimeException( "Failed to execute write asynchronously ", e );
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/43aba652/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Update.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Update.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Update.java
index 00764de..ce1be76 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Update.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Update.java
@@ -6,8 +6,8 @@ import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.usergrid.persistence.collection.mvcc.stage.WriteContext;
-import org.apache.usergrid.persistence.collection.mvcc.stage.WriteStage;
+import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionContext;
+import org.apache.usergrid.persistence.collection.mvcc.stage.Stage;
import org.apache.usergrid.persistence.collection.service.TimeService;
import org.apache.usergrid.persistence.collection.service.UUIDService;
import org.apache.usergrid.persistence.model.entity.Entity;
@@ -22,7 +22,7 @@ import com.google.inject.Singleton;
* been set correctly
*/
@Singleton
-public class Update implements WriteStage {
+public class Update implements Stage {
private static final Logger LOG = LoggerFactory.getLogger( Update.class );
@@ -43,12 +43,12 @@ public class Update implements WriteStage {
/**
* Create the entity Id and inject it, as well as set the timestamp versions
*
- * @param writeContext The context of the current write operation
+ * @param executionContext The context of the current write operation
*/
@Override
- public void performStage( final WriteContext writeContext ) {
+ public void performStage( final ExecutionContext executionContext ) {
- final Entity entity = writeContext.getMessage( Entity.class );
+ final Entity entity = executionContext.getMessage( Entity.class );
Preconditions.checkNotNull( entity, "Entity is required in the new stage of the mvcc write" );
@@ -60,7 +60,7 @@ public class Update implements WriteStage {
entity.setVersion( version );
entity.setUpdated( updated );
- writeContext.setMessage( entity );
- writeContext.proceed();
+ executionContext.setMessage( entity );
+ executionContext.proceed();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/43aba652/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Verify.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Verify.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Verify.java
index 7933266..cc53000 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Verify.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Verify.java
@@ -1,15 +1,15 @@
package org.apache.usergrid.persistence.collection.mvcc.stage.impl;
-import org.apache.usergrid.persistence.collection.mvcc.stage.WriteContext;
-import org.apache.usergrid.persistence.collection.mvcc.stage.WriteStage;
+import org.apache.usergrid.persistence.collection.mvcc.stage.Stage;
+import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionContext;
import com.google.inject.Singleton;
/** This phase should execute any verification on the MvccEntity */
@Singleton
-public class Verify implements WriteStage {
+public class Verify implements Stage {
public Verify() {
@@ -17,9 +17,9 @@ public class Verify implements WriteStage {
@Override
- public void performStage( final WriteContext writeContext ) {
+ public void performStage( final ExecutionContext executionContext ) {
//TODO no op for now, just continue to the next stage. Verification logic goes in here
- writeContext.proceed();
+ executionContext.proceed();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/43aba652/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/WriteContextCallback.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/WriteContextCallback.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/WriteContextCallback.java
index 65e5ba3..2a34e95 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/WriteContextCallback.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/WriteContextCallback.java
@@ -2,7 +2,7 @@ package org.apache.usergrid.persistence.collection.mvcc.stage.impl;
import org.apache.usergrid.persistence.collection.exception.CollectionRuntimeException;
-import org.apache.usergrid.persistence.collection.mvcc.stage.WriteContext;
+import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionContext;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
@@ -18,11 +18,11 @@ import com.netflix.astyanax.connectionpool.OperationResult;
*/
public class WriteContextCallback implements FutureCallback<OperationResult<Void>> {
- private final WriteContext context;
+ private final ExecutionContext context;
/** Create a new callback. The data will be passed to the next stage */
- private WriteContextCallback( final WriteContext context ) {
+ private WriteContextCallback( final ExecutionContext context ) {
this.context = context;
}
@@ -52,7 +52,7 @@ public class WriteContextCallback implements FutureCallback<OperationResult<Void
* @param context The context to signal to continue in the callback
*/
public static void createCallback( final ListenableFuture<OperationResult<Void>> future,
- final WriteContext context ) {
+ final ExecutionContext context ) {
Futures.addCallback( future, new WriteContextCallback( context ) );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/43aba652/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/WriteContextImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/WriteContextImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/WriteContextImpl.java
deleted file mode 100644
index 0925f0b..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/WriteContextImpl.java
+++ /dev/null
@@ -1,93 +0,0 @@
-package org.apache.usergrid.persistence.collection.mvcc.stage.impl;
-
-
-import org.apache.usergrid.persistence.collection.CollectionContext;
-import org.apache.usergrid.persistence.collection.mvcc.stage.StagePipeline;
-import org.apache.usergrid.persistence.collection.mvcc.stage.WriteContext;
-import org.apache.usergrid.persistence.collection.mvcc.stage.WriteStage;
-
-import com.google.common.base.Preconditions;
-import com.google.inject.Inject;
-
-
-/** @author tnine */
-public class WriteContextImpl implements WriteContext {
-
- private final StagePipeline pipeline;
- private final CollectionContext context;
-
- private Object message;
- private WriteStage current;
-
-
- @Inject
- public WriteContextImpl( final StagePipeline pipeline,
- final CollectionContext context ) {
- Preconditions.checkNotNull( pipeline, "pipeline cannot be null" );
- Preconditions.checkNotNull( context, "context cannot be null" );
-
- this.pipeline = pipeline;
- this.context = context;
- }
-
-
- @Override
- public void performWrite( Object input ) {
-
- current = this.pipeline.first();
-
- setMessage( input );
-
- current.performStage( this );
- }
-
-
- @Override
- public <T> T getMessage( final Class<T> clazz ) {
- Preconditions.checkNotNull( clazz, "Class must be specified" );
-
- if ( message == null ) {
- return null;
- }
-
- if ( !clazz.isInstance( message ) ) {
- throw new ClassCastException(
- "Message must be an instance of class " + clazz + ". However it was of type '" + message.getClass()
- + "'" );
- }
-
-
- return ( T ) message;
- }
-
-
- @Override
- public Object setMessage( final Object object ) {
- Object original = message;
-
- this.message = object;
-
- return original;
- }
-
-
- @Override
- public void proceed() {
- WriteStage next = this.pipeline.nextStage( current );
-
- //Nothing to do
- if ( next == null ) {
- return;
- }
-
- current = next;
- current.performStage( this );
- }
-
-
-
- @Override
- public CollectionContext getCollectionContext() {
- return this.context;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/43aba652/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/CollectionManagerIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/CollectionManagerIT.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/CollectionManagerIT.java
index b7c7027..9672a6b 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/CollectionManagerIT.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/CollectionManagerIT.java
@@ -5,9 +5,9 @@ import org.junit.Rule;
import org.junit.Test;
import org.apache.usergrid.persistence.collection.guice.CassandraTestCollectionModule;
-import org.apache.usergrid.persistence.collection.guice.TestCollectionModule;
import org.apache.usergrid.persistence.collection.impl.CollectionContextImpl;
import org.apache.usergrid.persistence.model.entity.Entity;
+import org.apache.usergrid.persistence.model.field.IntegerField;
import org.apache.usergrid.persistence.model.util.UUIDGenerator;
import org.apache.usergrid.persistence.test.CassandraRule;
@@ -16,7 +16,8 @@ import com.google.inject.Inject;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
/** @author tnine */
@@ -29,7 +30,6 @@ public class CollectionManagerIT {
public final CassandraRule rule = new CassandraRule();
-
@Inject
private CollectionManagerFactory factory;
@@ -37,18 +37,102 @@ public class CollectionManagerIT {
@Test
public void create() {
- CollectionContext context = new CollectionContextImpl( UUIDGenerator.newTimeUUID(), UUIDGenerator.newTimeUUID(), "test");
- Entity newEntity = new Entity("test");
+ CollectionContext context =
+ new CollectionContextImpl( UUIDGenerator.newTimeUUID(), UUIDGenerator.newTimeUUID(), "test" );
+ Entity newEntity = new Entity( "test" );
- CollectionManager manager = factory.createCollectionManager(context);
+ CollectionManager manager = factory.createCollectionManager( context );
Entity returned = manager.create( newEntity );
- assertNotNull("Returned has a uuid", returned.getUuid());
- assertEquals("Version matches uuid for create", returned.getUuid(), returned.getVersion());
+ assertNotNull( "Returned has a uuid", returned.getUuid() );
+ assertEquals( "Version matches uuid for create", returned.getUuid(), returned.getVersion() );
+
+ assertTrue( "Created time was set", returned.getCreated() > 0 );
+ assertEquals( "Created and updated time match on create", returned.getCreated(), returned.getUpdated() );
+ }
+
+
+ /** An entity returned by create can be loaded by its id and equals what create returned. */
+ @Test
+ public void createAndLoad() {
+
+     final CollectionContext collection =
+             new CollectionContextImpl( UUIDGenerator.newTimeUUID(), UUIDGenerator.newTimeUUID(), "test" );
+     final Entity toCreate = new Entity( "test" );
+
+     final CollectionManager manager = factory.createCollectionManager( collection );
+
+     final Entity created = manager.create( toCreate );
+
+     assertNotNull( "Id was assigned", created.getUuid() );
+
+     final Entity loaded = manager.load( created.getUuid() );
+
+     assertEquals( "Same value", created, loaded );
+ }
+
+
+ /** A created entity can be loaded, and loading it again after a delete returns null. */
+ @Test
+ public void createLoadDelete() {
+
+     final CollectionContext collection =
+             new CollectionContextImpl( UUIDGenerator.newTimeUUID(), UUIDGenerator.newTimeUUID(), "test" );
+     final Entity toCreate = new Entity( "test" );
+
+     final CollectionManager manager = factory.createCollectionManager( collection );
+
+     final Entity created = manager.create( toCreate );
+
+     assertNotNull( "Id was assigned", created.getUuid() );
+
+     final Entity loadedBeforeDelete = manager.load( created.getUuid() );
+
+     assertEquals( "Same value", created, loadedBeforeDelete );
+
+     manager.delete( created.getUuid() );
+
+     final Entity loadedAfterDelete = manager.load( created.getUuid() );
+
+     assertNull( "Entity was deleted", loadedAfterDelete );
+ }
+
+
+ @Test
+ public void createLoadUpdateLoad() {
+
+ CollectionContext context =
+ new CollectionContextImpl( UUIDGenerator.newTimeUUID(), UUIDGenerator.newTimeUUID(), "test" );
+
+ Entity newEntity = new Entity( "test" );
+ newEntity.setField( new IntegerField( "counter", 1 ) );
+
+ CollectionManager manager = factory.createCollectionManager( context );
+
+ Entity createReturned = manager.create( newEntity );
+
+
+ assertNotNull( "Id was assigned", createReturned.getUuid() );
+
+ Entity loadReturned = manager.load( createReturned.getUuid() );
+
+ assertEquals( "Same value", createReturned, loadReturned );
+
+
+ assertEquals("Field value correct", createReturned.getField( "counter" ), loadReturned.getField( "counter" ));
+
+
+ //update the field to 2
+ createReturned.setField( new IntegerField( "counter", 2 ) );
+
+ manager.update( createReturned );
+
+ loadReturned = manager.load( createReturned.getUuid() );
+
+ assertEquals( "Same value", createReturned, loadReturned );
- assertTrue("Created time was set", returned.getCreated() > 0);
- assertEquals("Created and updated time match on create", returned.getCreated(), returned.getUpdated());
+ assertEquals("Field value correct", createReturned.getField( "counter" ), loadReturned.getField( "counter" ));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/43aba652/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/CollectionManagerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/CollectionManagerTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/CollectionManagerTest.java
index fca9044..9028cbf 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/CollectionManagerTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/CollectionManagerTest.java
@@ -6,9 +6,9 @@ import org.mockito.ArgumentCaptor;
import org.apache.usergrid.persistence.collection.impl.CollectionContextImpl;
import org.apache.usergrid.persistence.collection.impl.CollectionManagerImpl;
+import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionContext;
+import org.apache.usergrid.persistence.collection.mvcc.stage.Stage;
import org.apache.usergrid.persistence.collection.mvcc.stage.StagePipeline;
-import org.apache.usergrid.persistence.collection.mvcc.stage.WriteContext;
-import org.apache.usergrid.persistence.collection.mvcc.stage.WriteStage;
import org.apache.usergrid.persistence.model.entity.Entity;
import org.apache.usergrid.persistence.model.util.UUIDGenerator;
@@ -26,11 +26,14 @@ public class CollectionManagerTest {
@Test
public void create(){
- WriteStage mockStage = mock(WriteStage.class);
+ Stage mockStage = mock(Stage.class);
StagePipeline createPipeline = mock(StagePipeline.class);
StagePipeline updatePipeline = mock(StagePipeline.class);
StagePipeline deletePipeline = mock(StagePipeline.class);
+ StagePipeline loadPipeline = mock(StagePipeline.class);
+
+
//mock up returning the first stage
when(createPipeline.first()).thenReturn(mockStage);
@@ -38,7 +41,7 @@ public class CollectionManagerTest {
CollectionContext context = new CollectionContextImpl( UUIDGenerator.newTimeUUID(), UUIDGenerator.newTimeUUID(), "test" );
- CollectionManager collectionManager = new CollectionManagerImpl(createPipeline, updatePipeline, deletePipeline, context);
+ CollectionManager collectionManager = new CollectionManagerImpl(createPipeline, updatePipeline, deletePipeline, loadPipeline, context);
Entity create = new Entity();
@@ -47,12 +50,12 @@ public class CollectionManagerTest {
//verify the first stage was asked for
verify(createPipeline).first();
- ArgumentCaptor<WriteContext> contextArg = ArgumentCaptor.forClass(WriteContext.class);
+ ArgumentCaptor<ExecutionContext> contextArg = ArgumentCaptor.forClass(ExecutionContext.class);
//verify the first perform stage was invoked
verify(mockStage).performStage( contextArg.capture() );
- //verify we set the passed entity into the WriteContext
+ //verify we set the passed entity into the ExecutionContext
assertEquals("Entity should be present in the write context", create, contextArg.getValue().getMessage( Entity.class ));
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/43aba652/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/ExecutionContextTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/ExecutionContextTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/ExecutionContextTest.java
new file mode 100644
index 0000000..4b917e4
--- /dev/null
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/ExecutionContextTest.java
@@ -0,0 +1,217 @@
+package org.apache.usergrid.persistence.collection.mvcc.stage;
+
+
+import org.junit.Test;
+
+import org.apache.usergrid.persistence.collection.CollectionContext;
+import org.apache.usergrid.persistence.collection.mvcc.stage.impl.ExecutionContextImpl;
+
+import static junit.framework.TestCase.assertSame;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+
+/** @author tnine */
+public class ExecutionContextTest {
+
+ @Test
+ public void performWrite() {
+
+ CollectionContext collectionContext = mock( CollectionContext.class );
+
+ StagePipeline pipeline = mock( StagePipeline.class );
+
+ Stage stage = mock( Stage.class );
+
+ when( pipeline.first() ).thenReturn( stage );
+
+ ExecutionContext executionContext = new ExecutionContextImpl( pipeline, collectionContext );
+
+ Object test = new Object();
+
+ executionContext.execute( test );
+
+ //verify we called first in the pipeline to get the first value
+ verify( pipeline ).first();
+
+ //verify the first stage was invoked
+ verify( stage ).performStage( same( executionContext ) );
+
+ //verify the bean value was set
+ assertSame( test, executionContext.getMessage( Object.class ) );
+ }
+
+
+ @Test
+ public void setAndGet() {
+ Object test = new Object();
+
+ CollectionContext collectionContext = mock( CollectionContext.class );
+
+ StagePipeline pipeline = mock( StagePipeline.class );
+
+
+ ExecutionContext executionContext = new ExecutionContextImpl( pipeline, collectionContext );
+
+ executionContext.setMessage( test );
+
+ assertSame( "Same value returned", test, executionContext.getMessage( Object.class ) );
+ }
+
+
+ @Test
+ public void setAndGetTypeSafe() {
+ TestBean test = new TestBean();
+
+ CollectionContext collectionContext = mock( CollectionContext.class );
+
+ StagePipeline pipeline = mock( StagePipeline.class );
+
+
+ ExecutionContext executionContext = new ExecutionContextImpl( pipeline, collectionContext );
+
+ executionContext.setMessage( test );
+
+ //works because a TestBean is also an instance of Object
+ assertSame( "Test instance of object", test, executionContext.getMessage( Object.class ) );
+
+ assertSame( "Test instance of TestBean", test, executionContext.getMessage( TestBean.class ) );
+ }
+
+
+ @Test( expected = ClassCastException.class )
+ public void setAndGetBadType() {
+ Object test = new Object();
+
+ CollectionContext collectionContext = mock( CollectionContext.class );
+
+ StagePipeline pipeline = mock( StagePipeline.class );
+
+
+ ExecutionContext executionContext = new ExecutionContextImpl( pipeline, collectionContext );
+
+ executionContext.setMessage( test );
+
+ //works because the stored message is an instance of Object
+ assertSame( "Test instance of object", test, executionContext.getMessage( Object.class ) );
+
+ //should blow up, not type safe. The object test is not an instance of TestBean
+ executionContext.getMessage( TestBean.class );
+ }
+
+
+ @Test
+ public void nullMessage() {
+
+ CollectionContext collectionContext = mock( CollectionContext.class );
+
+ StagePipeline pipeline = mock( StagePipeline.class );
+
+
+ ExecutionContext executionContext = new ExecutionContextImpl( pipeline, collectionContext );
+
+ executionContext.setMessage( null );
+
+ //a null message was set, so a null should come back
+ assertNull( "Null message returned", executionContext.getMessage( Object.class ) );
+ }
+
+
+ @Test
+ public void proceedHasNextStep() {
+
+ CollectionContext collectionContext = mock( CollectionContext.class );
+
+ StagePipeline pipeline = mock( StagePipeline.class );
+
+ Stage firstStage = mock( Stage.class );
+
+ Stage secondStage = mock( Stage.class );
+
+
+ when( pipeline.first() ).thenReturn( firstStage );
+
+ when( pipeline.nextStage( same( firstStage ) ) ).thenReturn( secondStage );
+
+
+ ExecutionContext executionContext = new ExecutionContextImpl( pipeline, collectionContext );
+
+ Object test = new Object();
+
+ executionContext.execute( test );
+
+ //now proceed and validate the second stage was invoked with this context
+ executionContext.proceed();
+
+ verify( secondStage ).performStage( same( executionContext ) );
+ }
+
+
+ @Test
+ public void proceedNoNextStep() {
+
+ CollectionContext collectionContext = mock( CollectionContext.class );
+
+ StagePipeline pipeline = mock( StagePipeline.class );
+
+ Stage firstStage = mock( Stage.class );
+
+ when( pipeline.first() ).thenReturn( firstStage );
+
+ when( pipeline.nextStage( same( firstStage ) ) ).thenReturn( null );
+
+
+ ExecutionContext executionContext = new ExecutionContextImpl( pipeline, collectionContext );
+
+ Object test = new Object();
+
+ executionContext.execute( test );
+
+ //now proceed with no next stage; the test passes only if this does not throw
+ executionContext.proceed();
+ }
+
+
+ @Test
+ public void getContextCorrect() {
+
+ CollectionContext collectionContext = mock( CollectionContext.class );
+
+ StagePipeline pipeline = mock( StagePipeline.class );
+
+
+ ExecutionContext executionContext = new ExecutionContextImpl( pipeline, collectionContext );
+
+ assertSame( "Collection context pointer correct", collectionContext, executionContext.getCollectionContext() );
+ }
+
+
+
+
+ @Test( expected = NullPointerException.class )
+ public void nullContextFails() {
+
+ StagePipeline pipeline = mock( StagePipeline.class );
+
+ //pass a valid pipeline so only the null collection context can trigger the failure
+ new ExecutionContextImpl( pipeline, null );
+ }
+
+
+ @Test( expected = NullPointerException.class )
+ public void nullPipelineFails() {
+
+ CollectionContext collectionContext = mock( CollectionContext.class );
+
+
+ new ExecutionContextImpl( null, collectionContext );
+ }
+
+
+ private static class TestBean {
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/43aba652/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/StagePipelineTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/StagePipelineTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/StagePipelineTest.java
new file mode 100644
index 0000000..6f0fa35
--- /dev/null
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/StagePipelineTest.java
@@ -0,0 +1,89 @@
+package org.apache.usergrid.persistence.collection.mvcc.stage;
+
+
+import org.junit.Test;
+
+import org.apache.usergrid.persistence.collection.mvcc.stage.impl.StagePipelineImpl;
+
+import static junit.framework.TestCase.assertSame;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.mock;
+
+
+/** @author tnine */
+public class StagePipelineTest {
+
+ @Test
+ public void oneStage() {
+ Stage first = mock( Stage.class );
+
+ StagePipeline pipeline = StagePipelineImpl.fromStages( first );
+
+ assertSame( "Correct stage returned", first, pipeline.first() );
+
+ Stage next = pipeline.nextStage( first );
+
+ assertNull( "No next stage", next );
+ }
+
+
+ @Test
+ public void threeStages() {
+ Stage first = mock( Stage.class );
+ Stage second = mock( Stage.class );
+ Stage third = mock( Stage.class );
+
+ StagePipeline pipeline = StagePipelineImpl.fromStages( first, second, third );
+
+ assertSame( "Correct stage returned", first, pipeline.first() );
+
+ Stage next = pipeline.nextStage( first );
+
+ assertSame( "Correct stage returned", second, next );
+
+ next = pipeline.nextStage( next );
+
+ assertSame( "Correct stage returned", third, next );
+
+ next = pipeline.nextStage( next );
+
+ assertNull( "No next stage", next );
+ }
+
+
+ /**
+ * Test seeking without calling .first() just to make sure there are no side effects
+ */
+ @Test
+ public void stageSeek() {
+ Stage first = mock( Stage.class );
+ Stage second = mock( Stage.class );
+ Stage third = mock( Stage.class );
+
+ StagePipeline pipeline = StagePipelineImpl.fromStages( first, second, third );
+
+
+ Stage next = pipeline.nextStage( second );
+
+ assertSame( "Correct stage returned", third, next );
+
+ next = pipeline.nextStage( next );
+
+ assertNull( "No next stage", next );
+ }
+
+
+ @Test( expected = NullPointerException.class )
+ public void invalidStageInput() {
+ Stage first = mock( Stage.class );
+
+ StagePipeline pipeline = StagePipelineImpl.fromStages( first );
+ pipeline.nextStage( null );
+ }
+
+
+ @Test( expected = IllegalArgumentException.class )
+ public void noStagesErrors() {
+ StagePipelineImpl.fromStages();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/43aba652/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/WriteContextTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/WriteContextTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/WriteContextTest.java
deleted file mode 100644
index 490b249..0000000
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/WriteContextTest.java
+++ /dev/null
@@ -1,217 +0,0 @@
-package org.apache.usergrid.persistence.collection.mvcc.stage;
-
-
-import org.junit.Test;
-
-import org.apache.usergrid.persistence.collection.CollectionContext;
-import org.apache.usergrid.persistence.collection.mvcc.stage.impl.WriteContextImpl;
-
-import static junit.framework.TestCase.assertSame;
-import static org.junit.Assert.assertNull;
-import static org.mockito.Matchers.same;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-
-/** @author tnine */
-public class WriteContextTest {
-
- @Test
- public void performWrite() {
-
- CollectionContext collectionContext = mock( CollectionContext.class );
-
- StagePipeline pipeline = mock( StagePipeline.class );
-
- WriteStage stage = mock( WriteStage.class );
-
- when( pipeline.first() ).thenReturn( stage );
-
- WriteContext writeContext = new WriteContextImpl( pipeline, collectionContext );
-
- Object test = new Object();
-
- writeContext.performWrite( test );
-
- //verify we called first in the pipeline to get the first value
- verify( pipeline ).first();
-
- //verify the first stage was invoked
- verify( stage ).performStage( same( writeContext ) );
-
- //verify the bean value was set
- assertSame( test, writeContext.getMessage( Object.class ) );
- }
-
-
- @Test
- public void setAndGet() {
- Object test = new Object();
-
- CollectionContext collectionContext = mock( CollectionContext.class );
-
- StagePipeline pipeline = mock( StagePipeline.class );
-
-
- WriteContext writeContext = new WriteContextImpl( pipeline, collectionContext );
-
- writeContext.setMessage( test );
-
- assertSame( "Same value returned", test, writeContext.getMessage( Object.class ) );
- }
-
-
- @Test
- public void setAndGetTypeSafe() {
- TestBean test = new TestBean();
-
- CollectionContext collectionContext = mock( CollectionContext.class );
-
- StagePipeline pipeline = mock( StagePipeline.class );
-
-
- WriteContext writeContext = new WriteContextImpl( pipeline, collectionContext );
-
- writeContext.setMessage( test );
-
- //works because Test is an instance of object
- assertSame( "Test instance of object", test, writeContext.getMessage( Object.class ) );
-
- assertSame( "Test instance of object", test, writeContext.getMessage( TestBean.class ) );
- }
-
-
- @Test( expected = ClassCastException.class )
- public void setAndGetBadType() {
- Object test = new Object();
-
- CollectionContext collectionContext = mock( CollectionContext.class );
-
- StagePipeline pipeline = mock( StagePipeline.class );
-
-
- WriteContext writeContext = new WriteContextImpl( pipeline, collectionContext );
-
- writeContext.setMessage( test );
-
- //works because Test is an instance of object
- assertSame( "Test instance of object", test, writeContext.getMessage( Object.class ) );
-
- //should blow up, not type save. The object test is not an instance of TestBean
- writeContext.getMessage( TestBean.class );
- }
-
-
- @Test
- public void nullMessage() {
-
- CollectionContext collectionContext = mock( CollectionContext.class );
-
- StagePipeline pipeline = mock( StagePipeline.class );
-
-
- WriteContext writeContext = new WriteContextImpl( pipeline, collectionContext );
-
- writeContext.setMessage( null );
-
- //works because Test is an instance of object
- assertNull( "Null message returned", writeContext.getMessage( Object.class ) );
- }
-
-
- @Test
- public void proceedHasNextStep() {
-
- CollectionContext collectionContext = mock( CollectionContext.class );
-
- StagePipeline pipeline = mock( StagePipeline.class );
-
- WriteStage firstStage = mock( WriteStage.class );
-
- WriteStage secondStage = mock( WriteStage.class );
-
-
- when( pipeline.first() ).thenReturn( firstStage );
-
- when( pipeline.nextStage( same( firstStage ) ) ).thenReturn( secondStage );
-
-
- WriteContext writeContext = new WriteContextImpl( pipeline, collectionContext );
-
- Object test = new Object();
-
- writeContext.performWrite( test );
-
- //now proceed and validate we were called
- writeContext.proceed();
-
- verify( secondStage ).performStage( same( writeContext ) );
- }
-
-
- @Test
- public void proceedNoNextStep() {
-
- CollectionContext collectionContext = mock( CollectionContext.class );
-
- StagePipeline pipeline = mock( StagePipeline.class );
-
- WriteStage firstStage = mock( WriteStage.class );
-
- when( pipeline.first() ).thenReturn( firstStage );
-
- when( pipeline.nextStage( same( firstStage ) ) ).thenReturn( null );
-
-
- WriteContext writeContext = new WriteContextImpl( pipeline, collectionContext );
-
- Object test = new Object();
-
- writeContext.performWrite( test );
-
- //now proceed and validate we were called
- writeContext.proceed();
- }
-
-
- @Test
- public void getContextCorrect() {
-
- CollectionContext collectionContext = mock( CollectionContext.class );
-
- StagePipeline pipeline = mock( StagePipeline.class );
-
-
- WriteContext writeContext = new WriteContextImpl( pipeline, collectionContext );
-
- assertSame( "Collection context pointer correct", collectionContext, writeContext.getCollectionContext() );
- }
-
-
-
-
- @Test( expected = NullPointerException.class )
- public void nullContextFails() {
-
- CollectionContext collectionContext = mock( CollectionContext.class );
-
-
- new WriteContextImpl( null, collectionContext );
- }
-
-
- @Test( expected = NullPointerException.class )
- public void nullPipelineFails() {
-
- CollectionContext collectionContext = mock( CollectionContext.class );
-
-
- new WriteContextImpl( null, collectionContext );
- }
-
-
- private static class TestBean {
-
- }
-}