You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2013/12/05 01:55:28 UTC
[2/6] git commit: Pipeline tests and wiring complete
Pipeline tests and wiring complete
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/0e0c8ff7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/0e0c8ff7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/0e0c8ff7
Branch: refs/heads/two-dot-o
Commit: 0e0c8ff76c3d4dbebd06cb0e33441d3c3831d923
Parents: cb9ec94
Author: Todd Nine <to...@apache.org>
Authored: Wed Dec 4 14:59:51 2013 -0700
Committer: Todd Nine <to...@apache.org>
Committed: Wed Dec 4 16:25:57 2013 -0700
----------------------------------------------------------------------
.../collection/guice/CollectionModule.java | 12 +
.../impl/CollectionManagerFactoryImpl.java | 21 --
.../collection/impl/CollectionManagerImpl.java | 38 ++--
.../collection/mvcc/stage/StagePipeline.java | 22 --
.../collection/mvcc/stage/WriteContext.java | 17 --
.../mvcc/stage/WriteContextFactory.java | 30 ---
.../collection/mvcc/stage/impl/Clear.java | 91 ++++++++
.../stage/impl/CollectionPipelineModule.java | 47 ++--
.../collection/mvcc/stage/impl/Commit.java | 90 ++++++++
.../collection/mvcc/stage/impl/Create.java | 85 ++++++++
.../mvcc/stage/impl/CreatePipeline.java | 7 +-
.../mvcc/stage/impl/DeletePipeline.java | 7 +-
.../mvcc/stage/impl/MvccEntityCommit.java | 28 ---
.../mvcc/stage/impl/MvccEntityNew.java | 116 ----------
.../mvcc/stage/impl/MvccEntityWrite.java | 25 ---
.../mvcc/stage/impl/StagePipelineImpl.java | 45 +---
.../collection/mvcc/stage/impl/Start.java | 92 ++++++++
.../collection/mvcc/stage/impl/Update.java | 66 ++++++
.../mvcc/stage/impl/UpdatePipeline.java | 23 ++
.../collection/mvcc/stage/impl/Verify.java | 25 +++
.../mvcc/stage/impl/WriteContextCallback.java | 23 +-
.../stage/impl/WriteContextFactoryImpl.java | 47 ----
.../mvcc/stage/impl/WriteContextImpl.java | 30 +--
.../persistence/collection/util/Verify.java | 21 ++
.../CollectionManagerFactoryTest.java | 46 +++-
.../collection/CollectionManagerIT.java | 54 +++++
.../collection/CollectionManagerTest.java | 59 +++++
.../guice/CassandraTestCollectionModule.java | 76 +++++++
.../collection/guice/TestCollectionModule.java | 52 ++---
.../collection/mvcc/stage/WriteContextTest.java | 217 +++++++++++++++++++
.../collection/mvcc/stage/impl/CreateTest.java | 152 +++++++++++++
.../mvcc/stage/impl/MvccEntityNewTest.java | 136 ------------
.../collection/mvcc/stage/impl/StartTest.java | 187 ++++++++++++++++
...MvccEntitySerializationStrategyImplTest.java | 4 +-
...ccLogEntrySerializationStrategyImplTest.java | 29 ++-
35 files changed, 1417 insertions(+), 603 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0e0c8ff7/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
index 80f48d7..293e79a 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
@@ -3,12 +3,16 @@ package org.apache.usergrid.persistence.collection.guice;
import java.util.Properties;
+import org.apache.usergrid.persistence.collection.CollectionManager;
+import org.apache.usergrid.persistence.collection.CollectionManagerFactory;
import org.apache.usergrid.persistence.collection.astynax.AstynaxKeyspaceProvider;
+import org.apache.usergrid.persistence.collection.impl.CollectionManagerImpl;
import org.apache.usergrid.persistence.collection.mvcc.stage.impl.CollectionPipelineModule;
import org.apache.usergrid.persistence.collection.serialization.impl.SerializationModule;
import org.apache.usergrid.persistence.collection.service.impl.ServiceModule;
import com.google.inject.AbstractModule;
+import com.google.inject.assistedinject.FactoryModuleBuilder;
import com.google.inject.name.Names;
@@ -54,5 +58,13 @@ public class CollectionModule extends AbstractModule {
install( new SerializationModule());
install (new ServiceModule());
+
+ //install the core services
+
+ //create a guice factor for getting our collection manager
+ install(new FactoryModuleBuilder()
+ .implement( CollectionManager.class, CollectionManagerImpl.class )
+ .build( CollectionManagerFactory.class ));
+
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0e0c8ff7/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/CollectionManagerFactoryImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/CollectionManagerFactoryImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/CollectionManagerFactoryImpl.java
deleted file mode 100644
index 2ab0761..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/CollectionManagerFactoryImpl.java
+++ /dev/null
@@ -1,21 +0,0 @@
-package org.apache.usergrid.persistence.collection.impl;
-
-
-import org.apache.usergrid.persistence.collection.CollectionContext;
-import org.apache.usergrid.persistence.collection.CollectionManager;
-import org.apache.usergrid.persistence.collection.CollectionManagerFactory;
-
-
-/**
- * Basic Imple
- * @author tnine
- */
-public class CollectionManagerFactoryImpl implements CollectionManagerFactory {
-
-
- @Override
- public CollectionManager createCollectionManager( final CollectionContext context ) {
-// return new CollectionManagerImpl( context );
- return null;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0e0c8ff7/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/CollectionManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/CollectionManagerImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/CollectionManagerImpl.java
index 72ed6ce..ae16b3a 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/CollectionManagerImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/CollectionManagerImpl.java
@@ -8,9 +8,12 @@ import org.slf4j.LoggerFactory;
import org.apache.usergrid.persistence.collection.CollectionContext;
import org.apache.usergrid.persistence.collection.CollectionManager;
-import org.apache.usergrid.persistence.collection.service.TimeService;
+import org.apache.usergrid.persistence.collection.mvcc.stage.StagePipeline;
import org.apache.usergrid.persistence.collection.mvcc.stage.WriteContext;
-import org.apache.usergrid.persistence.collection.mvcc.stage.WriteContextFactory;
+import org.apache.usergrid.persistence.collection.mvcc.stage.impl.CreatePipeline;
+import org.apache.usergrid.persistence.collection.mvcc.stage.impl.DeletePipeline;
+import org.apache.usergrid.persistence.collection.mvcc.stage.impl.UpdatePipeline;
+import org.apache.usergrid.persistence.collection.mvcc.stage.impl.WriteContextImpl;
import org.apache.usergrid.persistence.model.entity.Entity;
import com.google.inject.Inject;
@@ -27,46 +30,53 @@ public class CollectionManagerImpl implements CollectionManager {
private static final Logger logger = LoggerFactory.getLogger( CollectionManagerImpl.class );
private final CollectionContext context;
- private final TimeService timeService;
- private final WriteContextFactory factory;
+ private final StagePipeline createPipeline;
+ private final StagePipeline updatePipeline;
+ private final StagePipeline deletePipeline;
@Inject
- public CollectionManagerImpl( final TimeService timeService, final WriteContextFactory factory,
- @Assisted final CollectionContext context ) {
+ public CollectionManagerImpl( @CreatePipeline final StagePipeline createPipeline,
+ @UpdatePipeline final StagePipeline updatePipeline,
+ @DeletePipeline final StagePipeline deletePipeline,
+ @Assisted final CollectionContext context) {
this.context = context;
- this.timeService = timeService;
- this.factory = factory;
+ this.createPipeline = createPipeline;
+ this.updatePipeline = updatePipeline;
+ this.deletePipeline = deletePipeline;
}
@Override
public Entity create( final Entity entity ) {
// Create a new context for the write
- WriteContext writeContext = factory.newCreateContext( context );
+ WriteContext writeContext = new WriteContextImpl( createPipeline, context );
//perform the write
writeContext.performWrite( entity );
- //TODO this shouldn't block, give a callback
return writeContext.getMessage( Entity.class );
-
}
@Override
public Entity update( final Entity entity ) {
- return null;
+ // Create a new context for the write
+ WriteContext writeContext = new WriteContextImpl( updatePipeline, context );
+
+ //perform the write
+ writeContext.performWrite( entity );
+
+ return writeContext.getMessage( Entity.class );
}
@Override
public void delete( final UUID entityId ) {
- WriteContext deleteContext = factory.newDeleteContext(context);
+ WriteContext deleteContext = new WriteContextImpl( deletePipeline, context );
deleteContext.performWrite( entityId );
- deleteContext.getMessage(Void.class);
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0e0c8ff7/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/StagePipeline.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/StagePipeline.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/StagePipeline.java
index 7e88915..7bf31ea 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/StagePipeline.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/StagePipeline.java
@@ -17,22 +17,6 @@ public interface StagePipeline {
/**
- * Insert a new stage directly after the current stage. This can be used
- * to add additional validation during write phases depending on the mvcc entity
- *
- * @param stage
- */
- void insert(WriteStage stage);
-
-
- /**
- * Add a new stage to the end of the pipline
- * @param stage
- */
- void addLast(WriteStage stage);
-
-
- /**
* get the next stage after this one
* @param stage
*/
@@ -40,12 +24,6 @@ public interface StagePipeline {
/**
- * Get the last stage in this pipeline
- * @return
- */
- WriteStage last();
-
- /**
* Get the current stage in the pipeline
* @return
*/
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0e0c8ff7/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/WriteContext.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/WriteContext.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/WriteContext.java
index 4f5d82a..98e0742 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/WriteContext.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/WriteContext.java
@@ -12,13 +12,6 @@ import org.apache.usergrid.persistence.collection.mvcc.event.PostProcessListener
/** @author tnine */
public interface WriteContext {
-
- /**
- * Get the stage pipeline for this write context
- * @return
- */
- StagePipeline getStagePipeline();
-
/**
* Perform the write in the context with the specified entity
* @param inputData The data to use to being the write
@@ -44,17 +37,7 @@ public interface WriteContext {
*/
void proceed();
- /**
- * Signal we should stop processing
- */
- void stop();
-
- /**
- * Add a post process listener to this write context
- * @return A list of all post proces
- */
- Collection<PostProcessListener> getPostProcessors();
/**
* Return the current collection context
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0e0c8ff7/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/WriteContextFactory.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/WriteContextFactory.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/WriteContextFactory.java
deleted file mode 100644
index 99da0fb..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/WriteContextFactory.java
+++ /dev/null
@@ -1,30 +0,0 @@
-package org.apache.usergrid.persistence.collection.mvcc.stage;
-
-
-import java.util.UUID;
-
-import org.apache.usergrid.persistence.collection.CollectionContext;
-import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
-import org.apache.usergrid.persistence.model.entity.Entity;
-
-
-/** @author tnine */
-public interface WriteContextFactory {
-
- /**
- * Return a new write context for the given stage pipeline
- * @param
- * @return A write context that can be used for creating entities. Returns the new entity to use after
- * the write has completed
- */
- WriteContext newCreateContext(CollectionContext context);
-
- /**
- * Create a write context that cna be used for deleting entitie
- * @return
- */
- WriteContext newDeleteContext(CollectionContext context);
-
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0e0c8ff7/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Clear.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Clear.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Clear.java
new file mode 100644
index 0000000..57a5e41
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Clear.java
@@ -0,0 +1,91 @@
+package org.apache.usergrid.persistence.collection.mvcc.stage.impl;
+
+
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.persistence.collection.CollectionContext;
+import org.apache.usergrid.persistence.collection.exception.CollectionRuntimeException;
+import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
+import org.apache.usergrid.persistence.collection.mvcc.entity.MvccLogEntry;
+import org.apache.usergrid.persistence.collection.mvcc.entity.Stage;
+import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccLogEntryImpl;
+import org.apache.usergrid.persistence.collection.mvcc.stage.WriteContext;
+import org.apache.usergrid.persistence.collection.mvcc.stage.WriteStage;
+import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
+import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy;
+
+import com.google.common.base.Preconditions;
+import com.google.inject.Inject;
+import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+
+/** This phase should invoke any finalization, and mark the entity as committed in the data store before returning */
+public class Clear implements WriteStage {
+
+
+ private static final Logger LOG = LoggerFactory.getLogger( Clear.class );
+
+ private final MvccLogEntrySerializationStrategy logEntrySerializationStrategy;
+ private final MvccEntitySerializationStrategy entitySerializationStrategy;
+
+
+ @Inject
+ public Clear( final MvccLogEntrySerializationStrategy logEntrySerializationStrategy,
+ final MvccEntitySerializationStrategy entitySerializationStrategy ) {
+
+ Preconditions.checkNotNull( logEntrySerializationStrategy, "logEntrySerializationStrategy is required" );
+ Preconditions.checkNotNull( entitySerializationStrategy, "entitySerializationStrategy is required" );
+
+
+ this.logEntrySerializationStrategy = logEntrySerializationStrategy;
+ this.entitySerializationStrategy = entitySerializationStrategy;
+ }
+
+
+ @Override
+ public void performStage( final WriteContext writeContext ) {
+ final MvccEntity entity = writeContext.getMessage( MvccEntity.class );
+
+ Preconditions.checkNotNull( entity, "Entity is required in the new stage of the mvcc write" );
+
+ final UUID entityId = entity.getUuid();
+ final UUID version = entity.getVersion();
+
+ Preconditions.checkNotNull( entityId, "Entity id is required in this stage" );
+ Preconditions.checkNotNull( version, "Entity version is required in this stage" );
+
+
+ final CollectionContext collectionContext = writeContext.getCollectionContext();
+
+
+ final MvccLogEntry startEntry = new MvccLogEntryImpl( entityId, version, Stage.COMMITTED );
+
+ MutationBatch logMutation = logEntrySerializationStrategy.write( collectionContext, startEntry );
+
+ //insert a "cleared" value into the versions. Post processing should actually delete
+ MutationBatch entityMutation = entitySerializationStrategy.clear( collectionContext, entityId, version );
+
+ //merge the 2 into 1 mutation
+ logMutation.mergeShallow( entityMutation );
+
+
+ try {
+ logMutation.execute();
+ }
+ catch ( ConnectionException e ) {
+ LOG.error( "Failed to execute write asynchronously ", e );
+ throw new CollectionRuntimeException( "Failed to execute write asynchronously ", e );
+ }
+
+ /**
+ * We're done executing.
+ */
+ writeContext.proceed();
+
+ //TODO connect to post processors via listener
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0e0c8ff7/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/CollectionPipelineModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/CollectionPipelineModule.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/CollectionPipelineModule.java
index 16735ca..2bd4596 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/CollectionPipelineModule.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/CollectionPipelineModule.java
@@ -1,19 +1,13 @@
package org.apache.usergrid.persistence.collection.mvcc.stage.impl;
-import org.apache.usergrid.persistence.collection.migration.Migration;
-import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
import org.apache.usergrid.persistence.collection.mvcc.stage.StagePipeline;
import org.apache.usergrid.persistence.collection.mvcc.stage.WriteStage;
-import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
-import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy;
-import org.apache.usergrid.persistence.collection.serialization.impl.MvccEntitySerializationStrategyImpl;
-import org.apache.usergrid.persistence.collection.serialization.impl.MvccLogEntrySerializationStrategyImpl;
import com.google.inject.AbstractModule;
import com.google.inject.Inject;
import com.google.inject.Provides;
-import com.google.inject.assistedinject.FactoryModuleBuilder;
+import com.google.inject.Singleton;
import com.google.inject.multibindings.Multibinder;
@@ -25,23 +19,38 @@ import com.google.inject.multibindings.Multibinder;
public class CollectionPipelineModule extends AbstractModule {
- /** Wire the pipeline of operations for create. This should create a new
- * instance every time, since StagePipeline objects are mutable */
+ /**
+ * Wire the pipeline of operations for create. This should create a new instance every time, since StagePipeline
+ * objects are mutable
+ */
@Provides
@CreatePipeline
@Inject
- public StagePipeline createWritePipeline(MvccEntityNew start, MvccEntityWrite write, MvccEntityCommit commit) {
- return StagePipelineImpl.fromStages(start, write, commit );
+ @Singleton
+ public StagePipeline createPipeline( final Create create, final Start start, final Verify write,
+ final Commit commit ) {
+ return StagePipelineImpl.fromStages( create, start, write, commit );
}
@Provides
- @DeletePipeline
- public StagePipeline deletePipeline() {
- return StagePipelineImpl.fromStages( );
+ @UpdatePipeline
+ @Inject
+ @Singleton
+ public StagePipeline updatePipeline( final Update update, final Start start, final Verify write,
+ final Commit commit ) {
+ return StagePipelineImpl.fromStages( update, start, write, commit );
}
+ @Provides
+ @DeletePipeline
+ @Inject
+ @Singleton
+ public StagePipeline deletePipeline( final Update update, final Start start, final Clear delete ) {
+ return StagePipelineImpl.fromStages( update, start, delete );
+ }
+
@Override
protected void configure() {
@@ -51,9 +60,13 @@ public class CollectionPipelineModule extends AbstractModule {
*/
Multibinder<WriteStage> stageBinder = Multibinder.newSetBinder( binder(), WriteStage.class );
- stageBinder.addBinding().to( MvccEntityNew.class );
- stageBinder.addBinding().to( MvccEntityWrite.class );
- stageBinder.addBinding().to( MvccEntityCommit.class );
+
+ stageBinder.addBinding().to( Create.class );
+ stageBinder.addBinding().to( Update.class );
+ stageBinder.addBinding().to( Start.class );
+ stageBinder.addBinding().to( Verify.class );
+ stageBinder.addBinding().to( Commit.class );
+ stageBinder.addBinding().to( Clear.class );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0e0c8ff7/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Commit.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Commit.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Commit.java
new file mode 100644
index 0000000..a2d7606
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Commit.java
@@ -0,0 +1,90 @@
+package org.apache.usergrid.persistence.collection.mvcc.stage.impl;
+
+
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.persistence.collection.CollectionContext;
+import org.apache.usergrid.persistence.collection.exception.CollectionRuntimeException;
+import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
+import org.apache.usergrid.persistence.collection.mvcc.entity.MvccLogEntry;
+import org.apache.usergrid.persistence.collection.mvcc.entity.Stage;
+import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccLogEntryImpl;
+import org.apache.usergrid.persistence.collection.mvcc.stage.WriteContext;
+import org.apache.usergrid.persistence.collection.mvcc.stage.WriteStage;
+import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
+import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy;
+
+import com.google.common.base.Preconditions;
+import com.google.inject.Inject;
+import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+
+/** This phase should invoke any finalization, and mark the entity as committed in the data store before returning */
+public class Commit implements WriteStage {
+
+
+ private static final Logger LOG = LoggerFactory.getLogger( Commit.class );
+
+ private final MvccLogEntrySerializationStrategy logEntrySerializationStrategy;
+ private final MvccEntitySerializationStrategy entitySerializationStrategy;
+
+
+ @Inject
+ public Commit( final MvccLogEntrySerializationStrategy logEntrySerializationStrategy,
+ final MvccEntitySerializationStrategy entitySerializationStrategy ) {
+ Preconditions.checkNotNull( logEntrySerializationStrategy, "logEntrySerializationStrategy is required" );
+ Preconditions.checkNotNull( entitySerializationStrategy, "entitySerializationStrategy is required" );
+
+
+ this.logEntrySerializationStrategy = logEntrySerializationStrategy;
+ this.entitySerializationStrategy = entitySerializationStrategy;
+ }
+
+
+ @Override
+ public void performStage( final WriteContext writeContext ) {
+ final MvccEntity entity = writeContext.getMessage( MvccEntity.class );
+
+ Preconditions.checkNotNull( entity, "Entity is required in the new stage of the mvcc write" );
+
+ final UUID entityId = entity.getUuid();
+ final UUID version = entity.getVersion();
+
+ Preconditions.checkNotNull( entityId, "Entity id is required in this stage" );
+ Preconditions.checkNotNull( version, "Entity version is required in this stage" );
+
+
+ final CollectionContext collectionContext = writeContext.getCollectionContext();
+
+
+ final MvccLogEntry startEntry = new MvccLogEntryImpl( entityId, version, Stage.COMMITTED );
+
+ MutationBatch logMutation = logEntrySerializationStrategy.write( collectionContext, startEntry );
+
+ //now get our actual insert into the entity data
+ MutationBatch entityMutation = entitySerializationStrategy.write( collectionContext, entity );
+
+ //merge the 2 into 1 mutation
+ logMutation.mergeShallow( entityMutation );
+
+
+ try {
+ logMutation.execute();
+ }
+ catch ( ConnectionException e ) {
+ LOG.error( "Failed to execute write asynchronously ", e );
+ throw new CollectionRuntimeException( "Failed to execute write asynchronously ", e );
+ }
+
+ /**
+ * We're done executing.
+ */
+ writeContext.proceed();
+
+ //TODO connect to post processors via listener
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0e0c8ff7/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Create.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Create.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Create.java
new file mode 100644
index 0000000..c096748
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Create.java
@@ -0,0 +1,85 @@
+package org.apache.usergrid.persistence.collection.mvcc.stage.impl;
+
+
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.lang3.reflect.FieldUtils;
+
+import org.apache.usergrid.persistence.collection.exception.CollectionRuntimeException;
+import org.apache.usergrid.persistence.collection.mvcc.stage.WriteContext;
+import org.apache.usergrid.persistence.collection.mvcc.stage.WriteStage;
+import org.apache.usergrid.persistence.collection.service.TimeService;
+import org.apache.usergrid.persistence.collection.service.UUIDService;
+import org.apache.usergrid.persistence.collection.util.Verify;
+import org.apache.usergrid.persistence.model.entity.Entity;
+
+import com.google.common.base.Preconditions;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+
+/**
+ * This is the first stage and should be invoked immediately when a new entity create is started. No UUIDs should be
+ * present, and this should set the entityId, version, created, and updated dates
+ */
+@Singleton
+public class Create implements WriteStage {
+
+ private static final Logger LOG = LoggerFactory.getLogger( Create.class );
+
+
+ private final TimeService timeService;
+ private final UUIDService uuidService;
+
+
+ @Inject
+ public Create( final TimeService timeService, final UUIDService uuidService ) {
+ Preconditions.checkNotNull( timeService, "timeService is required" );
+ Preconditions.checkNotNull( uuidService, "uuidService is required" );
+
+
+ this.timeService = timeService;
+ this.uuidService = uuidService;
+ }
+
+
+ /**
+ * Create the entity Id and inject it, as well as set the timestamp versions
+ *
+ * @param writeContext The context of the current write operation
+ */
+ @Override
+ public void performStage( final WriteContext writeContext ) {
+
+ final Entity entity = writeContext.getMessage( Entity.class );
+
+ Preconditions.checkNotNull( entity, "Entity is required in the new stage of the mvcc write" );
+
+ Verify.isNull( entity.getUuid(), "A new entity should not have an id set. This is an update operation" );
+
+
+ final UUID entityId = uuidService.newTimeUUID();
+ final UUID version = entityId;
+ final long created = timeService.getTime();
+
+
+ try {
+ FieldUtils.writeDeclaredField( entity, "uuid", entityId, true );
+ }
+ catch ( Throwable t ) {
+ LOG.error( "Unable to set uuid. See nested exception", t );
+ throw new CollectionRuntimeException( "Unable to set uuid. See nested exception", t );
+ }
+
+ entity.setVersion( version );
+ entity.setCreated( created );
+ entity.setUpdated( created );
+
+ //set the updated entity for the next stage
+ writeContext.setMessage( entity );
+ writeContext.proceed();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0e0c8ff7/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/CreatePipeline.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/CreatePipeline.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/CreatePipeline.java
index e700940..efe50c8 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/CreatePipeline.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/CreatePipeline.java
@@ -14,7 +14,10 @@ import static java.lang.annotation.RetentionPolicy.RUNTIME;
/**
* Marks the create pipeline
- * @author tnine */
+ *
+ * @author tnine
+ */
@BindingAnnotation
-@Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
+@Target({ FIELD, PARAMETER, METHOD })
+@Retention(RUNTIME)
public @interface CreatePipeline {}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0e0c8ff7/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/DeletePipeline.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/DeletePipeline.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/DeletePipeline.java
index e531a7f..3d95ddb 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/DeletePipeline.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/DeletePipeline.java
@@ -14,7 +14,10 @@ import static java.lang.annotation.RetentionPolicy.RUNTIME;
/**
* Marks the delete pipeline
- * @author tnine */
+ *
+ * @author tnine
+ */
@BindingAnnotation
-@Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
+@Target({ FIELD, PARAMETER, METHOD })
+@Retention(RUNTIME)
public @interface DeletePipeline {}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0e0c8ff7/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/MvccEntityCommit.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/MvccEntityCommit.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/MvccEntityCommit.java
deleted file mode 100644
index 88bfca6..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/MvccEntityCommit.java
+++ /dev/null
@@ -1,28 +0,0 @@
-package org.apache.usergrid.persistence.collection.mvcc.stage.impl;
-
-
-import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
-import org.apache.usergrid.persistence.collection.mvcc.stage.WriteContext;
-import org.apache.usergrid.persistence.collection.mvcc.stage.WriteStage;
-
-import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
-
-
-/**
- * This phase should invoke any finalization, and mark the entity as committed in the data store before returning
- */
-public class MvccEntityCommit implements WriteStage {
-
- public MvccEntityCommit(){
-
- }
-
-
- @Override
- public void performStage( final WriteContext context ) {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0e0c8ff7/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/MvccEntityNew.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/MvccEntityNew.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/MvccEntityNew.java
deleted file mode 100644
index 576394a..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/MvccEntityNew.java
+++ /dev/null
@@ -1,116 +0,0 @@
-package org.apache.usergrid.persistence.collection.mvcc.stage.impl;
-
-
-import java.util.UUID;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.commons.lang3.reflect.FieldUtils;
-
-import org.apache.usergrid.persistence.collection.CollectionContext;
-import org.apache.usergrid.persistence.collection.service.TimeService;
-import org.apache.usergrid.persistence.collection.exception.CollectionRuntimeException;
-import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
-import org.apache.usergrid.persistence.collection.mvcc.entity.MvccLogEntry;
-import org.apache.usergrid.persistence.collection.mvcc.entity.Stage;
-import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccEntityImpl;
-import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccLogEntryImpl;
-import org.apache.usergrid.persistence.collection.mvcc.stage.WriteContext;
-import org.apache.usergrid.persistence.collection.mvcc.stage.WriteStage;
-import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy;
-import org.apache.usergrid.persistence.collection.service.UUIDService;
-import org.apache.usergrid.persistence.model.entity.Entity;
-import org.apache.usergrid.persistence.model.util.UUIDGenerator;
-
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.inject.Inject;
-import com.netflix.astyanax.MutationBatch;
-import com.netflix.astyanax.connectionpool.OperationResult;
-import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
-
-
-/**
- * This is the first stage and should be invoked immediately when a write is started. It should persist the start of a
- * new write in the data store for a checkpoint and recovery
- */
-public class MvccEntityNew implements WriteStage {
-
- private static final Logger LOG = LoggerFactory.getLogger( MvccEntityNew.class );
-
- private final MvccLogEntrySerializationStrategy logStrategy;
- private final TimeService timeService;
- private final UUIDService uuidService;
-
-
-
- /** Create a new stage with the current context */
- @Inject
- public MvccEntityNew( final MvccLogEntrySerializationStrategy logStrategy, final TimeService timeService,
- final UUIDService uuidService ) {
- this.logStrategy = logStrategy;
- this.timeService = timeService;
- this.uuidService = uuidService;
- }
-
-
- /**
- * Create the entity Id and inject it, as well as set the timestamp versions
- * @param writeContext The context of the current write operation
- */
- @Override
- public void performStage( final WriteContext writeContext) {
-
- final Entity entity = writeContext.getMessage(Entity.class);
-
- Preconditions.checkNotNull( entity, "Entity is required in the new stage of the mvcc write" );
-
-
- final UUID entityId = uuidService.newTimeUUID();
- final UUID version = entityId;
- final long created = timeService.getTime();
-
-
- try {
- FieldUtils.writeDeclaredField( entity, "uuid", entityId );
- }
- catch ( Throwable t ) {
- LOG.error( "Unable to set uuid. See nested exception", t );
- throw new CollectionRuntimeException( "Unable to set uuid. See nested exception", t );
- }
-
- entity.setVersion( version );
- entity.setCreated( created );
- entity.setUpdated( created );
-
- final CollectionContext collectionContext = writeContext.getCollectionContext();
-
-
- final MvccLogEntry startEntry = new MvccLogEntryImpl( entityId, version, Stage.ACTIVE );
-
- MutationBatch write = logStrategy.write(collectionContext, startEntry );
-
- ListenableFuture<OperationResult<Void>> future;
-
- try {
- future = write.executeAsync();
- }
- catch ( ConnectionException e ) {
- LOG.error( "Failed to execute write asynchronously ", e );
- throw new CollectionRuntimeException( "Failed to execute write asynchronously ", e );
- }
-
- //create the mvcc entity for the next stage
- MvccEntityImpl nextStage = new MvccEntityImpl( entityId, version, entity );
-
- writeContext.setMessage( nextStage );
-
-
- //set the next stage to invoke on return
- WriteContextCallback.createCallback( future, writeContext );
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0e0c8ff7/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/MvccEntityWrite.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/MvccEntityWrite.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/MvccEntityWrite.java
deleted file mode 100644
index 0e795db..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/MvccEntityWrite.java
+++ /dev/null
@@ -1,25 +0,0 @@
-package org.apache.usergrid.persistence.collection.mvcc.stage.impl;
-
-
-import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
-import org.apache.usergrid.persistence.collection.mvcc.stage.WriteContext;
-import org.apache.usergrid.persistence.collection.mvcc.stage.WriteStage;
-
-
-/**
- * This phase should execute the serialization to the data store.
- */
-public class MvccEntityWrite implements WriteStage {
-
- /**
- * Create a new stage with the current context
- */
- public MvccEntityWrite( ){
- }
-
-
- @Override
- public void performStage( final WriteContext context ) {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0e0c8ff7/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/StagePipelineImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/StagePipelineImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/StagePipelineImpl.java
index 811b6eb..271e5e5 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/StagePipelineImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/StagePipelineImpl.java
@@ -5,7 +5,6 @@ import java.util.Arrays;
import java.util.List;
import org.apache.usergrid.persistence.collection.mvcc.stage.StagePipeline;
-import org.apache.usergrid.persistence.collection.mvcc.stage.WriteContext;
import org.apache.usergrid.persistence.collection.mvcc.stage.WriteStage;
@@ -15,14 +14,16 @@ public class StagePipelineImpl implements StagePipeline {
private final List<WriteStage> stages;
private WriteStage current;
- protected StagePipelineImpl(WriteStage[] stages){
- this.stages = Arrays.asList(stages);
+
+ protected StagePipelineImpl( WriteStage[] stages ) {
+ this.stages = Arrays.asList( stages );
}
+
@Override
public WriteStage first() {
- if(stages.size() == 0){
+ if ( stages.size() == 0 ) {
return null;
}
@@ -31,56 +32,28 @@ public class StagePipelineImpl implements StagePipeline {
@Override
- public WriteStage last() {
- if(stages.size() == 0){
- return null;
- }
-
- return stages.get( stages.size()-1 );
- }
-
-
- @Override
public WriteStage current() {
return current;
}
@Override
- public void insert( final WriteStage stage ) {
- throw new UnsupportedOperationException("This needs implemented");
-
- }
-
-
- @Override
- public void addLast( final WriteStage stage ) {
- stages.add( stage );
- }
-
-
- @Override
public WriteStage nextStage( final WriteStage stage ) {
int index = stages.indexOf( stage );
//we're done, do nothing
- if(index == stages.size()){
+ if ( index == stages.size() ) {
return null;
}
- current = stages.get( index+1 );
+ current = stages.get( index + 1 );
return current;
}
-
- /**
- * Factory to create a new instance.
- * @param stages
- * @return
- */
- public static StagePipelineImpl fromStages(WriteStage... stages){
+ /** Factory to create a new instance. */
+ public static StagePipelineImpl fromStages( WriteStage... stages ) {
return new StagePipelineImpl( stages );
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0e0c8ff7/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Start.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Start.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Start.java
new file mode 100644
index 0000000..bf58e06
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Start.java
@@ -0,0 +1,92 @@
+package org.apache.usergrid.persistence.collection.mvcc.stage.impl;
+
+
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.persistence.collection.CollectionContext;
+import org.apache.usergrid.persistence.collection.exception.CollectionRuntimeException;
+import org.apache.usergrid.persistence.collection.mvcc.entity.MvccLogEntry;
+import org.apache.usergrid.persistence.collection.mvcc.entity.Stage;
+import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccEntityImpl;
+import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccLogEntryImpl;
+import org.apache.usergrid.persistence.collection.mvcc.stage.WriteContext;
+import org.apache.usergrid.persistence.collection.mvcc.stage.WriteStage;
+import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy;
+import org.apache.usergrid.persistence.model.entity.Entity;
+
+import com.google.common.base.Preconditions;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+
+/**
+ * This is the first stage and should be invoked immediately when a write is started. It should persist the start of a
+ * new write in the data store for a checkpoint and recovery
+ */
+@Singleton
+public class Start implements WriteStage {
+
+ private static final Logger LOG = LoggerFactory.getLogger( Start.class );
+
+ private final MvccLogEntrySerializationStrategy logStrategy;
+
+
+ /** Create a new stage with the current context */
+ @Inject
+ public Start( final MvccLogEntrySerializationStrategy logStrategy ) {
+ Preconditions.checkNotNull( logStrategy, "logStrategy is required" );
+
+
+ this.logStrategy = logStrategy;
+ }
+
+
+ /**
+ * Create the entity Id and inject it, as well as set the timestamp versions
+ *
+ * @param writeContext The context of the current write operation
+ */
+ @Override
+ public void performStage( final WriteContext writeContext ) {
+
+ final Entity entity = writeContext.getMessage( Entity.class );
+
+ Preconditions.checkNotNull( entity, "Entity is required in the new stage of the mvcc write" );
+
+ final UUID entityId = entity.getUuid();
+ final UUID version = entity.getVersion();
+
+ Preconditions.checkNotNull( entityId, "Entity id is required in this stage" );
+ Preconditions.checkNotNull( version, "Entity version is required in this stage" );
+
+
+
+ final CollectionContext collectionContext = writeContext.getCollectionContext();
+
+
+ final MvccLogEntry startEntry = new MvccLogEntryImpl( entityId, version, Stage.ACTIVE );
+
+ MutationBatch write = logStrategy.write( collectionContext, startEntry );
+
+
+ try {
+ write.execute();
+ }
+ catch ( ConnectionException e ) {
+ LOG.error( "Failed to execute write asynchronously ", e );
+ throw new CollectionRuntimeException( "Failed to execute write asynchronously ", e );
+ }
+
+
+ //create the mvcc entity for the next stage
+ final MvccEntityImpl nextStage = new MvccEntityImpl( entityId, version, entity );
+
+ writeContext.setMessage( nextStage );
+ writeContext.proceed();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0e0c8ff7/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Update.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Update.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Update.java
new file mode 100644
index 0000000..00764de
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Update.java
@@ -0,0 +1,66 @@
+package org.apache.usergrid.persistence.collection.mvcc.stage.impl;
+
+
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.persistence.collection.mvcc.stage.WriteContext;
+import org.apache.usergrid.persistence.collection.mvcc.stage.WriteStage;
+import org.apache.usergrid.persistence.collection.service.TimeService;
+import org.apache.usergrid.persistence.collection.service.UUIDService;
+import org.apache.usergrid.persistence.model.entity.Entity;
+
+import com.google.common.base.Preconditions;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+
+/**
+ * This stage performs the initial commit log and write of an entity. It assumes the entity id and created has already
+ * been set correctly
+ */
+@Singleton
+public class Update implements WriteStage {
+
+ private static final Logger LOG = LoggerFactory.getLogger( Update.class );
+
+ private final TimeService timeService;
+ private final UUIDService uuidService;
+
+
+ @Inject
+ public Update( final TimeService timeService, final UUIDService uuidService ) {
+ Preconditions.checkNotNull( timeService, "timeService is required" );
+ Preconditions.checkNotNull( uuidService, "uuidService is required" );
+
+ this.timeService = timeService;
+ this.uuidService = uuidService;
+ }
+
+
+ /**
+ * Create the entity Id and inject it, as well as set the timestamp versions
+ *
+ * @param writeContext The context of the current write operation
+ */
+ @Override
+ public void performStage( final WriteContext writeContext ) {
+
+ final Entity entity = writeContext.getMessage( Entity.class );
+
+ Preconditions.checkNotNull( entity, "Entity is required in the new stage of the mvcc write" );
+
+
+ final UUID version = uuidService.newTimeUUID();
+ final long updated = timeService.getTime();
+
+
+ entity.setVersion( version );
+ entity.setUpdated( updated );
+
+ writeContext.setMessage( entity );
+ writeContext.proceed();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0e0c8ff7/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/UpdatePipeline.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/UpdatePipeline.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/UpdatePipeline.java
new file mode 100644
index 0000000..abc6e15
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/UpdatePipeline.java
@@ -0,0 +1,23 @@
+package org.apache.usergrid.persistence.collection.mvcc.stage.impl;
+
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+
+import com.google.inject.BindingAnnotation;
+
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+
+/**
+ * Marks the create pipeline
+ *
+ * @author tnine
+ */
+@BindingAnnotation
+@Target( { FIELD, PARAMETER, METHOD } )
+@Retention( RUNTIME )
+public @interface UpdatePipeline {}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0e0c8ff7/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Verify.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Verify.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Verify.java
new file mode 100644
index 0000000..7933266
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Verify.java
@@ -0,0 +1,25 @@
+package org.apache.usergrid.persistence.collection.mvcc.stage.impl;
+
+
+import org.apache.usergrid.persistence.collection.mvcc.stage.WriteContext;
+import org.apache.usergrid.persistence.collection.mvcc.stage.WriteStage;
+
+import com.google.inject.Singleton;
+
+
+/** This phase should execute any verification on the MvccEntity */
+@Singleton
+public class Verify implements WriteStage {
+
+
+ public Verify() {
+ }
+
+
+ @Override
+ public void performStage( final WriteContext writeContext ) {
+ //TODO no op for now, just continue to the next stage. Verification logic goes in here
+
+ writeContext.proceed();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0e0c8ff7/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/WriteContextCallback.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/WriteContextCallback.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/WriteContextCallback.java
index e5ca9bd..65e5ba3 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/WriteContextCallback.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/WriteContextCallback.java
@@ -2,9 +2,7 @@ package org.apache.usergrid.persistence.collection.mvcc.stage.impl;
import org.apache.usergrid.persistence.collection.exception.CollectionRuntimeException;
-import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
import org.apache.usergrid.persistence.collection.mvcc.stage.WriteContext;
-import org.apache.usergrid.persistence.collection.mvcc.stage.WriteStage;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
@@ -14,16 +12,16 @@ import com.netflix.astyanax.connectionpool.OperationResult;
/**
* Helper class to cause the async execution to continue
- * @author tnine */
+ * Not used ATM, just here for demonstration purposes with async astynax invocation on phase proceed
+ *
+ * @author tnine
+ */
public class WriteContextCallback implements FutureCallback<OperationResult<Void>> {
private final WriteContext context;
- /**
- * Create a new callback. The data will be passed to the next stage
- * @param context
- */
+ /** Create a new callback. The data will be passed to the next stage */
private WriteContextCallback( final WriteContext context ) {
this.context = context;
}
@@ -40,15 +38,15 @@ public class WriteContextCallback implements FutureCallback<OperationResult<Void
@Override
public void onFailure( final Throwable t ) {
- context.stop();
+// context.stop();
throw new CollectionRuntimeException( "Failed to execute write", t );
}
/**
- * This encapsulated type of Void in the listenable future is intentional. If you're not returning
- * void in your future, you shouldn't be using this callback, you should be using a callback
- * that will set the Response value into the next stage and invoke it
+ * This encapsulated type of Void in the listenable future is intentional. If you're not returning void in your
+ * future, you shouldn't be using this callback, you should be using a callback that will set the Response value
+ * into the next stage and invoke it
*
* @param future The listenable future returned by the Astyanax async op
* @param context The context to signal to continue in the callback
@@ -56,7 +54,6 @@ public class WriteContextCallback implements FutureCallback<OperationResult<Void
public static void createCallback( final ListenableFuture<OperationResult<Void>> future,
final WriteContext context ) {
- Futures.addCallback(future, new WriteContextCallback( context ));
-
+ Futures.addCallback( future, new WriteContextCallback( context ) );
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0e0c8ff7/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/WriteContextFactoryImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/WriteContextFactoryImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/WriteContextFactoryImpl.java
deleted file mode 100644
index f9b627f..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/WriteContextFactoryImpl.java
+++ /dev/null
@@ -1,47 +0,0 @@
-package org.apache.usergrid.persistence.collection.mvcc.stage.impl;
-
-
-import java.util.Collection;
-import java.util.UUID;
-
-import org.apache.usergrid.persistence.collection.CollectionContext;
-import org.apache.usergrid.persistence.collection.mvcc.event.PostProcessListener;
-import org.apache.usergrid.persistence.collection.mvcc.stage.StagePipeline;
-import org.apache.usergrid.persistence.collection.mvcc.stage.WriteContext;
-import org.apache.usergrid.persistence.collection.mvcc.stage.WriteContextFactory;
-import org.apache.usergrid.persistence.model.entity.Entity;
-
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-
-
-/** @author tnine */
-@Singleton
-public class WriteContextFactoryImpl implements WriteContextFactory {
-
- private final StagePipeline writeStage;
- private final StagePipeline deleteStage;
- private final Collection<PostProcessListener> postProcessListener;
-
-
- @Inject
- public WriteContextFactoryImpl( @CreatePipeline final StagePipeline writeStage,
- @DeletePipeline final StagePipeline deleteStage,
- final Collection<PostProcessListener> postProcessListener ) {
- this.writeStage = writeStage;
- this.deleteStage = deleteStage;
- this.postProcessListener = postProcessListener;
- }
-
-
- @Override
- public WriteContext newCreateContext(CollectionContext context) {
- return new WriteContextImpl( postProcessListener, writeStage, context );
- }
-
-
- @Override
- public WriteContext newDeleteContext(CollectionContext context) {
- return new WriteContextImpl( postProcessListener, deleteStage, context );
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0e0c8ff7/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/WriteContextImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/WriteContextImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/WriteContextImpl.java
index cbddcd1..0925f0b 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/WriteContextImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/WriteContextImpl.java
@@ -1,10 +1,7 @@
package org.apache.usergrid.persistence.collection.mvcc.stage.impl;
-import java.util.Collection;
-
import org.apache.usergrid.persistence.collection.CollectionContext;
-import org.apache.usergrid.persistence.collection.mvcc.event.PostProcessListener;
import org.apache.usergrid.persistence.collection.mvcc.stage.StagePipeline;
import org.apache.usergrid.persistence.collection.mvcc.stage.WriteContext;
import org.apache.usergrid.persistence.collection.mvcc.stage.WriteStage;
@@ -16,7 +13,6 @@ import com.google.inject.Inject;
/** @author tnine */
public class WriteContextImpl implements WriteContext {
- private final Collection<PostProcessListener> listeners;
private final StagePipeline pipeline;
private final CollectionContext context;
@@ -25,25 +21,23 @@ public class WriteContextImpl implements WriteContext {
@Inject
- public WriteContextImpl( final Collection<PostProcessListener> listeners, final StagePipeline pipeline,
+ public WriteContextImpl( final StagePipeline pipeline,
final CollectionContext context ) {
- this.listeners = listeners;
+ Preconditions.checkNotNull( pipeline, "pipeline cannot be null" );
+ Preconditions.checkNotNull( context, "context cannot be null" );
+
this.pipeline = pipeline;
this.context = context;
}
@Override
- public StagePipeline getStagePipeline() {
- return this.pipeline;
- }
-
-
- @Override
public void performWrite( Object input ) {
current = this.pipeline.first();
+ setMessage( input );
+
current.performStage( this );
}
@@ -91,18 +85,6 @@ public class WriteContextImpl implements WriteContext {
}
- @Override
- public void stop() {
- //No op ATM
- current = null;
- }
-
-
- @Override
- public Collection<PostProcessListener> getPostProcessors() {
- return listeners;
- }
-
@Override
public CollectionContext getCollectionContext() {
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0e0c8ff7/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/util/Verify.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/util/Verify.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/util/Verify.java
new file mode 100644
index 0000000..98af421
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/util/Verify.java
@@ -0,0 +1,21 @@
+package org.apache.usergrid.persistence.collection.util;
+
+
+/**
+ * Class to help with input verification
+ */
+public class Verify {
+
+ /**
+ * Class to help with verification
+ * @param value
+ * @param message
+ */
+ public static void isNull(Object value, String message){
+ if(value != null){
+ throw new IllegalArgumentException(message );
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0e0c8ff7/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/CollectionManagerFactoryTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/CollectionManagerFactoryTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/CollectionManagerFactoryTest.java
index 4450efe..355cd09 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/CollectionManagerFactoryTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/CollectionManagerFactoryTest.java
@@ -1,8 +1,52 @@
package org.apache.usergrid.persistence.collection;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.usergrid.persistence.collection.guice.TestCollectionModule;
+import org.apache.usergrid.persistence.collection.impl.CollectionContextImpl;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+
+import com.google.guiceberry.junit4.GuiceBerryRule;
+import com.google.inject.Inject;
+import com.google.inject.ProvisionException;
+
+import static org.junit.Assert.assertNotNull;
+
+
/**
* Basic tests
+ *
* @author tnine
*/
-public class CollectionManagerFactoryTest {}
+public class CollectionManagerFactoryTest {
+
+
+ @Rule
+ public final GuiceBerryRule guiceBerry = new GuiceBerryRule( TestCollectionModule.class );
+
+
+ @Inject
+ private CollectionManagerFactory collectionManagerFactory;
+
+
+
+
+ @Test
+ public void validInput() {
+
+ CollectionContextImpl context =
+ new CollectionContextImpl( UUIDGenerator.newTimeUUID(), UUIDGenerator.newTimeUUID(), "test" );
+
+ CollectionManager collectionManager = collectionManagerFactory.createCollectionManager( context );
+
+ assertNotNull( "A collection manager must be returned", collectionManager );
+ }
+
+
+ @Test( expected = ProvisionException.class )
+ public void nullInput() {
+ CollectionManager collectionManager = collectionManagerFactory.createCollectionManager( null );
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0e0c8ff7/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/CollectionManagerIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/CollectionManagerIT.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/CollectionManagerIT.java
new file mode 100644
index 0000000..b7c7027
--- /dev/null
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/CollectionManagerIT.java
@@ -0,0 +1,54 @@
+package org.apache.usergrid.persistence.collection;
+
+
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.usergrid.persistence.collection.guice.CassandraTestCollectionModule;
+import org.apache.usergrid.persistence.collection.guice.TestCollectionModule;
+import org.apache.usergrid.persistence.collection.impl.CollectionContextImpl;
+import org.apache.usergrid.persistence.model.entity.Entity;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+import org.apache.usergrid.persistence.test.CassandraRule;
+
+import com.google.guiceberry.junit4.GuiceBerryRule;
+import com.google.inject.Inject;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.*;
+
+
+/** @author tnine */
+public class CollectionManagerIT {
+ @Rule
+ public final GuiceBerryRule guiceBerry = new GuiceBerryRule( CassandraTestCollectionModule.class );
+
+
+ @Rule
+ public final CassandraRule rule = new CassandraRule();
+
+
+
+ @Inject
+ private CollectionManagerFactory factory;
+
+
+ @Test
+ public void create() {
+
+ CollectionContext context = new CollectionContextImpl( UUIDGenerator.newTimeUUID(), UUIDGenerator.newTimeUUID(), "test");
+ Entity newEntity = new Entity("test");
+
+ CollectionManager manager = factory.createCollectionManager(context);
+
+ Entity returned = manager.create( newEntity );
+
+ assertNotNull("Returned has a uuid", returned.getUuid());
+ assertEquals("Version matches uuid for create", returned.getUuid(), returned.getVersion());
+
+ assertTrue("Created time was set", returned.getCreated() > 0);
+ assertEquals("Created and updated time match on create", returned.getCreated(), returned.getUpdated());
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0e0c8ff7/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/CollectionManagerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/CollectionManagerTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/CollectionManagerTest.java
new file mode 100644
index 0000000..fca9044
--- /dev/null
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/CollectionManagerTest.java
@@ -0,0 +1,59 @@
+package org.apache.usergrid.persistence.collection;
+
+
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import org.apache.usergrid.persistence.collection.impl.CollectionContextImpl;
+import org.apache.usergrid.persistence.collection.impl.CollectionManagerImpl;
+import org.apache.usergrid.persistence.collection.mvcc.stage.StagePipeline;
+import org.apache.usergrid.persistence.collection.mvcc.stage.WriteContext;
+import org.apache.usergrid.persistence.collection.mvcc.stage.WriteStage;
+import org.apache.usergrid.persistence.model.entity.Entity;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+
+/** @author tnine */
+public class CollectionManagerTest {
+
+ @Test
+ public void create(){
+
+ WriteStage mockStage = mock(WriteStage.class);
+
+ StagePipeline createPipeline = mock(StagePipeline.class);
+ StagePipeline updatePipeline = mock(StagePipeline.class);
+ StagePipeline deletePipeline = mock(StagePipeline.class);
+
+ //mock up returning the first stage
+ when(createPipeline.first()).thenReturn(mockStage);
+
+
+ CollectionContext context = new CollectionContextImpl( UUIDGenerator.newTimeUUID(), UUIDGenerator.newTimeUUID(), "test" );
+
+ CollectionManager collectionManager = new CollectionManagerImpl(createPipeline, updatePipeline, deletePipeline, context);
+
+ Entity create = new Entity();
+
+ Entity returned = collectionManager.create( create );
+
+ //verify the first stage was asked for
+ verify(createPipeline).first();
+
+ ArgumentCaptor<WriteContext> contextArg = ArgumentCaptor.forClass(WriteContext.class);
+
+ //verify the first perform stage was invoked
+ verify(mockStage).performStage( contextArg.capture() );
+
+ //verify we set the passed entity into the WriteContext
+ assertEquals("Entity should be present in the write context", create, contextArg.getValue().getMessage( Entity.class ));
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0e0c8ff7/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/guice/CassandraTestCollectionModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/guice/CassandraTestCollectionModule.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/guice/CassandraTestCollectionModule.java
new file mode 100644
index 0000000..b6ed30f
--- /dev/null
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/guice/CassandraTestCollectionModule.java
@@ -0,0 +1,76 @@
+package org.apache.usergrid.persistence.collection.guice;
+
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.cassandra.locator.SimpleStrategy;
+
+import org.apache.usergrid.persistence.collection.astynax.AstynaxKeyspaceProvider;
+import org.apache.usergrid.persistence.collection.migration.MigrationException;
+import org.apache.usergrid.persistence.collection.migration.MigrationManager;
+import org.apache.usergrid.persistence.collection.migration.MigrationManagerImpl;
+import org.apache.usergrid.persistence.collection.serialization.impl.MvccLogEntrySerializationStrategyImpl;
+import org.apache.usergrid.persistence.test.CassandraRule;
+
+import com.google.guiceberry.GuiceBerryEnvMain;
+import com.google.guiceberry.GuiceBerryModule;
+import com.google.inject.AbstractModule;
+import com.google.inject.Inject;
+import com.google.inject.name.Names;
+
+
+/**
+ * Simple module for wiring our collection api
+ *
+ * @author tnine
+ */
+public class CassandraTestCollectionModule extends AbstractModule {
+
+ private final Map<String, String> overrides;
+
+ public CassandraTestCollectionModule( final Map<String, String> overrides ) {
+ this.overrides = overrides;
+ }
+
+
+
+
+ public CassandraTestCollectionModule() {
+ this.overrides = null;
+ }
+
+
+
+
+ @Override
+ protected void configure() {
+
+
+ //import the guice berry module
+ install( new TestCollectionModule(overrides) );
+
+
+ //now configure our db
+ bind( GuiceBerryEnvMain.class ).to( CassAppMain.class );
+ }
+
+
+
+ static class CassAppMain implements GuiceBerryEnvMain {
+
+ @Inject
+ protected MigrationManager migrationManager;
+
+
+ public void run() {
+ try {
+ //run the injected migration manager to set up cassandra
+ migrationManager.migrate();
+ }
+ catch ( MigrationException e ) {
+ throw new RuntimeException( e );
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0e0c8ff7/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/guice/TestCollectionModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/guice/TestCollectionModule.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/guice/TestCollectionModule.java
index e8a16bd..44e6a26 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/guice/TestCollectionModule.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/guice/TestCollectionModule.java
@@ -7,28 +7,34 @@ import java.util.Map;
import org.apache.cassandra.locator.SimpleStrategy;
import org.apache.usergrid.persistence.collection.astynax.AstynaxKeyspaceProvider;
-import org.apache.usergrid.persistence.collection.migration.MigrationException;
-import org.apache.usergrid.persistence.collection.migration.MigrationManager;
import org.apache.usergrid.persistence.collection.migration.MigrationManagerImpl;
import org.apache.usergrid.persistence.collection.serialization.impl.MvccLogEntrySerializationStrategyImpl;
import org.apache.usergrid.persistence.test.CassandraRule;
-import com.google.guiceberry.GuiceBerryEnvMain;
import com.google.guiceberry.GuiceBerryModule;
import com.google.inject.AbstractModule;
-import com.google.inject.Inject;
import com.google.inject.name.Names;
/**
- * Simple module for wiring our collection api
+ * Module for testing our guice wiring environment is correct. Does not actually set the main execution env. Callers are
+ * responsible for that via decoration
*
* @author tnine
*/
public class TestCollectionModule extends AbstractModule {
+ private final Map<String, String> override;
+
+
+ public TestCollectionModule( Map<String, String> override ) {
+ this.override = override;
+ }
+
+
public TestCollectionModule() {
+ override = null;
}
@@ -39,9 +45,6 @@ public class TestCollectionModule extends AbstractModule {
//import the guice berry module
install( new GuiceBerryModule() );
- //now configure our db
- bind( GuiceBerryEnvMain.class ).to( CassAppMain.class );
-
//import the runtime module
install( new CollectionModule() );
@@ -65,41 +68,14 @@ public class TestCollectionModule extends AbstractModule {
/**
* Set the timeout to 60 seconds, no test should take that long for load+delete without a failure
*/
- configProperties.put( MvccLogEntrySerializationStrategyImpl.TIMEOUT_PROP, 60+"" );
+ configProperties.put( MvccLogEntrySerializationStrategyImpl.TIMEOUT_PROP, 60 + "" );
- Map<String, String> props = getOverrides();
- if(props != null){
- configProperties.putAll( props );
+ if(override != null){
+ configProperties.putAll( override );
}
//bind to the props
Names.bindProperties( binder(), configProperties );
}
-
-
- /**
- * Get any overrides we need for system properties
- */
- public Map<String, String> getOverrides() {
- return null;
- }
-
-
- static class CassAppMain implements GuiceBerryEnvMain {
-
- @Inject
- protected MigrationManager migrationManager;
-
-
- public void run() {
- try {
- //run the injected migration manager to set up cassandra
- migrationManager.migrate();
- }
- catch ( MigrationException e ) {
- throw new RuntimeException( e );
- }
- }
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0e0c8ff7/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/WriteContextTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/WriteContextTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/WriteContextTest.java
new file mode 100644
index 0000000..490b249
--- /dev/null
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/WriteContextTest.java
@@ -0,0 +1,217 @@
+package org.apache.usergrid.persistence.collection.mvcc.stage;
+
+
+import org.junit.Test;
+
+import org.apache.usergrid.persistence.collection.CollectionContext;
+import org.apache.usergrid.persistence.collection.mvcc.stage.impl.WriteContextImpl;
+
+import static junit.framework.TestCase.assertSame;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+
+/** @author tnine */
+public class WriteContextTest {
+
+ @Test
+ public void performWrite() {
+
+ CollectionContext collectionContext = mock( CollectionContext.class );
+
+ StagePipeline pipeline = mock( StagePipeline.class );
+
+ WriteStage stage = mock( WriteStage.class );
+
+ when( pipeline.first() ).thenReturn( stage );
+
+ WriteContext writeContext = new WriteContextImpl( pipeline, collectionContext );
+
+ Object test = new Object();
+
+ writeContext.performWrite( test );
+
+ //verify we called first in the pipeline to get the first value
+ verify( pipeline ).first();
+
+ //verify the first stage was invoked
+ verify( stage ).performStage( same( writeContext ) );
+
+ //verify the bean value was set
+ assertSame( test, writeContext.getMessage( Object.class ) );
+ }
+
+
+ @Test
+ public void setAndGet() {
+ Object test = new Object();
+
+ CollectionContext collectionContext = mock( CollectionContext.class );
+
+ StagePipeline pipeline = mock( StagePipeline.class );
+
+
+ WriteContext writeContext = new WriteContextImpl( pipeline, collectionContext );
+
+ writeContext.setMessage( test );
+
+ assertSame( "Same value returned", test, writeContext.getMessage( Object.class ) );
+ }
+
+
+ @Test
+ public void setAndGetTypeSafe() {
+ TestBean test = new TestBean();
+
+ CollectionContext collectionContext = mock( CollectionContext.class );
+
+ StagePipeline pipeline = mock( StagePipeline.class );
+
+
+ WriteContext writeContext = new WriteContextImpl( pipeline, collectionContext );
+
+ writeContext.setMessage( test );
+
+ //works because Test is an instance of object
+ assertSame( "Test instance of object", test, writeContext.getMessage( Object.class ) );
+
+ assertSame( "Test instance of object", test, writeContext.getMessage( TestBean.class ) );
+ }
+
+
+ @Test( expected = ClassCastException.class )
+ public void setAndGetBadType() {
+ Object test = new Object();
+
+ CollectionContext collectionContext = mock( CollectionContext.class );
+
+ StagePipeline pipeline = mock( StagePipeline.class );
+
+
+ WriteContext writeContext = new WriteContextImpl( pipeline, collectionContext );
+
+ writeContext.setMessage( test );
+
+ //works because Test is an instance of object
+ assertSame( "Test instance of object", test, writeContext.getMessage( Object.class ) );
+
+ //should blow up, not type save. The object test is not an instance of TestBean
+ writeContext.getMessage( TestBean.class );
+ }
+
+
+ @Test
+ public void nullMessage() {
+
+ CollectionContext collectionContext = mock( CollectionContext.class );
+
+ StagePipeline pipeline = mock( StagePipeline.class );
+
+
+ WriteContext writeContext = new WriteContextImpl( pipeline, collectionContext );
+
+ writeContext.setMessage( null );
+
+ //works because Test is an instance of object
+ assertNull( "Null message returned", writeContext.getMessage( Object.class ) );
+ }
+
+
+ @Test
+ public void proceedHasNextStep() {
+
+ CollectionContext collectionContext = mock( CollectionContext.class );
+
+ StagePipeline pipeline = mock( StagePipeline.class );
+
+ WriteStage firstStage = mock( WriteStage.class );
+
+ WriteStage secondStage = mock( WriteStage.class );
+
+
+ when( pipeline.first() ).thenReturn( firstStage );
+
+ when( pipeline.nextStage( same( firstStage ) ) ).thenReturn( secondStage );
+
+
+ WriteContext writeContext = new WriteContextImpl( pipeline, collectionContext );
+
+ Object test = new Object();
+
+ writeContext.performWrite( test );
+
+ //now proceed and validate we were called
+ writeContext.proceed();
+
+ verify( secondStage ).performStage( same( writeContext ) );
+ }
+
+
+ @Test
+ public void proceedNoNextStep() {
+
+ CollectionContext collectionContext = mock( CollectionContext.class );
+
+ StagePipeline pipeline = mock( StagePipeline.class );
+
+ WriteStage firstStage = mock( WriteStage.class );
+
+ when( pipeline.first() ).thenReturn( firstStage );
+
+ when( pipeline.nextStage( same( firstStage ) ) ).thenReturn( null );
+
+
+ WriteContext writeContext = new WriteContextImpl( pipeline, collectionContext );
+
+ Object test = new Object();
+
+ writeContext.performWrite( test );
+
+ //now proceed and validate we were called
+ writeContext.proceed();
+ }
+
+
+ @Test
+ public void getContextCorrect() {
+
+ CollectionContext collectionContext = mock( CollectionContext.class );
+
+ StagePipeline pipeline = mock( StagePipeline.class );
+
+
+ WriteContext writeContext = new WriteContextImpl( pipeline, collectionContext );
+
+ assertSame( "Collection context pointer correct", collectionContext, writeContext.getCollectionContext() );
+ }
+
+
+
+
+ @Test( expected = NullPointerException.class )
+ public void nullContextFails() {
+
+ CollectionContext collectionContext = mock( CollectionContext.class );
+
+
+ new WriteContextImpl( null, collectionContext );
+ }
+
+
+ @Test( expected = NullPointerException.class )
+ public void nullPipelineFails() {
+
+ CollectionContext collectionContext = mock( CollectionContext.class );
+
+
+ new WriteContextImpl( null, collectionContext );
+ }
+
+
+ private static class TestBean {
+
+ }
+}