You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2013/12/05 05:17:33 UTC
[2/2] git commit: Example using google’s event bus. Seems events are getting dispatched multiple times.
Example using google’s event bus. Seems events are getting dispatched multiple times.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/56c415f2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/56c415f2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/56c415f2
Branch: refs/heads/two-point-o-eventbus
Commit: 56c415f2bd6ac784b5e5d71a6f92bbbc11f82b86
Parents: 44072d5
Author: Todd Nine <to...@apache.org>
Authored: Wed Dec 4 21:14:42 2013 -0700
Committer: Todd Nine <to...@apache.org>
Committed: Wed Dec 4 21:14:42 2013 -0700
----------------------------------------------------------------------
.../collection/impl/CollectionManagerImpl.java | 61 +++---
.../mvcc/entity/CollectionEventBus.java | 38 ++++
.../collection/mvcc/entity/MvccEntity.java | 1 -
.../collection/mvcc/entity/MvccLogEntry.java | 2 -
.../entity/impl/CollectionEventBusImpl.java | 20 ++
.../mvcc/entity/impl/MvccEntityImpl.java | 1 -
.../mvcc/entity/impl/MvccLogEntryImpl.java | 1 -
.../mvcc/event/PostProcessListener.java | 3 -
.../collection/mvcc/stage/CollectionEvent.java | 43 ++++
.../collection/mvcc/stage/EventStage.java | 18 ++
.../collection/mvcc/stage/ExecutionContext.java | 5 -
.../collection/mvcc/stage/ExecutionStage.java | 15 --
.../collection/mvcc/stage/Result.java | 48 ++++
.../collection/mvcc/stage/StagePipeline.java | 8 +-
.../stage/impl/CollectionPipelineModule.java | 72 ++----
.../mvcc/stage/impl/ExecutionContextImpl.java | 92 --------
.../mvcc/stage/impl/StagePipelineImpl.java | 61 ------
.../mvcc/stage/impl/delete/Delete.java | 27 ++-
.../mvcc/stage/impl/delete/DeletePipeline.java | 23 --
.../mvcc/stage/impl/delete/DeleteStart.java | 17 ++
.../mvcc/stage/impl/delete/Deletecommit.java | 16 ++
.../mvcc/stage/impl/delete/StartDelete.java | 36 +--
.../mvcc/stage/impl/read/EventLoad.java | 17 ++
.../collection/mvcc/stage/impl/read/Load.java | 35 ++-
.../mvcc/stage/impl/read/PipelineLoad.java | 23 --
.../mvcc/stage/impl/write/Commit.java | 34 +--
.../mvcc/stage/impl/write/Create.java | 29 +--
.../mvcc/stage/impl/write/EventCommit.java | 16 ++
.../mvcc/stage/impl/write/EventCreate.java | 17 ++
.../mvcc/stage/impl/write/EventStart.java | 17 ++
.../mvcc/stage/impl/write/EventUpdate.java | 16 ++
.../mvcc/stage/impl/write/EventVerify.java | 17 ++
.../mvcc/stage/impl/write/PipelineCreate.java | 23 --
.../mvcc/stage/impl/write/PipelineUpdate.java | 23 --
.../mvcc/stage/impl/write/StartWrite.java | 40 ++--
.../mvcc/stage/impl/write/Update.java | 29 +--
.../mvcc/stage/impl/write/Verify.java | 20 +-
.../MvccEntitySerializationStrategy.java | 1 -
.../MvccLogEntrySerializationStrategyImpl.java | 2 +-
.../serialization/impl/SerializationModule.java | 2 -
.../collection/service/impl/ServiceModule.java | 10 -
.../collection/CollectionManagerIT.java | 8 +
.../collection/CollectionManagerTest.java | 83 ++++---
.../mvcc/stage/ExecutionContextTest.java | 217 -------------------
.../mvcc/stage/StagePipelineTest.java | 89 --------
.../mvcc/stage/impl/write/CreateTest.java | 77 ++++---
.../mvcc/stage/impl/write/StartWriteTest.java | 84 +++----
.../mvcc/stage/impl/write/UpdateTest.java | 98 +++++----
.../persistence/index/stage/Complete.java | 21 --
.../usergrid/persistence/index/stage/Start.java | 16 --
.../usergrid/persistence/index/stage/Write.java | 18 --
51 files changed, 659 insertions(+), 1031 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/CollectionManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/CollectionManagerImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/CollectionManagerImpl.java
index e20ba62..7de8f50 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/CollectionManagerImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/CollectionManagerImpl.java
@@ -8,16 +8,17 @@ import org.slf4j.LoggerFactory;
import org.apache.usergrid.persistence.collection.CollectionContext;
import org.apache.usergrid.persistence.collection.CollectionManager;
+import org.apache.usergrid.persistence.collection.mvcc.entity.CollectionEventBus;
import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
-import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionContext;
-import org.apache.usergrid.persistence.collection.mvcc.stage.StagePipeline;
-import org.apache.usergrid.persistence.collection.mvcc.stage.impl.read.PipelineLoad;
-import org.apache.usergrid.persistence.collection.mvcc.stage.impl.write.PipelineCreate;
-import org.apache.usergrid.persistence.collection.mvcc.stage.impl.delete.DeletePipeline;
-import org.apache.usergrid.persistence.collection.mvcc.stage.impl.ExecutionContextImpl;
-import org.apache.usergrid.persistence.collection.mvcc.stage.impl.write.PipelineUpdate;
+import org.apache.usergrid.persistence.collection.mvcc.stage.Result;
+import org.apache.usergrid.persistence.collection.mvcc.stage.impl.delete.DeleteStart;
+import org.apache.usergrid.persistence.collection.mvcc.stage.impl.read.EventLoad;
+import org.apache.usergrid.persistence.collection.mvcc.stage.impl.write.EventCreate;
+import org.apache.usergrid.persistence.collection.mvcc.stage.impl.write.EventUpdate;
import org.apache.usergrid.persistence.model.entity.Entity;
+import com.google.common.base.Preconditions;
+import com.google.common.eventbus.EventBus;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
@@ -32,23 +33,16 @@ public class CollectionManagerImpl implements CollectionManager {
private static final Logger logger = LoggerFactory.getLogger( CollectionManagerImpl.class );
private final CollectionContext context;
- private final StagePipeline createPipeline;
- private final StagePipeline updatePipeline;
- private final StagePipeline deletePipeline;
- private final StagePipeline loadPipeline;
+ private final CollectionEventBus eventBus;
@Inject
- public CollectionManagerImpl( @PipelineCreate final StagePipeline createPipeline,
- @PipelineUpdate final StagePipeline updatePipeline,
- @DeletePipeline final StagePipeline deletePipeline,
- @PipelineLoad final StagePipeline loadPipeline,
+ public CollectionManagerImpl( final CollectionEventBus eventBus,
@Assisted final CollectionContext context ) {
- this.createPipeline = createPipeline;
- this.updatePipeline = updatePipeline;
- this.deletePipeline = deletePipeline;
- this.loadPipeline = loadPipeline;
+ Preconditions.checkNotNull( eventBus, "eventBus must be defined" );
+ Preconditions.checkNotNull( context, "context must be defined" );
+ this.eventBus = eventBus;
this.context = context;
}
@@ -56,46 +50,43 @@ public class CollectionManagerImpl implements CollectionManager {
@Override
public Entity create( final Entity entity ) {
// Create a new context for the write
- ExecutionContext executionContext = new ExecutionContextImpl( createPipeline, context );
+ Result result = new Result();
- //perform the write
- executionContext.execute( entity );
+ eventBus.post( new EventCreate( context, entity, result ) );
- MvccEntity result = executionContext.getMessage( MvccEntity.class );
+ MvccEntity completed = result.getLast( MvccEntity.class );
- return result.getEntity().get();
+ return completed.getEntity().get();
}
@Override
public Entity update( final Entity entity ) {
// Create a new context for the write
- ExecutionContext executionContext = new ExecutionContextImpl( updatePipeline, context );
+ Result result = new Result();
- //perform the write
- executionContext.execute( entity );
+ eventBus.post( new EventUpdate( context, entity, result ) );
- MvccEntity result = executionContext.getMessage( MvccEntity.class );
+ MvccEntity completed = result.getLast( MvccEntity.class );
- return result.getEntity().get();
+ return completed.getEntity().get();
}
@Override
public void delete( final UUID entityId ) {
- ExecutionContext deleteContext = new ExecutionContextImpl( deletePipeline, context );
-
- deleteContext.execute( entityId );
+ eventBus.post( new DeleteStart( context, entityId, null ) );
}
@Override
public Entity load( final UUID entityId ) {
- ExecutionContext loadContext = new ExecutionContextImpl( loadPipeline, context );
+ Result result = new Result();
- loadContext.execute( entityId );
+ eventBus.post( new EventLoad( context, entityId, result ) );
- return loadContext.getMessage( Entity.class );
+ MvccEntity completed = result.getLast( MvccEntity.class );
+ return completed.getEntity().get();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/entity/CollectionEventBus.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/entity/CollectionEventBus.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/entity/CollectionEventBus.java
new file mode 100644
index 0000000..c53f5ce
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/entity/CollectionEventBus.java
@@ -0,0 +1,38 @@
+package org.apache.usergrid.persistence.collection.mvcc.entity;
+
+
+/**
+ * A duplicate of the Guava EventBus methods ({@code register}, {@code unregister} and {@code post}),
+ * declared as an interface so the bus can be easily mocked and tested.
+ *
+ * @author tnine
+ */
+public interface CollectionEventBus {
+
+ /**
+ * Registers all handler methods on {@code object} to receive events.
+ * Handler methods are selected and classified using this EventBus's
+ * {@link com.google.common.eventbus.HandlerFindingStrategy}; the default strategy is the
+ * {@link com.google.common.eventbus.AnnotatedHandlerFinder}.
+ *
+ * @param object object whose handler methods should be registered.
+ */
+ void register(Object object);
+
+ /**
+ * Unregisters all handler methods on a registered {@code object}.
+ *
+ * @param object object whose handler methods should be unregistered.
+ * @throws IllegalArgumentException if the object was not previously registered.
+ */
+ void unregister(Object object);
+
+ /**
+ * Posts an event to all registered handlers. This method will return
+ * successfully after the event has been posted to all handlers, and
+ * regardless of any exceptions thrown by handlers.
+ *
+ * <p>If no handlers have been subscribed for {@code event}'s class, and
+ * {@code event} is not already a {@link com.google.common.eventbus.DeadEvent}, it will be wrapped in a
+ * DeadEvent and reposted.
+ *
+ * @param event event to post.
+ */
+ void post(Object event);
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/entity/MvccEntity.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/entity/MvccEntity.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/entity/MvccEntity.java
index cc212c8..59197e8 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/entity/MvccEntity.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/entity/MvccEntity.java
@@ -3,7 +3,6 @@ package org.apache.usergrid.persistence.collection.mvcc.entity;
import java.util.UUID;
-import org.apache.usergrid.persistence.collection.CollectionContext;
import org.apache.usergrid.persistence.model.entity.Entity;
import com.google.common.base.Optional;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/entity/MvccLogEntry.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/entity/MvccLogEntry.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/entity/MvccLogEntry.java
index c6886d1..d14b35c 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/entity/MvccLogEntry.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/entity/MvccLogEntry.java
@@ -3,8 +3,6 @@ package org.apache.usergrid.persistence.collection.mvcc.entity;
import java.util.UUID;
-import org.apache.usergrid.persistence.collection.CollectionContext;
-
/**
* A Marker interface for an in flight update to allow context information to be passed between states
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/entity/impl/CollectionEventBusImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/entity/impl/CollectionEventBusImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/entity/impl/CollectionEventBusImpl.java
new file mode 100644
index 0000000..3278218
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/entity/impl/CollectionEventBusImpl.java
@@ -0,0 +1,20 @@
+package org.apache.usergrid.persistence.collection.mvcc.entity.impl;
+
+
+import org.apache.usergrid.persistence.collection.mvcc.entity.CollectionEventBus;
+
+import com.google.common.eventbus.EventBus;
+
+
+/**
+ * {@link CollectionEventBus} implementation that extends Guava's {@link EventBus}. It declares no
+ * methods of its own; both constructors only delegate to the matching superclass constructor.
+ *
+ * @author tnine
+ */
+public class CollectionEventBusImpl extends EventBus implements CollectionEventBus{
+
+ public CollectionEventBusImpl() { // delegates to the no-argument EventBus constructor
+ super();
+ }
+
+
+ public CollectionEventBusImpl( final String identifier ) { // identifier is handed unchanged to EventBus
+ super( identifier );
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/entity/impl/MvccEntityImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/entity/impl/MvccEntityImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/entity/impl/MvccEntityImpl.java
index 1179dad..2d9f2f9 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/entity/impl/MvccEntityImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/entity/impl/MvccEntityImpl.java
@@ -3,7 +3,6 @@ package org.apache.usergrid.persistence.collection.mvcc.entity.impl;
import java.util.UUID;
-import org.apache.usergrid.persistence.collection.CollectionContext;
import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
import org.apache.usergrid.persistence.model.entity.Entity;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/entity/impl/MvccLogEntryImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/entity/impl/MvccLogEntryImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/entity/impl/MvccLogEntryImpl.java
index 2f0b4d7..dee56ae 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/entity/impl/MvccLogEntryImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/entity/impl/MvccLogEntryImpl.java
@@ -3,7 +3,6 @@ package org.apache.usergrid.persistence.collection.mvcc.entity.impl;
import java.util.UUID;
-import org.apache.usergrid.persistence.collection.CollectionContext;
import org.apache.usergrid.persistence.collection.mvcc.entity.MvccLogEntry;
import org.apache.usergrid.persistence.collection.mvcc.entity.Stage;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/event/PostProcessListener.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/event/PostProcessListener.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/event/PostProcessListener.java
index 29e6515..68479e8 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/event/PostProcessListener.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/event/PostProcessListener.java
@@ -1,9 +1,6 @@
package org.apache.usergrid.persistence.collection.mvcc.event;
-import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
-
-
/**
*
* @author: tnine
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/CollectionEvent.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/CollectionEvent.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/CollectionEvent.java
new file mode 100644
index 0000000..84df2d0
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/CollectionEvent.java
@@ -0,0 +1,43 @@
+package org.apache.usergrid.persistence.collection.mvcc.stage;
+
+
+import org.apache.usergrid.persistence.collection.CollectionContext;
+
+import com.google.common.base.Preconditions;
+
+
+/**
+ * Base type for the events received by {@link EventStage} implementations. Every event carries the
+ * collection context it applies to, a typed payload and a {@link Result} holder.
+ *
+ * @param <T> The type of the payload carried by this event
+ *
+ * @author tnine
+ */
+public abstract class CollectionEvent<T> {
+
+    private final CollectionContext context;
+    private final T data;
+    private final Result result;
+
+
+    /**
+     * @param context The collection context this event applies to. Required.
+     * @param data The payload of this event. Required.
+     * @param result The holder for results. It is not validated here and may be null.
+     *
+     * @throws NullPointerException if {@code context} or {@code data} is null
+     */
+    protected CollectionEvent( final CollectionContext context, final T data, final Result result ) {
+        Preconditions.checkNotNull( context, "context is required" );
+        // This message used to read "context is required", which named the wrong argument
+        Preconditions.checkNotNull( data, "data is required" );
+        this.context = context;
+        this.data = data;
+        this.result = result;
+    }
+
+
+    /** Get the collection context for this event */
+    public CollectionContext getCollectionContext() {
+        return this.context;
+    }
+
+
+    /** Get the payload of this event. Never null. */
+    public T getData() {
+        return data;
+    }
+
+
+    /** Get the result holder passed to the constructor. May be null, since it is not validated. */
+    public Result getResult() {
+        return result;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/EventStage.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/EventStage.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/EventStage.java
new file mode 100644
index 0000000..c62eb36
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/EventStage.java
@@ -0,0 +1,18 @@
+package org.apache.usergrid.persistence.collection.mvcc.stage;
+
+
+import com.google.common.eventbus.Subscribe;
+
+
+/**
+ * A stage that handles one type of {@link CollectionEvent}.
+ *
+ * @param <T> The type of event this stage receives
+ */
+public interface EventStage<T extends CollectionEvent> {
+
+ /**
+ * Run this stage for the given event. The method returns nothing; the event exposes a
+ * {@link Result} holder through {@link CollectionEvent#getResult()}.
+ *
+ * NOTE(review): T erases to CollectionEvent, so a subscriber discovered through this annotated
+ * interface method may end up registered for every CollectionEvent rather than only for T.
+ * That could be related to events being dispatched more than once -- TODO confirm.
+ *
+ * @param event The event to receive
+ *
+ */
+ @Subscribe
+ public void performStage(T event );
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/ExecutionContext.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/ExecutionContext.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/ExecutionContext.java
index f124dcf..0fb9ced 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/ExecutionContext.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/ExecutionContext.java
@@ -1,12 +1,7 @@
package org.apache.usergrid.persistence.collection.mvcc.stage;
-import java.util.Collection;
-import java.util.List;
-
import org.apache.usergrid.persistence.collection.CollectionContext;
-import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
-import org.apache.usergrid.persistence.collection.mvcc.event.PostProcessListener;
/** @author tnine */
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/ExecutionStage.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/ExecutionStage.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/ExecutionStage.java
deleted file mode 100644
index a98c813..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/ExecutionStage.java
+++ /dev/null
@@ -1,15 +0,0 @@
-package org.apache.usergrid.persistence.collection.mvcc.stage;
-
-
-/** The possible stages in our write flow. */
-public interface ExecutionStage {
-
- /**
- * Run this stage. This will return the MvccEntity that should be returned or passed to the next stage
- *
- * @param context The context of the current write operation
- *
- * @return The asynchronous listener to signal success
- */
- public void performStage( ExecutionContext context );
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/Result.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/Result.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/Result.java
new file mode 100644
index 0000000..1887c71
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/Result.java
@@ -0,0 +1,48 @@
+package org.apache.usergrid.persistence.collection.mvcc.stage;
+
+
+import java.util.ArrayList;
+import java.util.List;
+
+
+/**
+ * An ordered holder of result objects. Values are kept in the order they were added and can be
+ * looked up by type, newest first.
+ *
+ * @author tnine
+ */
+public class Result {
+
+    private final List<Object> results;
+
+
+    public Result() {
+        results = new ArrayList<Object>();
+    }
+
+
+    /**
+     * Get every result added so far, in insertion order.
+     * @return The backing list itself, not a copy
+     */
+    public List<Object> getResults() {
+        return results;
+    }
+
+
+    /**
+     * Append a result to the end of the list.
+     * @param result The value to record
+     */
+    public void addResult( final Object result ) {
+        this.results.add( result );
+    }
+
+
+    /**
+     * Get the last occurrence of an instance that implements the type provided.
+     * @param clazz The class that the value should be an instance of
+     * @param <T> The type of class
+     * @return The value if one is found, null otherwise
+     */
+    public <T> T getLast( Class<T> clazz ) {
+
+        // Walk backwards so the most recently added match wins
+        for ( int i = results.size() - 1; i >= 0; i-- ) {
+            final Object value = results.get( i );
+            if ( clazz.isInstance( value ) ) {
+                // Class.cast is a checked cast, so no unchecked ( T ) conversion is needed
+                return clazz.cast( value );
+            }
+        }
+
+        return null;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/StagePipeline.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/StagePipeline.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/StagePipeline.java
index 9d68e10..33a2264 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/StagePipeline.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/StagePipeline.java
@@ -13,14 +13,14 @@ public interface StagePipeline {
/**
* Get the first stage in this pipeline.
*/
- ExecutionStage first();
+ EventStage first();
/**
- * get the next executionStage after the executionStage specified
- * @param executionStage The executionStage to seek in our pipeline
+ * get the next eventStage after the eventStage specified
+ * @param eventStage The eventStage to seek in our pipeline
*/
- ExecutionStage nextStage(ExecutionStage executionStage );
+ EventStage nextStage(EventStage eventStage );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/CollectionPipelineModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/CollectionPipelineModule.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/CollectionPipelineModule.java
index 68dab72..7ef260b 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/CollectionPipelineModule.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/CollectionPipelineModule.java
@@ -1,23 +1,19 @@
package org.apache.usergrid.persistence.collection.mvcc.stage.impl;
-import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionStage;
-import org.apache.usergrid.persistence.collection.mvcc.stage.StagePipeline;
+import org.apache.usergrid.persistence.collection.mvcc.entity.CollectionEventBus;
+import org.apache.usergrid.persistence.collection.mvcc.entity.impl.CollectionEventBusImpl;
+import org.apache.usergrid.persistence.collection.mvcc.stage.EventStage;
import org.apache.usergrid.persistence.collection.mvcc.stage.impl.delete.Delete;
-import org.apache.usergrid.persistence.collection.mvcc.stage.impl.delete.DeletePipeline;
import org.apache.usergrid.persistence.collection.mvcc.stage.impl.delete.StartDelete;
import org.apache.usergrid.persistence.collection.mvcc.stage.impl.read.Load;
-import org.apache.usergrid.persistence.collection.mvcc.stage.impl.read.PipelineLoad;
import org.apache.usergrid.persistence.collection.mvcc.stage.impl.write.Commit;
import org.apache.usergrid.persistence.collection.mvcc.stage.impl.write.Create;
-import org.apache.usergrid.persistence.collection.mvcc.stage.impl.write.PipelineCreate;
-import org.apache.usergrid.persistence.collection.mvcc.stage.impl.write.PipelineUpdate;
import org.apache.usergrid.persistence.collection.mvcc.stage.impl.write.StartWrite;
import org.apache.usergrid.persistence.collection.mvcc.stage.impl.write.Update;
import org.apache.usergrid.persistence.collection.mvcc.stage.impl.write.Verify;
import com.google.inject.AbstractModule;
-import com.google.inject.Inject;
import com.google.inject.Provides;
import com.google.inject.Singleton;
import com.google.inject.multibindings.Multibinder;
@@ -31,45 +27,12 @@ import com.google.inject.multibindings.Multibinder;
public class CollectionPipelineModule extends AbstractModule {
- /**
- * Wire the pipeline of operations for create. This should create a new instance every time, since StagePipeline
- * objects are mutable
- */
@Provides
- @PipelineCreate
- @Inject
@Singleton
- public StagePipeline createPipeline( final Create create, final StartWrite startWrite, final Verify write,
- final Commit commit ) {
- return StagePipelineImpl.fromStages( create, startWrite, write, commit );
- }
-
+ public CollectionEventBus eventBus(){
+ CollectionEventBus bus = new CollectionEventBusImpl( "collection" );
- @Provides
- @PipelineUpdate
- @Inject
- @Singleton
- public StagePipeline updatePipeline( final Update update, final StartWrite startWrite, final Verify write,
- final Commit commit ) {
- return StagePipelineImpl.fromStages( update, startWrite, write, commit );
- }
-
-
- @Provides
- @DeletePipeline
- @Inject
- @Singleton
- public StagePipeline deletePipeline(final StartDelete startDelete, final Delete delete ) {
- return StagePipelineImpl.fromStages(startDelete, delete );
- }
-
-
- @Provides
- @PipelineLoad
- @Inject
- @Singleton
- public StagePipeline deletePipeline( final Load load ) {
- return StagePipelineImpl.fromStages( load );
+ return bus;
}
@@ -79,22 +42,27 @@ public class CollectionPipelineModule extends AbstractModule {
/**
* Configure all stages here
*/
- Multibinder<ExecutionStage> stageBinder = Multibinder.newSetBinder( binder(), ExecutionStage.class );
+ Multibinder<EventStage> stageBinder = Multibinder.newSetBinder( binder(), EventStage.class );
+ /**
+ * Note we have to have the .asEagerSingleton(); or guice never loads these impls b/c they aren't
+ * directly referenced
+ */
//creation stages
- stageBinder.addBinding().to( Commit.class );
- stageBinder.addBinding().to( Create.class );
- stageBinder.addBinding().to( StartWrite.class );
- stageBinder.addBinding().to( Update.class );
- stageBinder.addBinding().to( Verify.class );
+ stageBinder.addBinding().to( Commit.class ).asEagerSingleton();
+ stageBinder.addBinding().to( Create.class ).asEagerSingleton();;
+ stageBinder.addBinding().to( StartWrite.class ).asEagerSingleton();;
+ stageBinder.addBinding().to( Update.class ).asEagerSingleton();;
+ stageBinder.addBinding().to( Verify.class ).asEagerSingleton();;
//delete stages
- stageBinder.addBinding().to( Delete.class );
- stageBinder.addBinding().to( StartDelete.class );
+ stageBinder.addBinding().to( Delete.class ).asEagerSingleton();;
+ stageBinder.addBinding().to( StartDelete.class ).asEagerSingleton();;
//loading stages
- stageBinder.addBinding().to(Load.class);
+ stageBinder.addBinding().to(Load.class).asEagerSingleton();;
+
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/ExecutionContextImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/ExecutionContextImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/ExecutionContextImpl.java
deleted file mode 100644
index 805d1e3..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/ExecutionContextImpl.java
+++ /dev/null
@@ -1,92 +0,0 @@
-package org.apache.usergrid.persistence.collection.mvcc.stage.impl;
-
-
-import org.apache.usergrid.persistence.collection.CollectionContext;
-import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionContext;
-import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionStage;
-import org.apache.usergrid.persistence.collection.mvcc.stage.StagePipeline;
-
-import com.google.common.base.Preconditions;
-import com.google.inject.Inject;
-
-
-/** @author tnine */
-public class ExecutionContextImpl implements ExecutionContext {
-
- private final StagePipeline pipeline;
- private final CollectionContext context;
-
- private Object message;
- private ExecutionStage current;
-
-
- @Inject
- public ExecutionContextImpl( final StagePipeline pipeline, final CollectionContext context ) {
- Preconditions.checkNotNull( pipeline, "pipeline cannot be null" );
- Preconditions.checkNotNull( context, "context cannot be null" );
-
- this.pipeline = pipeline;
- this.context = context;
- }
-
-
- @Override
- public void execute( Object input ) {
-
- current = this.pipeline.first();
-
- setMessage( input );
-
- current.performStage( this );
- }
-
-
- @Override
- public <T> T getMessage( final Class<T> clazz ) {
- Preconditions.checkNotNull( clazz, "Class must be specified" );
-
- if ( message == null ) {
- return null;
- }
-
- if ( !clazz.isInstance( message ) ) {
- throw new ClassCastException(
- "Message must be an instance of class " + clazz + ". However it was of type '" + message.getClass()
- + "'" );
- }
-
-
- return ( T ) message;
- }
-
-
- @Override
- public Object setMessage( final Object object ) {
- Object original = message;
-
- this.message = object;
-
- return original;
- }
-
-
- @Override
- public void proceed() {
- ExecutionStage next = this.pipeline.nextStage( current );
-
- //Nothing to do
- if ( next == null ) {
- return;
- }
-
- current = next;
- current.performStage( this );
- }
-
-
-
- @Override
- public CollectionContext getCollectionContext() {
- return this.context;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/StagePipelineImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/StagePipelineImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/StagePipelineImpl.java
deleted file mode 100644
index 1f3a0fe..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/StagePipelineImpl.java
+++ /dev/null
@@ -1,61 +0,0 @@
-package org.apache.usergrid.persistence.collection.mvcc.stage.impl;
-
-
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionStage;
-import org.apache.usergrid.persistence.collection.mvcc.stage.StagePipeline;
-
-import com.google.common.base.Preconditions;
-
-
-/** @author tnine */
-public class StagePipelineImpl implements StagePipeline {
-
- private final List<ExecutionStage> executionStages;
-
-
- protected StagePipelineImpl( List<ExecutionStage> executionStages ) {
- Preconditions.checkNotNull( executionStages, "executionStages is required");
- Preconditions.checkArgument( executionStages.size() > 0, "executionStages must have more than 1 element" );
-
- this.executionStages = executionStages;
- }
-
-
- @Override
- public ExecutionStage first() {
-
- if ( executionStages.size() == 0 ) {
- return null;
- }
-
- return executionStages.get( 0 );
- }
-
-
-
-
-
- @Override
- public ExecutionStage nextStage( final ExecutionStage executionStage ) {
-
- Preconditions.checkNotNull( executionStage, "ExecutionStage cannot be null" );
-
- int index = executionStages.indexOf( executionStage );
-
- //we're done, do nothing
- if ( index == executionStages.size() -1 ) {
- return null;
- }
-
- return executionStages.get( index + 1 );
- }
-
-
- /** Factory to create a new instance. */
- public static StagePipelineImpl fromStages( ExecutionStage... executionStages ) {
- return new StagePipelineImpl(Arrays.asList( executionStages ));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/delete/Delete.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/delete/Delete.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/delete/Delete.java
index 7810beb..feed7fa 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/delete/Delete.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/delete/Delete.java
@@ -8,22 +8,23 @@ import org.slf4j.LoggerFactory;
import org.apache.usergrid.persistence.collection.CollectionContext;
import org.apache.usergrid.persistence.collection.exception.CollectionRuntimeException;
+import org.apache.usergrid.persistence.collection.mvcc.entity.CollectionEventBus;
import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
import org.apache.usergrid.persistence.collection.mvcc.entity.MvccLogEntry;
import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccLogEntryImpl;
-import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionStage;
-import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionContext;
+import org.apache.usergrid.persistence.collection.mvcc.stage.EventStage;
import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy;
import com.google.common.base.Preconditions;
+import com.google.common.eventbus.Subscribe;
import com.google.inject.Inject;
import com.netflix.astyanax.MutationBatch;
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
/** This phase should invoke any finalization, and mark the entity as committed in the data store before returning */
-public class Delete implements ExecutionStage {
+public class Delete implements EventStage<DeleteCommit> {
private static final Logger LOG = LoggerFactory.getLogger( Delete.class );
@@ -34,20 +35,24 @@ public class Delete implements ExecutionStage {
@Inject
public Delete( final MvccLogEntrySerializationStrategy logEntrySerializationStrategy,
- final MvccEntitySerializationStrategy entitySerializationStrategy ) {
+ final MvccEntitySerializationStrategy entitySerializationStrategy,
+ final CollectionEventBus eventBus ) {
Preconditions.checkNotNull( logEntrySerializationStrategy, "logEntrySerializationStrategy is required" );
- Preconditions.checkNotNull( entitySerializationStrategy, "entitySerializationStrategy is required" );
+ Preconditions.checkNotNull( entitySerializationStrategy, "entitySerializationStrategy is required" );
this.logEntrySerializationStrategy = logEntrySerializationStrategy;
this.entitySerializationStrategy = entitySerializationStrategy;
+ eventBus.register( this );
}
@Override
- public void performStage( final ExecutionContext executionContext ) {
- final MvccEntity entity = executionContext.getMessage( MvccEntity.class );
+ @Subscribe
+ public void performStage( final DeleteCommit event ) {
+
+ final MvccEntity entity = event.getData();
Preconditions.checkNotNull( entity, "Entity is required in the new stage of the mvcc write" );
@@ -58,11 +63,11 @@ public class Delete implements ExecutionStage {
Preconditions.checkNotNull( version, "Entity version is required in this stage" );
- final CollectionContext collectionContext = executionContext.getCollectionContext();
+ final CollectionContext collectionContext = event.getCollectionContext();
- final MvccLogEntry startEntry = new MvccLogEntryImpl( entityId, version, org.apache.usergrid.persistence
- .collection.mvcc.entity.Stage.COMMITTED );
+ final MvccLogEntry startEntry = new MvccLogEntryImpl( entityId, version,
+ org.apache.usergrid.persistence.collection.mvcc.entity.Stage.COMMITTED );
MutationBatch logMutation = logEntrySerializationStrategy.write( collectionContext, startEntry );
@@ -84,8 +89,6 @@ public class Delete implements ExecutionStage {
/**
* We're done executing.
*/
- executionContext.proceed();
- //TODO connect to post processors via listener
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/delete/DeletePipeline.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/delete/DeletePipeline.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/delete/DeletePipeline.java
deleted file mode 100644
index 52fe4b9..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/delete/DeletePipeline.java
+++ /dev/null
@@ -1,23 +0,0 @@
-package org.apache.usergrid.persistence.collection.mvcc.stage.impl.delete;
-
-
-import java.lang.annotation.Retention;
-import java.lang.annotation.Target;
-
-import com.google.inject.BindingAnnotation;
-
-import static java.lang.annotation.ElementType.FIELD;
-import static java.lang.annotation.ElementType.METHOD;
-import static java.lang.annotation.ElementType.PARAMETER;
-import static java.lang.annotation.RetentionPolicy.RUNTIME;
-
-
-/**
- * Marks the delete pipeline
- *
- * @author tnine
- */
-@BindingAnnotation
-@Target({ FIELD, PARAMETER, METHOD })
-@Retention(RUNTIME)
-public @interface DeletePipeline {}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/delete/DeleteStart.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/delete/DeleteStart.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/delete/DeleteStart.java
new file mode 100644
index 0000000..5f8840b
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/delete/DeleteStart.java
@@ -0,0 +1,17 @@
+package org.apache.usergrid.persistence.collection.mvcc.stage.impl.delete;
+
+
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.collection.CollectionContext;
+import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionEvent;
+import org.apache.usergrid.persistence.collection.mvcc.stage.Result;
+
+
+/**
+ * Event that carries the {@link UUID} of an entity to be deleted, together with the
+ * {@link CollectionContext} and {@link Result} handed to the {@link CollectionEvent} constructor.
+ * StartDelete#performStage is the subscriber for this event type and reads the entity id
+ * through getData().
+ *
+ * @author tnine
+ */
+public class DeleteStart extends CollectionEvent<UUID> {
+
+ public DeleteStart( final CollectionContext context, final UUID data, final Result result ) {
+ super( context, data, result );
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/delete/Deletecommit.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/delete/Deletecommit.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/delete/Deletecommit.java
new file mode 100644
index 0000000..dfdebba
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/delete/Deletecommit.java
@@ -0,0 +1,16 @@
+package org.apache.usergrid.persistence.collection.mvcc.stage.impl.delete;
+
+
+import org.apache.usergrid.persistence.collection.CollectionContext;
+import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
+import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionEvent;
+import org.apache.usergrid.persistence.collection.mvcc.stage.Result;
+
+
+/**
+ * Event that carries the {@link MvccEntity} for the commit step of a delete. StartDelete posts
+ * it as the last statement of its stage, and Delete#performStage is the subscriber that
+ * consumes it.
+ *
+ * NOTE(review): this diff adds the class in a file named Deletecommit.java while the public
+ * class is DeleteCommit; javac expects the file name to match the public class name, so the
+ * file probably needs to be renamed to DeleteCommit.java -- confirm.
+ *
+ * @author tnine
+ */
+public class DeleteCommit extends CollectionEvent<MvccEntity> {
+ public DeleteCommit( final CollectionContext context, final MvccEntity data, final Result result ) {
+
+ super( context, data, result );
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/delete/StartDelete.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/delete/StartDelete.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/delete/StartDelete.java
index 4208662..ca1865b 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/delete/StartDelete.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/delete/StartDelete.java
@@ -8,17 +8,18 @@ import org.slf4j.LoggerFactory;
import org.apache.usergrid.persistence.collection.CollectionContext;
import org.apache.usergrid.persistence.collection.exception.CollectionRuntimeException;
+import org.apache.usergrid.persistence.collection.mvcc.entity.CollectionEventBus;
import org.apache.usergrid.persistence.collection.mvcc.entity.MvccLogEntry;
import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccEntityImpl;
import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccLogEntryImpl;
-import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionContext;
-import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionStage;
+import org.apache.usergrid.persistence.collection.mvcc.stage.EventStage;
import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy;
import org.apache.usergrid.persistence.collection.service.UUIDService;
import org.apache.usergrid.persistence.model.entity.Entity;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
+import com.google.common.eventbus.Subscribe;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.netflix.astyanax.MutationBatch;
@@ -30,36 +31,37 @@ import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
* new write in the data store for a checkpoint and recovery
*/
@Singleton
-public class StartDelete implements ExecutionStage {
+public class StartDelete implements EventStage<DeleteStart> {
private static final Logger LOG = LoggerFactory.getLogger( StartDelete.class );
+ private final CollectionEventBus eventBus;
private final MvccLogEntrySerializationStrategy logStrategy;
private final UUIDService uuidService;
/** Create a new stage with the current context */
@Inject
- public StartDelete( final MvccLogEntrySerializationStrategy logStrategy, final UUIDService uuidService ) {
-
+ public StartDelete( final CollectionEventBus eventBus, final MvccLogEntrySerializationStrategy logStrategy,
+ final UUIDService uuidService ) {
+ Preconditions.checkNotNull( eventBus, "eventBus is required" );
Preconditions.checkNotNull( logStrategy, "logStrategy is required" );
Preconditions.checkNotNull( uuidService, "uuidService is required" );
+ this.eventBus = eventBus;
this.logStrategy = logStrategy;
this.uuidService = uuidService;
+
+ this.eventBus.register( this );
}
- /**
- * Create the entity Id and inject it, as well as set the timestamp versions
- *
- * @param executionContext The context of the current write operation
- */
@Override
- public void performStage( final ExecutionContext executionContext ) {
+ @Subscribe
+ public void performStage( final DeleteStart event ) {
- final UUID entityId = executionContext.getMessage( UUID.class );
+ final UUID entityId = event.getData();
final UUID version = uuidService.newTimeUUID();
@@ -68,12 +70,11 @@ public class StartDelete implements ExecutionStage {
Preconditions.checkNotNull( version, "Entity version is required in this stage" );
-
- final CollectionContext collectionContext = executionContext.getCollectionContext();
+ final CollectionContext collectionContext = event.getCollectionContext();
- final MvccLogEntry startEntry = new MvccLogEntryImpl( entityId, version, org.apache.usergrid.persistence
- .collection.mvcc.entity.Stage.ACTIVE );
+ final MvccLogEntry startEntry = new MvccLogEntryImpl( entityId, version,
+ org.apache.usergrid.persistence.collection.mvcc.entity.Stage.ACTIVE );
MutationBatch write = logStrategy.write( collectionContext, startEntry );
@@ -90,7 +91,6 @@ public class StartDelete implements ExecutionStage {
//create the mvcc entity for the next stage
final MvccEntityImpl nextStage = new MvccEntityImpl( entityId, version, Optional.<Entity>absent() );
- executionContext.setMessage( nextStage );
- executionContext.proceed();
+ eventBus.post( new DeleteCommit( collectionContext, nextStage, event.getResult() ) );
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/read/EventLoad.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/read/EventLoad.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/read/EventLoad.java
new file mode 100644
index 0000000..6f3fab9
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/read/EventLoad.java
@@ -0,0 +1,17 @@
+package org.apache.usergrid.persistence.collection.mvcc.stage.impl.read;
+
+
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.collection.CollectionContext;
+import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionEvent;
+import org.apache.usergrid.persistence.collection.mvcc.stage.Result;
+
+
+/**
+ * Event that requests the load of a single entity and carries that entity's {@link UUID}.
+ * Load#performStage is the subscriber for this event type; when it finds a version that still
+ * has entity data it adds that entity to this event's {@link Result}, otherwise it returns
+ * without adding anything.
+ *
+ * @author tnine
+ */
+public class EventLoad extends CollectionEvent<UUID> {
+ public EventLoad( final CollectionContext context, final UUID data, final Result result ) {
+
+ super( context, data, result );
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/read/Load.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/read/Load.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/read/Load.java
index 00a2d43..a2acf3e 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/read/Load.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/read/Load.java
@@ -8,22 +8,21 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.usergrid.persistence.collection.CollectionContext;
+import org.apache.usergrid.persistence.collection.mvcc.entity.CollectionEventBus;
import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
-import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionContext;
-import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionStage;
+import org.apache.usergrid.persistence.collection.mvcc.stage.EventStage;
import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
import org.apache.usergrid.persistence.collection.service.UUIDService;
import org.apache.usergrid.persistence.model.entity.Entity;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
+import com.google.common.eventbus.Subscribe;
import com.google.inject.Inject;
-/**
- * This stage is a load stage to load a single entity
- */
-public class Load implements ExecutionStage {
+/** This stage is a load stage to load a single entity */
+public class Load implements EventStage<EventLoad> {
private static final Logger LOG = LoggerFactory.getLogger( Load.class );
@@ -33,24 +32,26 @@ public class Load implements ExecutionStage {
@Inject
- public Load( final UUIDService uuidService, final MvccEntitySerializationStrategy entitySerializationStrategy ) {
+ public Load(final CollectionEventBus eventBus, final UUIDService uuidService, final MvccEntitySerializationStrategy entitySerializationStrategy ) {
Preconditions.checkNotNull( entitySerializationStrategy, "entitySerializationStrategy is required" );
Preconditions.checkNotNull( uuidService, "uuidService is required" );
this.uuidService = uuidService;
this.entitySerializationStrategy = entitySerializationStrategy;
+ eventBus.register( this );
}
@Override
- public void performStage( final ExecutionContext executionContext ) {
- final UUID entityId = executionContext.getMessage( UUID.class );
+ @Subscribe
+ public void performStage( final EventLoad event ) {
+ final UUID entityId = event.getData();
Preconditions.checkNotNull( entityId, "Entity id required in the read stage" );
- final CollectionContext collectionContext = executionContext.getCollectionContext();
+ final CollectionContext collectionContext = event.getCollectionContext();
//generate a version that represents now
final UUID versionMax = uuidService.newTimeUUID();
@@ -58,25 +59,21 @@ public class Load implements ExecutionStage {
List<MvccEntity> results = entitySerializationStrategy.load( collectionContext, entityId, versionMax, 1 );
//nothing to do, we didn't get a result back
- if(results.size() != 1){
- executionContext.setMessage( null );
- executionContext.proceed();
+ if ( results.size() != 1 ) {
return;
}
- final Optional<Entity> targetVersion = results.get(0).getEntity();
+ final Optional<Entity> targetVersion = results.get( 0 ).getEntity();
//this entity has been marked as cleared. The version exists, but does not have entity data
- if(!targetVersion.isPresent()){
+ if ( !targetVersion.isPresent() ) {
//TODO, a lazy async repair/cleanup here?
- executionContext.setMessage( null );
- executionContext.proceed();
return;
}
- executionContext.setMessage( targetVersion.get() );
- executionContext.proceed();
+ //this feels like a hack. Not sure I like this
+ event.getResult().addResult( targetVersion.get() );
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/read/PipelineLoad.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/read/PipelineLoad.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/read/PipelineLoad.java
deleted file mode 100644
index 0d24b27..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/read/PipelineLoad.java
+++ /dev/null
@@ -1,23 +0,0 @@
-package org.apache.usergrid.persistence.collection.mvcc.stage.impl.read;
-
-
-import java.lang.annotation.Retention;
-import java.lang.annotation.Target;
-
-import com.google.inject.BindingAnnotation;
-
-import static java.lang.annotation.ElementType.FIELD;
-import static java.lang.annotation.ElementType.METHOD;
-import static java.lang.annotation.ElementType.PARAMETER;
-import static java.lang.annotation.RetentionPolicy.RUNTIME;
-
-
-/**
- * Marks the delete pipeline
- *
- * @author tnine
- */
-@BindingAnnotation
-@Target({ FIELD, PARAMETER, METHOD })
-@Retention(RUNTIME)
-public @interface PipelineLoad {}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/Commit.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/Commit.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/Commit.java
index 4780ff1..65af8cf 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/Commit.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/Commit.java
@@ -8,22 +8,23 @@ import org.slf4j.LoggerFactory;
import org.apache.usergrid.persistence.collection.CollectionContext;
import org.apache.usergrid.persistence.collection.exception.CollectionRuntimeException;
+import org.apache.usergrid.persistence.collection.mvcc.entity.CollectionEventBus;
import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
import org.apache.usergrid.persistence.collection.mvcc.entity.MvccLogEntry;
import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccLogEntryImpl;
-import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionStage;
-import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionContext;
+import org.apache.usergrid.persistence.collection.mvcc.stage.EventStage;
import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy;
import com.google.common.base.Preconditions;
+import com.google.common.eventbus.Subscribe;
import com.google.inject.Inject;
import com.netflix.astyanax.MutationBatch;
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
/** This phase should invoke any finalization, and mark the entity as committed in the data store before returning */
-public class Commit implements ExecutionStage {
+public class Commit implements EventStage<EventCommit> {
private static final Logger LOG = LoggerFactory.getLogger( Commit.class );
@@ -33,20 +34,22 @@ public class Commit implements ExecutionStage {
@Inject
- public Commit( final MvccLogEntrySerializationStrategy logEntrySerializationStrategy,
+ public Commit( final CollectionEventBus eventBus, final MvccLogEntrySerializationStrategy logEntrySerializationStrategy,
final MvccEntitySerializationStrategy entitySerializationStrategy ) {
Preconditions.checkNotNull( logEntrySerializationStrategy, "logEntrySerializationStrategy is required" );
- Preconditions.checkNotNull( entitySerializationStrategy, "entitySerializationStrategy is required" );
+ Preconditions.checkNotNull( entitySerializationStrategy, "entitySerializationStrategy is required" );
this.logEntrySerializationStrategy = logEntrySerializationStrategy;
this.entitySerializationStrategy = entitySerializationStrategy;
+ eventBus.register( this );
}
@Override
- public void performStage( final ExecutionContext executionContext ) {
- final MvccEntity entity = executionContext.getMessage( MvccEntity.class );
+ @Subscribe
+ public void performStage( final EventCommit event ) {
+ final MvccEntity entity = event.getData();
Preconditions.checkNotNull( entity, "Entity is required in the new stage of the mvcc write" );
@@ -57,11 +60,11 @@ public class Commit implements ExecutionStage {
Preconditions.checkNotNull( version, "Entity version is required in this stage" );
- final CollectionContext collectionContext = executionContext.getCollectionContext();
+ final CollectionContext collectionContext = event.getCollectionContext();
- final MvccLogEntry startEntry = new MvccLogEntryImpl( entityId, version, org.apache.usergrid.persistence
- .collection.mvcc.entity.Stage.COMMITTED );
+ final MvccLogEntry startEntry = new MvccLogEntryImpl( entityId, version,
+ org.apache.usergrid.persistence.collection.mvcc.entity.Stage.COMMITTED );
MutationBatch logMutation = logEntrySerializationStrategy.write( collectionContext, startEntry );
@@ -80,11 +83,10 @@ public class Commit implements ExecutionStage {
throw new CollectionRuntimeException( "Failed to execute write asynchronously ", e );
}
- /**
- * We're done executing.
- */
- executionContext.proceed();
-
- //TODO connect to post processors via listener
+ //add the mvccEntity to the result
+ event.getResult().addResult( entity );
}
+
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/Create.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/Create.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/Create.java
index cbd7d9b..971e9f4 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/Create.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/Create.java
@@ -9,14 +9,15 @@ import org.slf4j.LoggerFactory;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.usergrid.persistence.collection.exception.CollectionRuntimeException;
-import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionStage;
-import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionContext;
+import org.apache.usergrid.persistence.collection.mvcc.entity.CollectionEventBus;
+import org.apache.usergrid.persistence.collection.mvcc.stage.EventStage;
import org.apache.usergrid.persistence.collection.service.TimeService;
import org.apache.usergrid.persistence.collection.service.UUIDService;
import org.apache.usergrid.persistence.collection.util.Verify;
import org.apache.usergrid.persistence.model.entity.Entity;
import com.google.common.base.Preconditions;
+import com.google.common.eventbus.Subscribe;
import com.google.inject.Inject;
import com.google.inject.Singleton;
@@ -26,35 +27,37 @@ import com.google.inject.Singleton;
* present, and this should set the entityId, version, created, and updated dates
*/
@Singleton
-public class Create implements ExecutionStage {
+public class Create implements EventStage<EventCreate> {
private static final Logger LOG = LoggerFactory.getLogger( Create.class );
+ private final CollectionEventBus eventBus;
private final TimeService timeService;
private final UUIDService uuidService;
@Inject
- public Create( final TimeService timeService, final UUIDService uuidService ) {
+ public Create( final CollectionEventBus eventBus, final TimeService timeService, final UUIDService uuidService ) {
+
+ Preconditions.checkNotNull( eventBus, "eventBus is required" );
Preconditions.checkNotNull( timeService, "timeService is required" );
Preconditions.checkNotNull( uuidService, "uuidService is required" );
+ this.eventBus = eventBus;
this.timeService = timeService;
this.uuidService = uuidService;
+ this.eventBus.register( this );
}
- /**
- * Create the entity Id and inject it, as well as set the timestamp versions
- *
- * @param executionContext The context of the current write operation
- */
@Override
- public void performStage( final ExecutionContext executionContext ) {
+ @Subscribe
+ public void performStage( final EventCreate event ) {
+ Preconditions.checkNotNull(event, "event is required" );
- final Entity entity = executionContext.getMessage( Entity.class );
+ final Entity entity = event.getData();
Preconditions.checkNotNull( entity, "Entity is required in the new stage of the mvcc write" );
@@ -78,8 +81,6 @@ public class Create implements ExecutionStage {
entity.setCreated( created );
entity.setUpdated( created );
- //set the updated entity for the next stage
- executionContext.setMessage( entity );
- executionContext.proceed();
+ eventBus.post( new EventStart( event.getCollectionContext(), entity, event.getResult() ) );
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/EventCommit.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/EventCommit.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/EventCommit.java
new file mode 100644
index 0000000..0091770
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/EventCommit.java
@@ -0,0 +1,16 @@
+package org.apache.usergrid.persistence.collection.mvcc.stage.impl.write;
+
+
+import org.apache.usergrid.persistence.collection.CollectionContext;
+import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
+import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionEvent;
+import org.apache.usergrid.persistence.collection.mvcc.stage.Result;
+
+
+public class EventCommit extends CollectionEvent<MvccEntity> {
+ // Event whose payload is an MvccEntity; Verify.performStage posts it after the verify stage.
+ // NOTE(review): constructor is protected while the sibling Event* classes use public - works for same-package callers only; confirm intent.
+ protected EventCommit( final CollectionContext context, final MvccEntity data, final Result result ) {
+ super( context, data, result );
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/EventCreate.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/EventCreate.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/EventCreate.java
new file mode 100644
index 0000000..d5a609f
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/EventCreate.java
@@ -0,0 +1,17 @@
+package org.apache.usergrid.persistence.collection.mvcc.stage.impl.write;
+
+
+import org.apache.usergrid.persistence.collection.CollectionContext;
+import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionEvent;
+import org.apache.usergrid.persistence.collection.mvcc.stage.Result;
+import org.apache.usergrid.persistence.model.entity.Entity;
+
+
+/** @author tnine */
+public class EventCreate extends CollectionEvent<Entity> {
+ // Event whose payload is the Entity to be created; Create.performStage subscribes to it.
+ // The constructor only forwards the collection context, payload and result holder to CollectionEvent.
+ public EventCreate( final CollectionContext context, final Entity data, final Result result ) {
+ super( context, data, result );
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/EventStart.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/EventStart.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/EventStart.java
new file mode 100644
index 0000000..87e2488
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/EventStart.java
@@ -0,0 +1,17 @@
+package org.apache.usergrid.persistence.collection.mvcc.stage.impl.write;
+
+
+import org.apache.usergrid.persistence.collection.CollectionContext;
+import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionEvent;
+import org.apache.usergrid.persistence.collection.mvcc.stage.Result;
+import org.apache.usergrid.persistence.model.entity.Entity;
+
+
+/** @author tnine */
+public class EventStart extends CollectionEvent<Entity> {
+ // Event whose payload is an Entity; posted by Create and Update, handled by StartWrite.performStage.
+ public EventStart( final CollectionContext context, final Entity data, final Result result ) {
+ // forwards context, payload and result holder unchanged to CollectionEvent
+ super( context, data, result );
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/EventUpdate.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/EventUpdate.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/EventUpdate.java
new file mode 100644
index 0000000..68fb6ab
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/EventUpdate.java
@@ -0,0 +1,16 @@
+package org.apache.usergrid.persistence.collection.mvcc.stage.impl.write;
+
+
+import org.apache.usergrid.persistence.collection.CollectionContext;
+import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionEvent;
+import org.apache.usergrid.persistence.collection.mvcc.stage.Result;
+import org.apache.usergrid.persistence.model.entity.Entity;
+
+
+/** @author tnine */
+public class EventUpdate extends CollectionEvent<Entity> {
+ // Event whose payload is the Entity being updated; Update.performStage subscribes to it.
+ public EventUpdate( final CollectionContext context, final Entity data, final Result result ) {
+ super( context, data, result );
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/EventVerify.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/EventVerify.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/EventVerify.java
new file mode 100644
index 0000000..f078aa5
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/EventVerify.java
@@ -0,0 +1,17 @@
+package org.apache.usergrid.persistence.collection.mvcc.stage.impl.write;
+
+
+import org.apache.usergrid.persistence.collection.CollectionContext;
+import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
+import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionEvent;
+import org.apache.usergrid.persistence.collection.mvcc.stage.Result;
+
+
+/** @author tnine */
+public class EventVerify extends CollectionEvent<MvccEntity> {
+ // Event whose payload is an MvccEntity; StartWrite posts it and Verify.performStage receives it.
+ // The constructor only forwards the collection context, payload and result holder to CollectionEvent.
+ public EventVerify( final CollectionContext context, final MvccEntity data, final Result result ) {
+ super( context, data, result );
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/PipelineCreate.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/PipelineCreate.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/PipelineCreate.java
deleted file mode 100644
index f3af972..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/PipelineCreate.java
+++ /dev/null
@@ -1,23 +0,0 @@
-package org.apache.usergrid.persistence.collection.mvcc.stage.impl.write;
-
-
-import java.lang.annotation.Retention;
-import java.lang.annotation.Target;
-
-import com.google.inject.BindingAnnotation;
-
-import static java.lang.annotation.ElementType.FIELD;
-import static java.lang.annotation.ElementType.METHOD;
-import static java.lang.annotation.ElementType.PARAMETER;
-import static java.lang.annotation.RetentionPolicy.RUNTIME;
-
-
-/**
- * Marks the create pipeline
- *
- * @author tnine
- */
-@BindingAnnotation
-@Target({ FIELD, PARAMETER, METHOD })
-@Retention(RUNTIME)
-public @interface PipelineCreate {}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/PipelineUpdate.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/PipelineUpdate.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/PipelineUpdate.java
deleted file mode 100644
index 85bc56d..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/PipelineUpdate.java
+++ /dev/null
@@ -1,23 +0,0 @@
-package org.apache.usergrid.persistence.collection.mvcc.stage.impl.write;
-
-
-import java.lang.annotation.Retention;
-import java.lang.annotation.Target;
-
-import com.google.inject.BindingAnnotation;
-
-import static java.lang.annotation.ElementType.FIELD;
-import static java.lang.annotation.ElementType.METHOD;
-import static java.lang.annotation.ElementType.PARAMETER;
-import static java.lang.annotation.RetentionPolicy.RUNTIME;
-
-
-/**
- * Marks the create pipeline
- *
- * @author tnine
- */
-@BindingAnnotation
-@Target( { FIELD, PARAMETER, METHOD } )
-@Retention( RUNTIME )
-public @interface PipelineUpdate {}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/StartWrite.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/StartWrite.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/StartWrite.java
index d6e7d49..ac153f8 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/StartWrite.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/StartWrite.java
@@ -8,15 +8,16 @@ import org.slf4j.LoggerFactory;
import org.apache.usergrid.persistence.collection.CollectionContext;
import org.apache.usergrid.persistence.collection.exception.CollectionRuntimeException;
+import org.apache.usergrid.persistence.collection.mvcc.entity.CollectionEventBus;
import org.apache.usergrid.persistence.collection.mvcc.entity.MvccLogEntry;
import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccEntityImpl;
import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccLogEntryImpl;
-import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionContext;
-import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionStage;
+import org.apache.usergrid.persistence.collection.mvcc.stage.EventStage;
import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy;
import org.apache.usergrid.persistence.model.entity.Entity;
import com.google.common.base.Preconditions;
+import com.google.common.eventbus.Subscribe;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.netflix.astyanax.MutationBatch;
@@ -28,32 +29,32 @@ import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
* new write in the data store for a checkpoint and recovery
*/
@Singleton
-public class StartWrite implements ExecutionStage {
+public class StartWrite implements EventStage<EventStart> {
private static final Logger LOG = LoggerFactory.getLogger( StartWrite.class );
+ private final CollectionEventBus eventBus;
private final MvccLogEntrySerializationStrategy logStrategy;
/** Create a new stage with the current context */
@Inject
- public StartWrite( final MvccLogEntrySerializationStrategy logStrategy ) {
+ public StartWrite( final CollectionEventBus eventBus,
+ final MvccLogEntrySerializationStrategy logStrategy ) {
+ Preconditions.checkNotNull( eventBus, "eventBus is required" );
Preconditions.checkNotNull( logStrategy, "logStrategy is required" );
-
+ this.eventBus = eventBus;
this.logStrategy = logStrategy;
+ // subscribe performStage to EventStart posts. NOTE(review): registering 'this' inside the constructor - confirm the stage is only built once per bus (commit message reports duplicate dispatch)
+ this.eventBus.register( this );
}
- /**
- * Create the entity Id and inject it, as well as set the timestamp versions
- *
- * @param executionContext The context of the current write operation
- */
@Override
- public void performStage( final ExecutionContext executionContext ) {
-
- final Entity entity = executionContext.getMessage( Entity.class );
+ @Subscribe
+ public void performStage( final EventStart event ) {
+ final Entity entity = event.getData();
Preconditions.checkNotNull( entity, "Entity is required in the new stage of the mvcc write" );
@@ -64,12 +65,11 @@ public class StartWrite implements ExecutionStage {
Preconditions.checkNotNull( version, "Entity version is required in this stage" );
-
- final CollectionContext collectionContext = executionContext.getCollectionContext();
+ final CollectionContext collectionContext = event.getCollectionContext();
- final MvccLogEntry startEntry = new MvccLogEntryImpl( entityId, version, org.apache.usergrid.persistence
- .collection.mvcc.entity.Stage.ACTIVE );
+ final MvccLogEntry startEntry = new MvccLogEntryImpl( entityId, version,
+ org.apache.usergrid.persistence.collection.mvcc.entity.Stage.ACTIVE );
MutationBatch write = logStrategy.write( collectionContext, startEntry );
@@ -86,7 +86,9 @@ public class StartWrite implements ExecutionStage {
//create the mvcc entity for the next stage
final MvccEntityImpl nextStage = new MvccEntityImpl( entityId, version, entity );
- executionContext.setMessage( nextStage );
- executionContext.proceed();
+ eventBus.post( new EventVerify( collectionContext, nextStage, event.getResult() ) );
}
+
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/Update.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/Update.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/Update.java
index 0175b42..a095fb3 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/Update.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/Update.java
@@ -6,13 +6,14 @@ import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionContext;
-import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionStage;
+import org.apache.usergrid.persistence.collection.mvcc.entity.CollectionEventBus;
+import org.apache.usergrid.persistence.collection.mvcc.stage.EventStage;
import org.apache.usergrid.persistence.collection.service.TimeService;
import org.apache.usergrid.persistence.collection.service.UUIDService;
import org.apache.usergrid.persistence.model.entity.Entity;
import com.google.common.base.Preconditions;
+import com.google.common.eventbus.Subscribe;
import com.google.inject.Inject;
import com.google.inject.Singleton;
@@ -22,33 +23,35 @@ import com.google.inject.Singleton;
* been set correctly
*/
@Singleton
-public class Update implements ExecutionStage {
+public class Update implements EventStage<EventUpdate> {
private static final Logger LOG = LoggerFactory.getLogger( Update.class );
private final TimeService timeService;
private final UUIDService uuidService;
+ private final CollectionEventBus eventBus;
@Inject
- public Update( final TimeService timeService, final UUIDService uuidService ) {
+ public Update( final CollectionEventBus eventBus, final TimeService timeService, final UUIDService uuidService ) {
+ Preconditions.checkNotNull( eventBus, "eventBus is required" );
Preconditions.checkNotNull( timeService, "timeService is required" );
Preconditions.checkNotNull( uuidService, "uuidService is required" );
+ this.eventBus = eventBus;
this.timeService = timeService;
this.uuidService = uuidService;
+ // subscribe performStage to EventUpdate posts. NOTE(review): registering 'this' inside the constructor - confirm the stage is only built once per bus (commit message reports duplicate dispatch)
+ this.eventBus.register( this );
+
}
- /**
- * Create the entity Id and inject it, as well as set the timestamp versions
- *
- * @param executionContext The context of the current write operation
- */
@Override
- public void performStage( final ExecutionContext executionContext ) {
+ @Subscribe
+ public void performStage( final EventUpdate event ) {
- final Entity entity = executionContext.getMessage( Entity.class );
+ final Entity entity = event.getData();
Preconditions.checkNotNull( entity, "Entity is required in the new stage of the mvcc write" );
@@ -64,7 +67,7 @@ public class Update implements ExecutionStage {
entity.setVersion( version );
entity.setUpdated( updated );
- executionContext.setMessage( entity );
- executionContext.proceed();
+ //fire the start event
+ eventBus.post(new EventStart( event.getCollectionContext(), entity, event.getResult()) );
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/Verify.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/Verify.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/Verify.java
index 884c59b..5a05869 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/Verify.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/Verify.java
@@ -1,25 +1,31 @@
package org.apache.usergrid.persistence.collection.mvcc.stage.impl.write;
-import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionStage;
-import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionContext;
+import org.apache.usergrid.persistence.collection.mvcc.entity.CollectionEventBus;
+import org.apache.usergrid.persistence.collection.mvcc.stage.EventStage;
+import com.google.inject.Inject;
import com.google.inject.Singleton;
/** This phase should execute any verification on the MvccEntity */
@Singleton
-public class Verify implements ExecutionStage {
+public class Verify implements EventStage<EventVerify> {
+ private final CollectionEventBus eventBus;
- public Verify() {
+ @Inject
+ public Verify( final CollectionEventBus eventBus ) {
+ this.eventBus = eventBus;
+ this.eventBus.register( this ); // NOTE(review): no null check on eventBus here, unlike the Create/StartWrite/Update constructors
}
+
@Override
- public void performStage( final ExecutionContext executionContext ) {
- //TODO no op for now, just continue to the next stage. Verification logic goes in here
+ public void performStage( final EventVerify event ) {
+ //no op for now, verification logic belongs here; the payload is forwarded unchanged as an EventCommit. NOTE(review): this method carries no @Subscribe (Create/StartWrite/Update do) - confirm the bus actually delivers EventVerify here
- executionContext.proceed();
+ eventBus.post( new EventCommit(event.getCollectionContext(), event.getData(), event.getResult()) );
}
}