You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2013/12/05 05:17:32 UTC
[1/2] Example using google’s event bus. Seems events are getting dispatched multiple times.
Updated Branches:
refs/heads/two-point-o-eventbus [created] 56c415f2b
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/MvccEntitySerializationStrategy.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/MvccEntitySerializationStrategy.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/MvccEntitySerializationStrategy.java
index 4aefbae..b873fb6 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/MvccEntitySerializationStrategy.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/MvccEntitySerializationStrategy.java
@@ -8,7 +8,6 @@ import org.apache.usergrid.persistence.collection.CollectionContext;
import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
import com.netflix.astyanax.MutationBatch;
-import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
/** The interface that allows us to serialize an entity to disk */
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImpl.java
index 1ce64aa..d4663b9 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImpl.java
@@ -17,8 +17,8 @@ import org.apache.usergrid.persistence.collection.CollectionContext;
import org.apache.usergrid.persistence.collection.migration.CollectionColumnFamily;
import org.apache.usergrid.persistence.collection.migration.Migration;
import org.apache.usergrid.persistence.collection.mvcc.entity.MvccLogEntry;
-import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccLogEntryImpl;
import org.apache.usergrid.persistence.collection.mvcc.entity.Stage;
+import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccLogEntryImpl;
import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy;
import com.google.common.base.Preconditions;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/SerializationModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/SerializationModule.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/SerializationModule.java
index f27b6ad..9a12b7d 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/SerializationModule.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/SerializationModule.java
@@ -2,7 +2,6 @@ package org.apache.usergrid.persistence.collection.serialization.impl;
import org.apache.usergrid.persistence.collection.astynax.AstynaxKeyspaceProvider;
-import org.apache.usergrid.persistence.collection.guice.PropertyUtils;
import org.apache.usergrid.persistence.collection.migration.Migration;
import org.apache.usergrid.persistence.collection.migration.MigrationManager;
import org.apache.usergrid.persistence.collection.migration.MigrationManagerImpl;
@@ -11,7 +10,6 @@ import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySeri
import com.google.inject.AbstractModule;
import com.google.inject.multibindings.Multibinder;
-import com.google.inject.name.Names;
import com.netflix.astyanax.Keyspace;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/service/impl/ServiceModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/service/impl/ServiceModule.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/service/impl/ServiceModule.java
index 858aed7..9f6835d 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/service/impl/ServiceModule.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/service/impl/ServiceModule.java
@@ -1,20 +1,10 @@
package org.apache.usergrid.persistence.collection.service.impl;
-import org.apache.usergrid.persistence.collection.astynax.AstynaxKeyspaceProvider;
-import org.apache.usergrid.persistence.collection.migration.Migration;
-import org.apache.usergrid.persistence.collection.migration.MigrationManager;
-import org.apache.usergrid.persistence.collection.migration.MigrationManagerImpl;
-import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
-import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy;
-import org.apache.usergrid.persistence.collection.serialization.impl.MvccEntitySerializationStrategyImpl;
-import org.apache.usergrid.persistence.collection.serialization.impl.MvccLogEntrySerializationStrategyImpl;
import org.apache.usergrid.persistence.collection.service.TimeService;
import org.apache.usergrid.persistence.collection.service.UUIDService;
import com.google.inject.AbstractModule;
-import com.google.inject.multibindings.Multibinder;
-import com.netflix.astyanax.Keyspace;
/** @author tnine */
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/CollectionManagerIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/CollectionManagerIT.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/CollectionManagerIT.java
index 9672a6b..a9dd579 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/CollectionManagerIT.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/CollectionManagerIT.java
@@ -6,11 +6,13 @@ import org.junit.Test;
import org.apache.usergrid.persistence.collection.guice.CassandraTestCollectionModule;
import org.apache.usergrid.persistence.collection.impl.CollectionContextImpl;
+import org.apache.usergrid.persistence.collection.mvcc.stage.impl.write.EventCreate;
import org.apache.usergrid.persistence.model.entity.Entity;
import org.apache.usergrid.persistence.model.field.IntegerField;
import org.apache.usergrid.persistence.model.util.UUIDGenerator;
import org.apache.usergrid.persistence.test.CassandraRule;
+import com.google.common.eventbus.EventBus;
import com.google.guiceberry.junit4.GuiceBerryRule;
import com.google.inject.Inject;
@@ -18,6 +20,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
/** @author tnine */
@@ -33,6 +36,9 @@ public class CollectionManagerIT {
@Inject
private CollectionManagerFactory factory;
+ @Inject
+ private EventBus eventBus;
+
@Test
public void create() {
@@ -135,4 +141,6 @@ public class CollectionManagerIT {
assertEquals("Field value correct", createReturned.getField( "counter" ), loadReturned.getField( "counter" ));
}
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/CollectionManagerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/CollectionManagerTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/CollectionManagerTest.java
index fed0663..e3bb133 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/CollectionManagerTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/CollectionManagerTest.java
@@ -7,9 +7,8 @@ import org.mockito.ArgumentCaptor;
import org.apache.usergrid.persistence.collection.impl.CollectionContextImpl;
import org.apache.usergrid.persistence.collection.impl.CollectionManagerImpl;
import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
-import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccEntityImpl;
import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionContext;
-import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionStage;
+import org.apache.usergrid.persistence.collection.mvcc.stage.EventStage;
import org.apache.usergrid.persistence.collection.mvcc.stage.StagePipeline;
import org.apache.usergrid.persistence.model.entity.Entity;
import org.apache.usergrid.persistence.model.util.UUIDGenerator;
@@ -24,44 +23,44 @@ import static org.mockito.Mockito.when;
/** @author tnine */
public class CollectionManagerTest {
-
- @Test
- public void create(){
-
- ExecutionStage mockExecutionStage = mock(ExecutionStage.class);
-
- StagePipeline createPipeline = mock(StagePipeline.class);
- StagePipeline updatePipeline = mock(StagePipeline.class);
- StagePipeline deletePipeline = mock(StagePipeline.class);
- StagePipeline loadPipeline = mock(StagePipeline.class);
-
-
-
- //mock up returning the first stage
- when(createPipeline.first()).thenReturn( mockExecutionStage );
-
-
- CollectionContext context = new CollectionContextImpl( UUIDGenerator.newTimeUUID(), UUIDGenerator.newTimeUUID(), "test" );
-
- CollectionManager collectionManager = new CollectionManagerImpl(createPipeline, updatePipeline, deletePipeline, loadPipeline, context);
-
- Entity create = new Entity();
-
- MvccEntity mvccEntity = mock(MvccEntity.class);
-
-
- Entity returned = collectionManager.create( create );
-
- //verify the first stage was asked for
- verify(createPipeline).first();
-
- ArgumentCaptor<ExecutionContext> contextArg = ArgumentCaptor.forClass(ExecutionContext.class);
-
- //verify the first perform stage was invoked
- verify( mockExecutionStage ).performStage( contextArg.capture() );
-
- //verify we set the passed entity into the ExecutionContext
- assertEquals("Entity should be present in the write context", create, contextArg.getValue().getMessage( Entity.class ));
-
- }
+//
+// @Test
+// public void create(){
+//
+// EventStage mockEventStage = mock(EventStage.class);
+//
+// StagePipeline createPipeline = mock(StagePipeline.class);
+// StagePipeline updatePipeline = mock(StagePipeline.class);
+// StagePipeline deletePipeline = mock(StagePipeline.class);
+// StagePipeline loadPipeline = mock(StagePipeline.class);
+//
+//
+//
+// //mock up returning the first stage
+// when(createPipeline.first()).thenReturn( mockEventStage );
+//
+//
+// CollectionContext context = new CollectionContextImpl( UUIDGenerator.newTimeUUID(), UUIDGenerator.newTimeUUID(), "test" );
+//
+// CollectionManager collectionManager = new CollectionManagerImpl(createPipeline, updatePipeline, deletePipeline, loadPipeline, context);
+//
+// Entity create = new Entity();
+//
+// MvccEntity mvccEntity = mock(MvccEntity.class);
+//
+//
+// Entity returned = collectionManager.create( create );
+//
+// //verify the first stage was asked for
+// verify(createPipeline).first();
+//
+// ArgumentCaptor<ExecutionContext> contextArg = ArgumentCaptor.forClass(ExecutionContext.class);
+//
+// //verify the first perform stage was invoked
+// verify( mockEventStage ).performStage( contextArg.capture() );
+//
+// //verify we set the passed entity into the ExecutionContext
+// assertEquals("Entity should be present in the write context", create, contextArg.getValue().getMessage( Entity.class ));
+//
+// }
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/ExecutionContextTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/ExecutionContextTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/ExecutionContextTest.java
deleted file mode 100644
index 1730107..0000000
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/ExecutionContextTest.java
+++ /dev/null
@@ -1,217 +0,0 @@
-package org.apache.usergrid.persistence.collection.mvcc.stage;
-
-
-import org.junit.Test;
-
-import org.apache.usergrid.persistence.collection.CollectionContext;
-import org.apache.usergrid.persistence.collection.mvcc.stage.impl.ExecutionContextImpl;
-
-import static junit.framework.TestCase.assertSame;
-import static org.junit.Assert.assertNull;
-import static org.mockito.Matchers.same;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-
-/** @author tnine */
-public class ExecutionContextTest {
-
- @Test
- public void performWrite() {
-
- CollectionContext collectionContext = mock( CollectionContext.class );
-
- StagePipeline pipeline = mock( StagePipeline.class );
-
- ExecutionStage executionStage = mock( ExecutionStage.class );
-
- when( pipeline.first() ).thenReturn( executionStage );
-
- ExecutionContext executionContext = new ExecutionContextImpl( pipeline, collectionContext );
-
- Object test = new Object();
-
- executionContext.execute( test );
-
- //verify we called first in the pipeline to get the first value
- verify( pipeline ).first();
-
- //verify the first executionStage was invoked
- verify( executionStage ).performStage( same( executionContext ) );
-
- //verify the bean value was set
- assertSame( test, executionContext.getMessage( Object.class ) );
- }
-
-
- @Test
- public void setAndGet() {
- Object test = new Object();
-
- CollectionContext collectionContext = mock( CollectionContext.class );
-
- StagePipeline pipeline = mock( StagePipeline.class );
-
-
- ExecutionContext executionContext = new ExecutionContextImpl( pipeline, collectionContext );
-
- executionContext.setMessage( test );
-
- assertSame( "Same value returned", test, executionContext.getMessage( Object.class ) );
- }
-
-
- @Test
- public void setAndGetTypeSafe() {
- TestBean test = new TestBean();
-
- CollectionContext collectionContext = mock( CollectionContext.class );
-
- StagePipeline pipeline = mock( StagePipeline.class );
-
-
- ExecutionContext executionContext = new ExecutionContextImpl( pipeline, collectionContext );
-
- executionContext.setMessage( test );
-
- //works because Test is an instance of object
- assertSame( "Test instance of object", test, executionContext.getMessage( Object.class ) );
-
- assertSame( "Test instance of object", test, executionContext.getMessage( TestBean.class ) );
- }
-
-
- @Test( expected = ClassCastException.class )
- public void setAndGetBadType() {
- Object test = new Object();
-
- CollectionContext collectionContext = mock( CollectionContext.class );
-
- StagePipeline pipeline = mock( StagePipeline.class );
-
-
- ExecutionContext executionContext = new ExecutionContextImpl( pipeline, collectionContext );
-
- executionContext.setMessage( test );
-
- //works because Test is an instance of object
- assertSame( "Test instance of object", test, executionContext.getMessage( Object.class ) );
-
- //should blow up, not type save. The object test is not an instance of TestBean
- executionContext.getMessage( TestBean.class );
- }
-
-
- @Test
- public void nullMessage() {
-
- CollectionContext collectionContext = mock( CollectionContext.class );
-
- StagePipeline pipeline = mock( StagePipeline.class );
-
-
- ExecutionContext executionContext = new ExecutionContextImpl( pipeline, collectionContext );
-
- executionContext.setMessage( null );
-
- //works because Test is an instance of object
- assertNull( "Null message returned", executionContext.getMessage( Object.class ) );
- }
-
-
- @Test
- public void proceedHasNextStep() {
-
- CollectionContext collectionContext = mock( CollectionContext.class );
-
- StagePipeline pipeline = mock( StagePipeline.class );
-
- ExecutionStage firstExecutionStage = mock( ExecutionStage.class );
-
- ExecutionStage secondExecutionStage = mock( ExecutionStage.class );
-
-
- when( pipeline.first() ).thenReturn( firstExecutionStage );
-
- when( pipeline.nextStage( same( firstExecutionStage ) ) ).thenReturn( secondExecutionStage );
-
-
- ExecutionContext executionContext = new ExecutionContextImpl( pipeline, collectionContext );
-
- Object test = new Object();
-
- executionContext.execute( test );
-
- //now proceed and validate we were called
- executionContext.proceed();
-
- verify( secondExecutionStage ).performStage( same( executionContext ) );
- }
-
-
- @Test
- public void proceedNoNextStep() {
-
- CollectionContext collectionContext = mock( CollectionContext.class );
-
- StagePipeline pipeline = mock( StagePipeline.class );
-
- ExecutionStage firstExecutionStage = mock( ExecutionStage.class );
-
- when( pipeline.first() ).thenReturn( firstExecutionStage );
-
- when( pipeline.nextStage( same( firstExecutionStage ) ) ).thenReturn( null );
-
-
- ExecutionContext executionContext = new ExecutionContextImpl( pipeline, collectionContext );
-
- Object test = new Object();
-
- executionContext.execute( test );
-
- //now proceed and validate we were called
- executionContext.proceed();
- }
-
-
- @Test
- public void getContextCorrect() {
-
- CollectionContext collectionContext = mock( CollectionContext.class );
-
- StagePipeline pipeline = mock( StagePipeline.class );
-
-
- ExecutionContext executionContext = new ExecutionContextImpl( pipeline, collectionContext );
-
- assertSame( "Collection context pointer correct", collectionContext, executionContext.getCollectionContext() );
- }
-
-
-
-
- @Test( expected = NullPointerException.class )
- public void nullContextFails() {
-
- CollectionContext collectionContext = mock( CollectionContext.class );
-
-
- new ExecutionContextImpl( null, collectionContext );
- }
-
-
- @Test( expected = NullPointerException.class )
- public void nullPipelineFails() {
-
- CollectionContext collectionContext = mock( CollectionContext.class );
-
-
- new ExecutionContextImpl( null, collectionContext );
- }
-
-
- private static class TestBean {
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/StagePipelineTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/StagePipelineTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/StagePipelineTest.java
deleted file mode 100644
index 86a2a3a..0000000
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/StagePipelineTest.java
+++ /dev/null
@@ -1,89 +0,0 @@
-package org.apache.usergrid.persistence.collection.mvcc.stage;
-
-
-import org.junit.Test;
-
-import org.apache.usergrid.persistence.collection.mvcc.stage.impl.StagePipelineImpl;
-
-import static junit.framework.TestCase.assertSame;
-import static org.junit.Assert.assertNull;
-import static org.mockito.Mockito.mock;
-
-
-/** @author tnine */
-public class StagePipelineTest {
-
- @Test
- public void oneStage() {
- ExecutionStage first = mock( ExecutionStage.class );
-
- StagePipeline pipeline = StagePipelineImpl.fromStages( first );
-
- assertSame( "Correct stage returned", first, pipeline.first() );
-
- ExecutionStage next = pipeline.nextStage( first );
-
- assertNull( "No next stage", next );
- }
-
-
- @Test
- public void threeStages() {
- ExecutionStage first = mock( ExecutionStage.class );
- ExecutionStage second = mock( ExecutionStage.class );
- ExecutionStage third = mock( ExecutionStage.class );
-
- StagePipeline pipeline = StagePipelineImpl.fromStages( first, second, third );
-
- assertSame( "Correct stage returned", first, pipeline.first() );
-
- ExecutionStage next = pipeline.nextStage( first );
-
- assertSame( "Correct stage returned", second, next );
-
- next = pipeline.nextStage( next );
-
- assertSame( "Correct stage returned", third, next );
-
- next = pipeline.nextStage( next );
-
- assertNull( "No next stage", next );
- }
-
-
- /**
- * Test seeking without calling .first() just to make sure there's no side effects
- */
- @Test
- public void stageSeek() {
- ExecutionStage first = mock( ExecutionStage.class );
- ExecutionStage second = mock( ExecutionStage.class );
- ExecutionStage third = mock( ExecutionStage.class );
-
- StagePipeline pipeline = StagePipelineImpl.fromStages( first, second, third );
-
-
- ExecutionStage next = pipeline.nextStage( second );
-
- assertSame( "Correct stage returned", third, next );
-
- next = pipeline.nextStage( next );
-
- assertNull( "No next stage", next );
- }
-
-
- @Test( expected = NullPointerException.class )
- public void invalidStageInput() {
- ExecutionStage first = mock( ExecutionStage.class );
-
- StagePipeline pipeline = StagePipelineImpl.fromStages( first );
- pipeline.nextStage( null );
- }
-
-
- @Test( expected = IllegalArgumentException.class )
- public void noStagesErrors() {
- StagePipelineImpl.fromStages();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/CreateTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/CreateTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/CreateTest.java
index 41152c1..54fadc9 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/CreateTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/CreateTest.java
@@ -7,7 +7,10 @@ import java.util.concurrent.ExecutionException;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
+import org.apache.usergrid.persistence.collection.CollectionContext;
+import org.apache.usergrid.persistence.collection.mvcc.entity.CollectionEventBus;
import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionContext;
+import org.apache.usergrid.persistence.collection.mvcc.stage.Result;
import org.apache.usergrid.persistence.collection.mvcc.stage.impl.write.Create;
import org.apache.usergrid.persistence.collection.service.TimeService;
import org.apache.usergrid.persistence.collection.service.UUIDService;
@@ -29,14 +32,6 @@ public class CreateTest {
@Test
public void testValidInput() throws ConnectionException, ExecutionException, InterruptedException {
- final ExecutionContext executionContext = mock( ExecutionContext.class );
-
-
- //set up the mock to return the entity from the start phase
- final Entity entity = new Entity();
-
- when( executionContext.getMessage( Entity.class ) ).thenReturn( entity );
-
//mock returning the time
final TimeService timeService = mock( TimeService.class );
@@ -56,20 +51,34 @@ public class CreateTest {
//mock the uuid service
when( uuidService.newTimeUUID() ).thenReturn( newEntityId );
+ final CollectionEventBus eventBus = mock(CollectionEventBus.class);
+
+ Result result = new Result();
+
//perform the stage
- final Create create = new Create( timeService, uuidService );
+ final Create create = new Create(eventBus, timeService, uuidService );
+
+
+
+ //set up the mock to return the entity from the start phase
+ final Entity entity = new Entity();
+
+
+ final CollectionContext context = mock(CollectionContext.class);
+
+ EventCreate createEvent = new EventCreate(context, entity, result );
+ create.performStage( createEvent );
- create.performStage( executionContext );
//now verify our output was correct
- ArgumentCaptor<Entity> mvccEntity = ArgumentCaptor.forClass( Entity.class );
+ ArgumentCaptor<EventStart> event = ArgumentCaptor.forClass( EventStart.class );
- verify( executionContext ).setMessage( mvccEntity.capture() );
+ verify( eventBus ).post( event.capture() );
- Entity created = mvccEntity.getValue();
+ Entity created = event.getValue().getData();
//verify uuid and version in both the MvccEntity and the entity itself
assertEquals( "Entity re-set into context", entity, created );
@@ -81,8 +90,6 @@ public class CreateTest {
assertEquals( "updated time matches generator", time, created.getUpdated() );
- //now verify the proceed was called
- verify( executionContext ).proceed();
}
@@ -103,12 +110,15 @@ public class CreateTest {
//mock the uuid service
final UUIDService uuidService = mock( UUIDService.class );
+ final CollectionEventBus eventBus = mock(CollectionEventBus.class);
+
+
//perform the stage
- final Create create = new Create( timeService, uuidService );
+ final Create create = new Create( eventBus, timeService, uuidService );
//should throw an NPE
- create.performStage( executionContext );
+ create.performStage( null );
}
@@ -116,31 +126,46 @@ public class CreateTest {
/** Test no time service */
@Test(expected = NullPointerException.class)
- public void testNoTimeService() throws ConnectionException, ExecutionException, InterruptedException {
+ public void testNoEventBus() throws ConnectionException, ExecutionException, InterruptedException {
- final ExecutionContext executionContext = mock( ExecutionContext.class );
- when( executionContext.getMessage( Entity.class ) ).thenReturn( null );
-
//mock the uuid service
final UUIDService uuidService = mock( UUIDService.class );
+ final TimeService timeService = mock(TimeService.class);
+
+
//perform the stage
- new Create( null, uuidService );
+ new Create( null, timeService, uuidService );
}
+
/** Test no time service */
@Test(expected = NullPointerException.class)
- public void testNoUUIDService() throws ConnectionException, ExecutionException, InterruptedException {
+ public void testNoTimeService() throws ConnectionException, ExecutionException, InterruptedException {
- final ExecutionContext executionContext = mock( ExecutionContext.class );
+ final CollectionEventBus eventBus = mock(CollectionEventBus.class);
- when( executionContext.getMessage( Entity.class ) ).thenReturn( null );
+ //mock the uuid service
+ final UUIDService uuidService = mock( UUIDService.class );
+
+
+
+ //perform the stage
+ new Create( eventBus, null, uuidService );
+ }
+
+
+ /** Test no time service */
+ @Test(expected = NullPointerException.class)
+ public void testNoUUIDService() throws ConnectionException, ExecutionException, InterruptedException {
+
+ final CollectionEventBus eventBus = mock(CollectionEventBus.class);
//mock returning the time
@@ -148,6 +173,6 @@ public class CreateTest {
//throw NPE
- new Create( timeService, null );
+ new Create(eventBus, timeService, null );
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/StartWriteTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/StartWriteTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/StartWriteTest.java
index 006dda9..5dc8de7 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/StartWriteTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/StartWriteTest.java
@@ -9,11 +9,11 @@ import org.mockito.ArgumentCaptor;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.usergrid.persistence.collection.CollectionContext;
+import org.apache.usergrid.persistence.collection.mvcc.entity.CollectionEventBus;
import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
import org.apache.usergrid.persistence.collection.mvcc.entity.MvccLogEntry;
import org.apache.usergrid.persistence.collection.mvcc.entity.Stage;
-import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionContext;
-import org.apache.usergrid.persistence.collection.mvcc.stage.impl.write.StartWrite;
+import org.apache.usergrid.persistence.collection.mvcc.stage.Result;
import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy;
import org.apache.usergrid.persistence.model.entity.Entity;
import org.apache.usergrid.persistence.model.util.UUIDGenerator;
@@ -36,19 +36,9 @@ public class StartWriteTest {
public void testStartStage() throws Exception {
- final ExecutionContext executionContext = mock( ExecutionContext.class );
final CollectionContext context = mock( CollectionContext.class );
-
- //mock returning the context
- when( executionContext.getCollectionContext() ).thenReturn( context );
-
-
- //set up the mock to return the entity from the start phase
- final Entity entity = generateEntity();
-
- //mock returning the entity from the write context
- when( executionContext.getMessage( Entity.class ) ).thenReturn( entity );
+ final CollectionEventBus bus = mock( CollectionEventBus.class );
//mock returning a mock mutation when we do a log entry write
@@ -61,14 +51,22 @@ public class StartWriteTest {
when( logStrategy.write( same( context ), logEntry.capture() ) ).thenReturn( mutation );
+ Result result = new Result();
+
+ //set up the mock to return the entity from the start phase
+ final Entity entity = generateEntity();
+
+
+ EventStart start = new EventStart( context, entity, result );
+
//run the stage
- StartWrite newStage = new StartWrite( logStrategy );
+ StartWrite newStage = new StartWrite( bus, logStrategy );
- newStage.performStage( executionContext );
+ newStage.performStage( start );
//now verify our output was correct
- ArgumentCaptor<MvccEntity> mvccEntity = ArgumentCaptor.forClass( MvccEntity.class );
+ ArgumentCaptor<EventVerify> eventVerify = ArgumentCaptor.forClass( EventVerify.class );
//verify the log entry is correct
@@ -76,44 +74,18 @@ public class StartWriteTest {
assertEquals( "entity id did not match ", entity.getUuid(), entry.getEntityId() );
assertEquals( "version did not not match entityId", entity.getVersion(), entry.getVersion() );
- assertEquals( "ExecutionStage is correct", Stage.ACTIVE, entry.getStage() );
+ assertEquals( "EventStage is correct", Stage.ACTIVE, entry.getStage() );
//now verify we set the message into the write context
- verify( executionContext ).setMessage( mvccEntity.capture() );
+ verify( bus ).post( eventVerify.capture() );
- MvccEntity created = mvccEntity.getValue();
+ MvccEntity created = eventVerify.getValue().getData();
//verify uuid and version in both the MvccEntity and the entity itself
assertEquals( "entity id did not match generator", entity.getUuid(), created.getUuid() );
assertEquals( "version did not not match entityId", entity.getVersion(), created.getVersion() );
assertSame( "Entity correct", entity, created.getEntity().get() );
-
-
- //now verify the proceed was called
- verify( executionContext ).proceed();
- }
-
-
- /** Test no entity in the pipeline */
- @Test( expected = NullPointerException.class )
- public void testNoEntity() throws Exception {
-
-
- final ExecutionContext executionContext = mock( ExecutionContext.class );
-
-
- //mock returning the entity from the write context
- when( executionContext.getMessage( Entity.class ) ).thenReturn( null );
-
-
- //mock returning a mock mutation when we do a log entry write
- final MvccLogEntrySerializationStrategy logStrategy = mock( MvccLogEntrySerializationStrategy.class );
-
- //run the stage
- StartWrite newStage = new StartWrite( logStrategy );
-
- newStage.performStage( executionContext );
}
@@ -122,25 +94,22 @@ public class StartWriteTest {
public void testNoEntityId() throws Exception {
- final ExecutionContext executionContext = mock( ExecutionContext.class );
-
-
final Entity entity = new Entity();
final UUID version = UUIDGenerator.newTimeUUID();
entity.setVersion( version );
- //mock returning the entity from the write context
- when( executionContext.getMessage( Entity.class ) ).thenReturn( entity );
+ final CollectionContext context = mock( CollectionContext.class );
+ final CollectionEventBus eventBus = mock( CollectionEventBus.class );
//mock returning a mock mutation when we do a log entry write
final MvccLogEntrySerializationStrategy logStrategy = mock( MvccLogEntrySerializationStrategy.class );
//run the stage
- StartWrite newStage = new StartWrite( logStrategy );
+ StartWrite newStage = new StartWrite( eventBus, logStrategy );
- newStage.performStage( executionContext );
+ newStage.performStage( new EventStart( context, entity, new Result() ) );
}
@@ -149,9 +118,6 @@ public class StartWriteTest {
public void testNoEntityVersion() throws Exception {
- final ExecutionContext executionContext = mock( ExecutionContext.class );
-
-
final Entity entity = new Entity();
final UUID entityId = UUIDGenerator.newTimeUUID();
@@ -159,17 +125,17 @@ public class StartWriteTest {
FieldUtils.writeDeclaredField( entity, "uuid", entityId, true );
- //mock returning the entity from the write context
- when( executionContext.getMessage( Entity.class ) ).thenReturn( entity );
+ final CollectionContext context = mock( CollectionContext.class );
+ final CollectionEventBus eventBus = mock( CollectionEventBus.class );
//mock returning a mock mutation when we do a log entry write
final MvccLogEntrySerializationStrategy logStrategy = mock( MvccLogEntrySerializationStrategy.class );
//run the stage
- StartWrite newStage = new StartWrite( logStrategy );
+ StartWrite newStage = new StartWrite( eventBus, logStrategy );
- newStage.performStage( executionContext );
+ newStage.performStage( new EventStart( context, entity, new Result() ) );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/UpdateTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/UpdateTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/UpdateTest.java
index 24ec265..910eb38 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/UpdateTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/UpdateTest.java
@@ -9,7 +9,10 @@ import org.mockito.ArgumentCaptor;
import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.usergrid.persistence.collection.CollectionContext;
+import org.apache.usergrid.persistence.collection.mvcc.entity.CollectionEventBus;
import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionContext;
+import org.apache.usergrid.persistence.collection.mvcc.stage.Result;
import org.apache.usergrid.persistence.collection.service.TimeService;
import org.apache.usergrid.persistence.collection.service.UUIDService;
import org.apache.usergrid.persistence.model.entity.Entity;
@@ -31,8 +34,6 @@ public class UpdateTest {
@Test
public void testValidInput() throws Exception {
- final ExecutionContext executionContext = mock( ExecutionContext.class );
-
//set up the mock to return the entity from the start phase
final Entity entity = new Entity();
@@ -42,9 +43,8 @@ public class UpdateTest {
FieldUtils.writeDeclaredField( entity, "uuid", existingEntityId, true );
entity.setCreated( createdTime );
-
- when( executionContext.getMessage( Entity.class ) ).thenReturn( entity );
-
+ final CollectionEventBus eventBus = mock( CollectionEventBus.class );
+ final CollectionContext context = mock( CollectionContext.class );
//mock returning the time
final TimeService timeService = mock( TimeService.class );
@@ -66,19 +66,18 @@ public class UpdateTest {
//perform the stage
- final Update create = new Update( timeService, uuidService );
-
+ final Update create = new Update( eventBus, timeService, uuidService );
- create.performStage( executionContext );
+ create.performStage( new EventUpdate( context, entity, new Result() ) );
//now verify our output was correct
- ArgumentCaptor<Entity> mvccEntity = ArgumentCaptor.forClass( Entity.class );
+ ArgumentCaptor<EventStart> mvccEntity = ArgumentCaptor.forClass( EventStart.class );
- verify( executionContext ).setMessage( mvccEntity.capture() );
+ verify( eventBus ).post( mvccEntity.capture() );
- Entity created = mvccEntity.getValue();
+ Entity created = mvccEntity.getValue().getData();
//verify uuid and version in both the MvccEntity and the entity itself
assertEquals( "Entity re-set into context", entity, created );
@@ -88,15 +87,11 @@ public class UpdateTest {
//check the time
assertEquals( "created time matches generator", createdTime, created.getCreated() );
assertEquals( "updated time matches generator", updateTime, created.getUpdated() );
-
-
- //now verify the proceed was called
- verify( executionContext ).proceed();
}
/** Test the start stage for happy path */
- @Test(expected = NullPointerException.class)
+ @Test( expected = NullPointerException.class )
public void testInvalidInput() throws ConnectionException, ExecutionException, InterruptedException {
final ExecutionContext executionContext = mock( ExecutionContext.class );
@@ -112,51 +107,48 @@ public class UpdateTest {
//mock the uuid service
final UUIDService uuidService = mock( UUIDService.class );
+ final CollectionEventBus eventBus = mock( CollectionEventBus.class );
+
//perform the stage
- final Update create = new Update( timeService, uuidService );
+ final Update create = new Update( eventBus, timeService, uuidService );
//should throw an NPE
- create.performStage( executionContext );
-
-
+ create.performStage( null );
}
- @Test(expected = NullPointerException.class)
- public void testInvalidInputNoId() throws ConnectionException, ExecutionException, InterruptedException {
- final ExecutionContext executionContext = mock( ExecutionContext.class );
+ @Test( expected = NullPointerException.class )
+ public void testInvalidInputNoId() throws ConnectionException, ExecutionException, InterruptedException {
- when( executionContext.getMessage( Entity.class ) ).thenReturn( new Entity( ) );
+ //mock returning the time
+ final TimeService timeService = mock( TimeService.class );
- //mock returning the time
- final TimeService timeService = mock( TimeService.class );
+ //mock the uuid service
+ final UUIDService uuidService = mock( UUIDService.class );
+ final CollectionEventBus eventBus = mock( CollectionEventBus.class );
- //mock the uuid service
- final UUIDService uuidService = mock( UUIDService.class );
+ final CollectionContext context = mock( CollectionContext.class );
- //perform the stage
- final Update create = new Update( timeService, uuidService );
-
- //should throw an NPE
- create.performStage( executionContext );
+ //perform the stage
+ final Update create = new Update( eventBus, timeService, uuidService );
+ final Entity entity = new Entity();
- }
+ //should throw an NPE due to no entity id
+ create.performStage( new EventUpdate(context, entity, new Result() ) );
+ }
- /** Test no time service */
- @Test(expected = NullPointerException.class)
+ /** Test no uuid service */
+ @Test( expected = NullPointerException.class )
public void testNoTimeService() throws ConnectionException, ExecutionException, InterruptedException {
- final ExecutionContext executionContext = mock( ExecutionContext.class );
-
-
- when( executionContext.getMessage( Entity.class ) ).thenReturn( null );
+ final CollectionEventBus eventBus = mock( CollectionEventBus.class );
//mock the uuid service
@@ -164,26 +156,38 @@ public class UpdateTest {
//perform the stage
- new Update( null, uuidService );
+ new Update(eventBus, null, uuidService );
}
/** Test no time service */
- @Test(expected = NullPointerException.class)
+ @Test( expected = NullPointerException.class )
public void testNoUUIDService() throws ConnectionException, ExecutionException, InterruptedException {
- final ExecutionContext executionContext = mock( ExecutionContext.class );
+ final CollectionEventBus eventBus = mock( CollectionEventBus.class );
+ //mock returning the time
+ final TimeService timeService = mock( TimeService.class );
- when( executionContext.getMessage( Entity.class ) ).thenReturn( null );
+ //throw NPE
+ new Update(eventBus, timeService, null );
+ }
- //mock returning the time
+
+ /** Test no bus service */
+ @Test( expected = NullPointerException.class )
+ public void testNoBusService() throws ConnectionException, ExecutionException, InterruptedException {
+
+ //mock returning the time
final TimeService timeService = mock( TimeService.class );
+ //mock the uuid service
+ final UUIDService uuidService = mock( UUIDService.class );
+
+
//throw NPE
- new Update( timeService, null );
+ new Update(null, timeService, uuidService );
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/index/src/main/java/org/apache/usergrid/persistence/index/stage/Complete.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/index/src/main/java/org/apache/usergrid/persistence/index/stage/Complete.java b/stack/corepersistence/index/src/main/java/org/apache/usergrid/persistence/index/stage/Complete.java
deleted file mode 100644
index 5cc021c..0000000
--- a/stack/corepersistence/index/src/main/java/org/apache/usergrid/persistence/index/stage/Complete.java
+++ /dev/null
@@ -1,21 +0,0 @@
-package org.apache.usergrid.persistence.index.stage;
-
-
-import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionContext;
-import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionStage;
-
-
-/**
- *
- * @author: tnine
- *
- */
-public class Complete implements ExecutionStage
-{
-
- @Override
- public void performStage( final ExecutionContext context ) {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/index/src/main/java/org/apache/usergrid/persistence/index/stage/Start.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/index/src/main/java/org/apache/usergrid/persistence/index/stage/Start.java b/stack/corepersistence/index/src/main/java/org/apache/usergrid/persistence/index/stage/Start.java
deleted file mode 100644
index ddaa0f3..0000000
--- a/stack/corepersistence/index/src/main/java/org/apache/usergrid/persistence/index/stage/Start.java
+++ /dev/null
@@ -1,16 +0,0 @@
-package org.apache.usergrid.persistence.index.stage;
-
-
-import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionContext;
-import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionStage;
-
-
-/** This state should signal an index update has started */
-public class Start implements ExecutionStage
-{
-
- @Override
- public void performStage( final ExecutionContext context ) {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/index/src/main/java/org/apache/usergrid/persistence/index/stage/Write.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/index/src/main/java/org/apache/usergrid/persistence/index/stage/Write.java b/stack/corepersistence/index/src/main/java/org/apache/usergrid/persistence/index/stage/Write.java
deleted file mode 100644
index 252f5af..0000000
--- a/stack/corepersistence/index/src/main/java/org/apache/usergrid/persistence/index/stage/Write.java
+++ /dev/null
@@ -1,18 +0,0 @@
-package org.apache.usergrid.persistence.index.stage;
-
-
-import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionStage;
-import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionContext;
-
-
-/** This state should perform an update of the index. */
-public class Write implements ExecutionStage
-{
-
-
-@Override
- public void performStage( final ExecutionContext context ) {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-}
-
[2/2] git commit: Example using google’s event bus. Seems events are getting dispatched multiple times.
Posted by to...@apache.org.
Example using google’s event bus. Seems events are getting dispatched multiple times.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/56c415f2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/56c415f2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/56c415f2
Branch: refs/heads/two-point-o-eventbus
Commit: 56c415f2bd6ac784b5e5d71a6f92bbbc11f82b86
Parents: 44072d5
Author: Todd Nine <to...@apache.org>
Authored: Wed Dec 4 21:14:42 2013 -0700
Committer: Todd Nine <to...@apache.org>
Committed: Wed Dec 4 21:14:42 2013 -0700
----------------------------------------------------------------------
.../collection/impl/CollectionManagerImpl.java | 61 +++---
.../mvcc/entity/CollectionEventBus.java | 38 ++++
.../collection/mvcc/entity/MvccEntity.java | 1 -
.../collection/mvcc/entity/MvccLogEntry.java | 2 -
.../entity/impl/CollectionEventBusImpl.java | 20 ++
.../mvcc/entity/impl/MvccEntityImpl.java | 1 -
.../mvcc/entity/impl/MvccLogEntryImpl.java | 1 -
.../mvcc/event/PostProcessListener.java | 3 -
.../collection/mvcc/stage/CollectionEvent.java | 43 ++++
.../collection/mvcc/stage/EventStage.java | 18 ++
.../collection/mvcc/stage/ExecutionContext.java | 5 -
.../collection/mvcc/stage/ExecutionStage.java | 15 --
.../collection/mvcc/stage/Result.java | 48 ++++
.../collection/mvcc/stage/StagePipeline.java | 8 +-
.../stage/impl/CollectionPipelineModule.java | 72 ++----
.../mvcc/stage/impl/ExecutionContextImpl.java | 92 --------
.../mvcc/stage/impl/StagePipelineImpl.java | 61 ------
.../mvcc/stage/impl/delete/Delete.java | 27 ++-
.../mvcc/stage/impl/delete/DeletePipeline.java | 23 --
.../mvcc/stage/impl/delete/DeleteStart.java | 17 ++
.../mvcc/stage/impl/delete/Deletecommit.java | 16 ++
.../mvcc/stage/impl/delete/StartDelete.java | 36 +--
.../mvcc/stage/impl/read/EventLoad.java | 17 ++
.../collection/mvcc/stage/impl/read/Load.java | 35 ++-
.../mvcc/stage/impl/read/PipelineLoad.java | 23 --
.../mvcc/stage/impl/write/Commit.java | 34 +--
.../mvcc/stage/impl/write/Create.java | 29 +--
.../mvcc/stage/impl/write/EventCommit.java | 16 ++
.../mvcc/stage/impl/write/EventCreate.java | 17 ++
.../mvcc/stage/impl/write/EventStart.java | 17 ++
.../mvcc/stage/impl/write/EventUpdate.java | 16 ++
.../mvcc/stage/impl/write/EventVerify.java | 17 ++
.../mvcc/stage/impl/write/PipelineCreate.java | 23 --
.../mvcc/stage/impl/write/PipelineUpdate.java | 23 --
.../mvcc/stage/impl/write/StartWrite.java | 40 ++--
.../mvcc/stage/impl/write/Update.java | 29 +--
.../mvcc/stage/impl/write/Verify.java | 20 +-
.../MvccEntitySerializationStrategy.java | 1 -
.../MvccLogEntrySerializationStrategyImpl.java | 2 +-
.../serialization/impl/SerializationModule.java | 2 -
.../collection/service/impl/ServiceModule.java | 10 -
.../collection/CollectionManagerIT.java | 8 +
.../collection/CollectionManagerTest.java | 83 ++++---
.../mvcc/stage/ExecutionContextTest.java | 217 -------------------
.../mvcc/stage/StagePipelineTest.java | 89 --------
.../mvcc/stage/impl/write/CreateTest.java | 77 ++++---
.../mvcc/stage/impl/write/StartWriteTest.java | 84 +++----
.../mvcc/stage/impl/write/UpdateTest.java | 98 +++++----
.../persistence/index/stage/Complete.java | 21 --
.../usergrid/persistence/index/stage/Start.java | 16 --
.../usergrid/persistence/index/stage/Write.java | 18 --
51 files changed, 659 insertions(+), 1031 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/CollectionManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/CollectionManagerImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/CollectionManagerImpl.java
index e20ba62..7de8f50 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/CollectionManagerImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/CollectionManagerImpl.java
@@ -8,16 +8,17 @@ import org.slf4j.LoggerFactory;
import org.apache.usergrid.persistence.collection.CollectionContext;
import org.apache.usergrid.persistence.collection.CollectionManager;
+import org.apache.usergrid.persistence.collection.mvcc.entity.CollectionEventBus;
import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
-import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionContext;
-import org.apache.usergrid.persistence.collection.mvcc.stage.StagePipeline;
-import org.apache.usergrid.persistence.collection.mvcc.stage.impl.read.PipelineLoad;
-import org.apache.usergrid.persistence.collection.mvcc.stage.impl.write.PipelineCreate;
-import org.apache.usergrid.persistence.collection.mvcc.stage.impl.delete.DeletePipeline;
-import org.apache.usergrid.persistence.collection.mvcc.stage.impl.ExecutionContextImpl;
-import org.apache.usergrid.persistence.collection.mvcc.stage.impl.write.PipelineUpdate;
+import org.apache.usergrid.persistence.collection.mvcc.stage.Result;
+import org.apache.usergrid.persistence.collection.mvcc.stage.impl.delete.DeleteStart;
+import org.apache.usergrid.persistence.collection.mvcc.stage.impl.read.EventLoad;
+import org.apache.usergrid.persistence.collection.mvcc.stage.impl.write.EventCreate;
+import org.apache.usergrid.persistence.collection.mvcc.stage.impl.write.EventUpdate;
import org.apache.usergrid.persistence.model.entity.Entity;
+import com.google.common.base.Preconditions;
+import com.google.common.eventbus.EventBus;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
@@ -32,23 +33,16 @@ public class CollectionManagerImpl implements CollectionManager {
private static final Logger logger = LoggerFactory.getLogger( CollectionManagerImpl.class );
private final CollectionContext context;
- private final StagePipeline createPipeline;
- private final StagePipeline updatePipeline;
- private final StagePipeline deletePipeline;
- private final StagePipeline loadPipeline;
+ private final CollectionEventBus eventBus;
@Inject
- public CollectionManagerImpl( @PipelineCreate final StagePipeline createPipeline,
- @PipelineUpdate final StagePipeline updatePipeline,
- @DeletePipeline final StagePipeline deletePipeline,
- @PipelineLoad final StagePipeline loadPipeline,
+ public CollectionManagerImpl( final CollectionEventBus eventBus,
@Assisted final CollectionContext context ) {
- this.createPipeline = createPipeline;
- this.updatePipeline = updatePipeline;
- this.deletePipeline = deletePipeline;
- this.loadPipeline = loadPipeline;
+ Preconditions.checkNotNull( eventBus, "eventBus must be defined" );
+ Preconditions.checkNotNull( context, "context must be defined" );
+ this.eventBus = eventBus;
this.context = context;
}
@@ -56,46 +50,43 @@ public class CollectionManagerImpl implements CollectionManager {
@Override
public Entity create( final Entity entity ) {
// Create a new context for the write
- ExecutionContext executionContext = new ExecutionContextImpl( createPipeline, context );
+ Result result = new Result();
- //perform the write
- executionContext.execute( entity );
+ eventBus.post( new EventCreate( context, entity, result ) );
- MvccEntity result = executionContext.getMessage( MvccEntity.class );
+ MvccEntity completed = result.getLast( MvccEntity.class );
- return result.getEntity().get();
+ return completed.getEntity().get();
}
@Override
public Entity update( final Entity entity ) {
// Create a new context for the write
- ExecutionContext executionContext = new ExecutionContextImpl( updatePipeline, context );
+ Result result = new Result();
- //perform the write
- executionContext.execute( entity );
+ eventBus.post( new EventUpdate( context, entity, result ) );
- MvccEntity result = executionContext.getMessage( MvccEntity.class );
+ MvccEntity completed = result.getLast( MvccEntity.class );
- return result.getEntity().get();
+ return completed.getEntity().get();
}
@Override
public void delete( final UUID entityId ) {
- ExecutionContext deleteContext = new ExecutionContextImpl( deletePipeline, context );
-
- deleteContext.execute( entityId );
+ eventBus.post( new DeleteStart( context, entityId, null ) );
}
@Override
public Entity load( final UUID entityId ) {
- ExecutionContext loadContext = new ExecutionContextImpl( loadPipeline, context );
+ Result result = new Result();
- loadContext.execute( entityId );
+ eventBus.post( new EventLoad( context, entityId, result ) );
- return loadContext.getMessage( Entity.class );
+ MvccEntity completed = result.getLast( MvccEntity.class );
+ return completed.getEntity().get();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/entity/CollectionEventBus.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/entity/CollectionEventBus.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/entity/CollectionEventBus.java
new file mode 100644
index 0000000..c53f5ce
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/entity/CollectionEventBus.java
@@ -0,0 +1,38 @@
+package org.apache.usergrid.persistence.collection.mvcc.entity;
+
+
+/** A dup of the Guava EventBus so we can easily mock and test
+ * @author tnine */
+public interface CollectionEventBus {
+
+ /**
+ * Registers all handler methods on {@code object} to receive events.
+ * Handler methods are selected and classified using this EventBus's
+ * {@link com.google.common.eventbus.HandlerFindingStrategy}; the default strategy is the
+ * {@link com.google.common.eventbus.AnnotatedHandlerFinder}.
+ *
+ * @param object object whose handler methods should be registered.
+ */
+ void register(Object object);
+
+ /**
+ * Unregisters all handler methods on a registered {@code object}.
+ *
+ * @param object object whose handler methods should be unregistered.
+ * @throws IllegalArgumentException if the object was not previously registered.
+ */
+ void unregister(Object object);
+
+ /**
+ * Posts an event to all registered handlers. This method will return
+ * successfully after the event has been posted to all handlers, and
+ * regardless of any exceptions thrown by handlers.
+ *
+ * <p>If no handlers have been subscribed for {@code event}'s class, and
+ * {@code event} is not already a {@link com.google.common.eventbus.DeadEvent}, it will be wrapped in a
+ * DeadEvent and reposted.
+ *
+ * @param event event to post.
+ */
+ void post(Object event);
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/entity/MvccEntity.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/entity/MvccEntity.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/entity/MvccEntity.java
index cc212c8..59197e8 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/entity/MvccEntity.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/entity/MvccEntity.java
@@ -3,7 +3,6 @@ package org.apache.usergrid.persistence.collection.mvcc.entity;
import java.util.UUID;
-import org.apache.usergrid.persistence.collection.CollectionContext;
import org.apache.usergrid.persistence.model.entity.Entity;
import com.google.common.base.Optional;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/entity/MvccLogEntry.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/entity/MvccLogEntry.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/entity/MvccLogEntry.java
index c6886d1..d14b35c 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/entity/MvccLogEntry.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/entity/MvccLogEntry.java
@@ -3,8 +3,6 @@ package org.apache.usergrid.persistence.collection.mvcc.entity;
import java.util.UUID;
-import org.apache.usergrid.persistence.collection.CollectionContext;
-
/**
* A Marker interface for an in flight update to allow context information to be passed between states
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/entity/impl/CollectionEventBusImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/entity/impl/CollectionEventBusImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/entity/impl/CollectionEventBusImpl.java
new file mode 100644
index 0000000..3278218
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/entity/impl/CollectionEventBusImpl.java
@@ -0,0 +1,20 @@
+package org.apache.usergrid.persistence.collection.mvcc.entity.impl;
+
+
+import org.apache.usergrid.persistence.collection.mvcc.entity.CollectionEventBus;
+
+import com.google.common.eventbus.EventBus;
+
+
+/** @author tnine */
+public class CollectionEventBusImpl extends EventBus implements CollectionEventBus{
+
+ public CollectionEventBusImpl() {
+ super();
+ }
+
+
+ public CollectionEventBusImpl( final String identifier ) {
+ super( identifier );
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/entity/impl/MvccEntityImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/entity/impl/MvccEntityImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/entity/impl/MvccEntityImpl.java
index 1179dad..2d9f2f9 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/entity/impl/MvccEntityImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/entity/impl/MvccEntityImpl.java
@@ -3,7 +3,6 @@ package org.apache.usergrid.persistence.collection.mvcc.entity.impl;
import java.util.UUID;
-import org.apache.usergrid.persistence.collection.CollectionContext;
import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
import org.apache.usergrid.persistence.model.entity.Entity;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/entity/impl/MvccLogEntryImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/entity/impl/MvccLogEntryImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/entity/impl/MvccLogEntryImpl.java
index 2f0b4d7..dee56ae 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/entity/impl/MvccLogEntryImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/entity/impl/MvccLogEntryImpl.java
@@ -3,7 +3,6 @@ package org.apache.usergrid.persistence.collection.mvcc.entity.impl;
import java.util.UUID;
-import org.apache.usergrid.persistence.collection.CollectionContext;
import org.apache.usergrid.persistence.collection.mvcc.entity.MvccLogEntry;
import org.apache.usergrid.persistence.collection.mvcc.entity.Stage;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/event/PostProcessListener.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/event/PostProcessListener.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/event/PostProcessListener.java
index 29e6515..68479e8 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/event/PostProcessListener.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/event/PostProcessListener.java
@@ -1,9 +1,6 @@
package org.apache.usergrid.persistence.collection.mvcc.event;
-import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
-
-
/**
*
* @author: tnine
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/CollectionEvent.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/CollectionEvent.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/CollectionEvent.java
new file mode 100644
index 0000000..84df2d0
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/CollectionEvent.java
@@ -0,0 +1,43 @@
+package org.apache.usergrid.persistence.collection.mvcc.stage;
+
+
+import org.apache.usergrid.persistence.collection.CollectionContext;
+
+import com.google.common.base.Preconditions;
+
+
+/** @author tnine */
+public abstract class CollectionEvent<T> {
+
+ private final CollectionContext context;
+ private final T data;
+ private final Result result;
+
+
+ protected CollectionEvent( final CollectionContext context, final T data, final Result result ) {
+ Preconditions.checkNotNull( context, "context is required" );
+ Preconditions.checkNotNull( data, "context is required" );
+ Preconditions.checkNotNull( context, "context is required" );
+ this.context = context;
+ this.data = data;
+ this.result = result;
+ }
+
+
+
+
+ /** Get the collection context for this event */
+ public CollectionContext getCollectionContext() {
+ return this.context;
+ }
+
+
+ public T getData() {
+ return data;
+ }
+
+
+ public Result getResult() {
+ return result;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/EventStage.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/EventStage.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/EventStage.java
new file mode 100644
index 0000000..c62eb36
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/EventStage.java
@@ -0,0 +1,18 @@
+package org.apache.usergrid.persistence.collection.mvcc.stage;
+
+
+import com.google.common.eventbus.Subscribe;
+
+
+/** The possible stages in our write flow. */
+public interface EventStage<T extends CollectionEvent> {
+
+ /**
+ * Run this stage. This will return the MvccEntity that should be returned or passed to the next stage
+ *
+ * @param event The event to receive
+ *
+ */
+ @Subscribe
+ public void performStage(T event );
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/ExecutionContext.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/ExecutionContext.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/ExecutionContext.java
index f124dcf..0fb9ced 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/ExecutionContext.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/ExecutionContext.java
@@ -1,12 +1,7 @@
package org.apache.usergrid.persistence.collection.mvcc.stage;
-import java.util.Collection;
-import java.util.List;
-
import org.apache.usergrid.persistence.collection.CollectionContext;
-import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
-import org.apache.usergrid.persistence.collection.mvcc.event.PostProcessListener;
/** @author tnine */
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/ExecutionStage.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/ExecutionStage.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/ExecutionStage.java
deleted file mode 100644
index a98c813..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/ExecutionStage.java
+++ /dev/null
@@ -1,15 +0,0 @@
-package org.apache.usergrid.persistence.collection.mvcc.stage;
-
-
-/** The possible stages in our write flow. */
-public interface ExecutionStage {
-
- /**
- * Run this stage. This will return the MvccEntity that should be returned or passed to the next stage
- *
- * @param context The context of the current write operation
- *
- * @return The asynchronous listener to signal success
- */
- public void performStage( ExecutionContext context );
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/Result.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/Result.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/Result.java
new file mode 100644
index 0000000..1887c71
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/Result.java
@@ -0,0 +1,48 @@
+package org.apache.usergrid.persistence.collection.mvcc.stage;
+
+
+import java.util.ArrayList;
+import java.util.List;
+
+
+/** @author tnine */
+public class Result {
+
+ private List<Object> results;
+
+
+ public Result() {
+ results = new ArrayList<Object>();
+ }
+
+
+ public List<Object> getResults() {
+ return results;
+ }
+
+
+ public void addResult( final Object result ) {
+ this.results.add( result );
+ }
+
+
+ /**
+ * Get the last occurrence of an instance that implements the type provided.
+ * @param clazz The class that the value should be an instance of
+ * @param <T> The type of class
+ * @return The value if one is found, null otherwise
+ */
+ public <T> T getLast( Class<T> clazz ) {
+
+ final int size = results.size();
+
+ for ( int i = size - 1; i > -1; i-- ) {
+ final Object value = results.get( i );
+ if ( clazz.isInstance( value ) ) {
+ return ( T ) value;
+ }
+ }
+
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/StagePipeline.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/StagePipeline.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/StagePipeline.java
index 9d68e10..33a2264 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/StagePipeline.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/StagePipeline.java
@@ -13,14 +13,14 @@ public interface StagePipeline {
/**
* Get the first stage in this pipeline.
*/
- ExecutionStage first();
+ EventStage first();
/**
- * get the next executionStage after the executionStage specified
- * @param executionStage The executionStage to seek in our pipeline
+ * get the next eventStage after the eventStage specified
+ * @param eventStage The eventStage to seek in our pipeline
*/
- ExecutionStage nextStage(ExecutionStage executionStage );
+ EventStage nextStage(EventStage eventStage );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/CollectionPipelineModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/CollectionPipelineModule.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/CollectionPipelineModule.java
index 68dab72..7ef260b 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/CollectionPipelineModule.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/CollectionPipelineModule.java
@@ -1,23 +1,19 @@
package org.apache.usergrid.persistence.collection.mvcc.stage.impl;
-import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionStage;
-import org.apache.usergrid.persistence.collection.mvcc.stage.StagePipeline;
+import org.apache.usergrid.persistence.collection.mvcc.entity.CollectionEventBus;
+import org.apache.usergrid.persistence.collection.mvcc.entity.impl.CollectionEventBusImpl;
+import org.apache.usergrid.persistence.collection.mvcc.stage.EventStage;
import org.apache.usergrid.persistence.collection.mvcc.stage.impl.delete.Delete;
-import org.apache.usergrid.persistence.collection.mvcc.stage.impl.delete.DeletePipeline;
import org.apache.usergrid.persistence.collection.mvcc.stage.impl.delete.StartDelete;
import org.apache.usergrid.persistence.collection.mvcc.stage.impl.read.Load;
-import org.apache.usergrid.persistence.collection.mvcc.stage.impl.read.PipelineLoad;
import org.apache.usergrid.persistence.collection.mvcc.stage.impl.write.Commit;
import org.apache.usergrid.persistence.collection.mvcc.stage.impl.write.Create;
-import org.apache.usergrid.persistence.collection.mvcc.stage.impl.write.PipelineCreate;
-import org.apache.usergrid.persistence.collection.mvcc.stage.impl.write.PipelineUpdate;
import org.apache.usergrid.persistence.collection.mvcc.stage.impl.write.StartWrite;
import org.apache.usergrid.persistence.collection.mvcc.stage.impl.write.Update;
import org.apache.usergrid.persistence.collection.mvcc.stage.impl.write.Verify;
import com.google.inject.AbstractModule;
-import com.google.inject.Inject;
import com.google.inject.Provides;
import com.google.inject.Singleton;
import com.google.inject.multibindings.Multibinder;
@@ -31,45 +27,12 @@ import com.google.inject.multibindings.Multibinder;
public class CollectionPipelineModule extends AbstractModule {
- /**
- * Wire the pipeline of operations for create. This should create a new instance every time, since StagePipeline
- * objects are mutable
- */
@Provides
- @PipelineCreate
- @Inject
@Singleton
- public StagePipeline createPipeline( final Create create, final StartWrite startWrite, final Verify write,
- final Commit commit ) {
- return StagePipelineImpl.fromStages( create, startWrite, write, commit );
- }
-
+ public CollectionEventBus eventBus(){
+ CollectionEventBus bus = new CollectionEventBusImpl( "collection" );
- @Provides
- @PipelineUpdate
- @Inject
- @Singleton
- public StagePipeline updatePipeline( final Update update, final StartWrite startWrite, final Verify write,
- final Commit commit ) {
- return StagePipelineImpl.fromStages( update, startWrite, write, commit );
- }
-
-
- @Provides
- @DeletePipeline
- @Inject
- @Singleton
- public StagePipeline deletePipeline(final StartDelete startDelete, final Delete delete ) {
- return StagePipelineImpl.fromStages(startDelete, delete );
- }
-
-
- @Provides
- @PipelineLoad
- @Inject
- @Singleton
- public StagePipeline deletePipeline( final Load load ) {
- return StagePipelineImpl.fromStages( load );
+ return bus;
}
@@ -79,22 +42,27 @@ public class CollectionPipelineModule extends AbstractModule {
/**
* Configure all stages here
*/
- Multibinder<ExecutionStage> stageBinder = Multibinder.newSetBinder( binder(), ExecutionStage.class );
+ Multibinder<EventStage> stageBinder = Multibinder.newSetBinder( binder(), EventStage.class );
+ /**
+ * Note we have to have the .asEagerSingleton(); or guice never loads these impls b/c they aren't
+ * directly referenced
+ */
//creation stages
- stageBinder.addBinding().to( Commit.class );
- stageBinder.addBinding().to( Create.class );
- stageBinder.addBinding().to( StartWrite.class );
- stageBinder.addBinding().to( Update.class );
- stageBinder.addBinding().to( Verify.class );
+ stageBinder.addBinding().to( Commit.class ).asEagerSingleton();
+ stageBinder.addBinding().to( Create.class ).asEagerSingleton();;
+ stageBinder.addBinding().to( StartWrite.class ).asEagerSingleton();;
+ stageBinder.addBinding().to( Update.class ).asEagerSingleton();;
+ stageBinder.addBinding().to( Verify.class ).asEagerSingleton();;
//delete stages
- stageBinder.addBinding().to( Delete.class );
- stageBinder.addBinding().to( StartDelete.class );
+ stageBinder.addBinding().to( Delete.class ).asEagerSingleton();;
+ stageBinder.addBinding().to( StartDelete.class ).asEagerSingleton();;
//loading stages
- stageBinder.addBinding().to(Load.class);
+ stageBinder.addBinding().to(Load.class).asEagerSingleton();;
+
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/ExecutionContextImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/ExecutionContextImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/ExecutionContextImpl.java
deleted file mode 100644
index 805d1e3..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/ExecutionContextImpl.java
+++ /dev/null
@@ -1,92 +0,0 @@
-package org.apache.usergrid.persistence.collection.mvcc.stage.impl;
-
-
-import org.apache.usergrid.persistence.collection.CollectionContext;
-import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionContext;
-import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionStage;
-import org.apache.usergrid.persistence.collection.mvcc.stage.StagePipeline;
-
-import com.google.common.base.Preconditions;
-import com.google.inject.Inject;
-
-
-/** @author tnine */
-public class ExecutionContextImpl implements ExecutionContext {
-
- private final StagePipeline pipeline;
- private final CollectionContext context;
-
- private Object message;
- private ExecutionStage current;
-
-
- @Inject
- public ExecutionContextImpl( final StagePipeline pipeline, final CollectionContext context ) {
- Preconditions.checkNotNull( pipeline, "pipeline cannot be null" );
- Preconditions.checkNotNull( context, "context cannot be null" );
-
- this.pipeline = pipeline;
- this.context = context;
- }
-
-
- @Override
- public void execute( Object input ) {
-
- current = this.pipeline.first();
-
- setMessage( input );
-
- current.performStage( this );
- }
-
-
- @Override
- public <T> T getMessage( final Class<T> clazz ) {
- Preconditions.checkNotNull( clazz, "Class must be specified" );
-
- if ( message == null ) {
- return null;
- }
-
- if ( !clazz.isInstance( message ) ) {
- throw new ClassCastException(
- "Message must be an instance of class " + clazz + ". However it was of type '" + message.getClass()
- + "'" );
- }
-
-
- return ( T ) message;
- }
-
-
- @Override
- public Object setMessage( final Object object ) {
- Object original = message;
-
- this.message = object;
-
- return original;
- }
-
-
- @Override
- public void proceed() {
- ExecutionStage next = this.pipeline.nextStage( current );
-
- //Nothing to do
- if ( next == null ) {
- return;
- }
-
- current = next;
- current.performStage( this );
- }
-
-
-
- @Override
- public CollectionContext getCollectionContext() {
- return this.context;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/StagePipelineImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/StagePipelineImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/StagePipelineImpl.java
deleted file mode 100644
index 1f3a0fe..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/StagePipelineImpl.java
+++ /dev/null
@@ -1,61 +0,0 @@
-package org.apache.usergrid.persistence.collection.mvcc.stage.impl;
-
-
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionStage;
-import org.apache.usergrid.persistence.collection.mvcc.stage.StagePipeline;
-
-import com.google.common.base.Preconditions;
-
-
-/** @author tnine */
-public class StagePipelineImpl implements StagePipeline {
-
- private final List<ExecutionStage> executionStages;
-
-
- protected StagePipelineImpl( List<ExecutionStage> executionStages ) {
- Preconditions.checkNotNull( executionStages, "executionStages is required");
- Preconditions.checkArgument( executionStages.size() > 0, "executionStages must have more than 1 element" );
-
- this.executionStages = executionStages;
- }
-
-
- @Override
- public ExecutionStage first() {
-
- if ( executionStages.size() == 0 ) {
- return null;
- }
-
- return executionStages.get( 0 );
- }
-
-
-
-
-
- @Override
- public ExecutionStage nextStage( final ExecutionStage executionStage ) {
-
- Preconditions.checkNotNull( executionStage, "ExecutionStage cannot be null" );
-
- int index = executionStages.indexOf( executionStage );
-
- //we're done, do nothing
- if ( index == executionStages.size() -1 ) {
- return null;
- }
-
- return executionStages.get( index + 1 );
- }
-
-
- /** Factory to create a new instance. */
- public static StagePipelineImpl fromStages( ExecutionStage... executionStages ) {
- return new StagePipelineImpl(Arrays.asList( executionStages ));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/delete/Delete.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/delete/Delete.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/delete/Delete.java
index 7810beb..feed7fa 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/delete/Delete.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/delete/Delete.java
@@ -8,22 +8,23 @@ import org.slf4j.LoggerFactory;
import org.apache.usergrid.persistence.collection.CollectionContext;
import org.apache.usergrid.persistence.collection.exception.CollectionRuntimeException;
+import org.apache.usergrid.persistence.collection.mvcc.entity.CollectionEventBus;
import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
import org.apache.usergrid.persistence.collection.mvcc.entity.MvccLogEntry;
import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccLogEntryImpl;
-import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionStage;
-import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionContext;
+import org.apache.usergrid.persistence.collection.mvcc.stage.EventStage;
import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy;
import com.google.common.base.Preconditions;
+import com.google.common.eventbus.Subscribe;
import com.google.inject.Inject;
import com.netflix.astyanax.MutationBatch;
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
/** This phase should invoke any finalization, and mark the entity as committed in the data store before returning */
-public class Delete implements ExecutionStage {
+public class Delete implements EventStage<DeleteCommit> {
private static final Logger LOG = LoggerFactory.getLogger( Delete.class );
@@ -34,20 +35,24 @@ public class Delete implements ExecutionStage {
@Inject
public Delete( final MvccLogEntrySerializationStrategy logEntrySerializationStrategy,
- final MvccEntitySerializationStrategy entitySerializationStrategy ) {
+ final MvccEntitySerializationStrategy entitySerializationStrategy,
+ final CollectionEventBus eventBus ) {
Preconditions.checkNotNull( logEntrySerializationStrategy, "logEntrySerializationStrategy is required" );
- Preconditions.checkNotNull( entitySerializationStrategy, "entitySerializationStrategy is required" );
+ Preconditions.checkNotNull( entitySerializationStrategy, "entitySerializationStrategy is required" );
this.logEntrySerializationStrategy = logEntrySerializationStrategy;
this.entitySerializationStrategy = entitySerializationStrategy;
+ eventBus.register( this );
}
@Override
- public void performStage( final ExecutionContext executionContext ) {
- final MvccEntity entity = executionContext.getMessage( MvccEntity.class );
+ @Subscribe
+ public void performStage( final DeleteCommit event ) {
+
+ final MvccEntity entity = event.getData();
Preconditions.checkNotNull( entity, "Entity is required in the new stage of the mvcc write" );
@@ -58,11 +63,11 @@ public class Delete implements ExecutionStage {
Preconditions.checkNotNull( version, "Entity version is required in this stage" );
- final CollectionContext collectionContext = executionContext.getCollectionContext();
+ final CollectionContext collectionContext = event.getCollectionContext();
- final MvccLogEntry startEntry = new MvccLogEntryImpl( entityId, version, org.apache.usergrid.persistence
- .collection.mvcc.entity.Stage.COMMITTED );
+ final MvccLogEntry startEntry = new MvccLogEntryImpl( entityId, version,
+ org.apache.usergrid.persistence.collection.mvcc.entity.Stage.COMMITTED );
MutationBatch logMutation = logEntrySerializationStrategy.write( collectionContext, startEntry );
@@ -84,8 +89,6 @@ public class Delete implements ExecutionStage {
/**
* We're done executing.
*/
- executionContext.proceed();
- //TODO connect to post processors via listener
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/delete/DeletePipeline.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/delete/DeletePipeline.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/delete/DeletePipeline.java
deleted file mode 100644
index 52fe4b9..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/delete/DeletePipeline.java
+++ /dev/null
@@ -1,23 +0,0 @@
-package org.apache.usergrid.persistence.collection.mvcc.stage.impl.delete;
-
-
-import java.lang.annotation.Retention;
-import java.lang.annotation.Target;
-
-import com.google.inject.BindingAnnotation;
-
-import static java.lang.annotation.ElementType.FIELD;
-import static java.lang.annotation.ElementType.METHOD;
-import static java.lang.annotation.ElementType.PARAMETER;
-import static java.lang.annotation.RetentionPolicy.RUNTIME;
-
-
-/**
- * Marks the delete pipeline
- *
- * @author tnine
- */
-@BindingAnnotation
-@Target({ FIELD, PARAMETER, METHOD })
-@Retention(RUNTIME)
-public @interface DeletePipeline {}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/delete/DeleteStart.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/delete/DeleteStart.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/delete/DeleteStart.java
new file mode 100644
index 0000000..5f8840b
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/delete/DeleteStart.java
@@ -0,0 +1,17 @@
+package org.apache.usergrid.persistence.collection.mvcc.stage.impl.delete;
+
+
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.collection.CollectionContext;
+import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionEvent;
+import org.apache.usergrid.persistence.collection.mvcc.stage.Result;
+
+
+/** @author tnine */
+public class DeleteStart extends CollectionEvent<UUID> {
+
+ public DeleteStart( final CollectionContext context, final UUID data, final Result result ) {
+ super( context, data, result );
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/delete/Deletecommit.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/delete/Deletecommit.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/delete/Deletecommit.java
new file mode 100644
index 0000000..dfdebba
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/delete/Deletecommit.java
@@ -0,0 +1,16 @@
+package org.apache.usergrid.persistence.collection.mvcc.stage.impl.delete;
+
+
+import org.apache.usergrid.persistence.collection.CollectionContext;
+import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
+import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionEvent;
+import org.apache.usergrid.persistence.collection.mvcc.stage.Result;
+
+
+/** @author tnine */
+public class DeleteCommit extends CollectionEvent<MvccEntity> {
+ public DeleteCommit( final CollectionContext context, final MvccEntity data, final Result result ) {
+
+ super( context, data, result );
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/delete/StartDelete.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/delete/StartDelete.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/delete/StartDelete.java
index 4208662..ca1865b 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/delete/StartDelete.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/delete/StartDelete.java
@@ -8,17 +8,18 @@ import org.slf4j.LoggerFactory;
import org.apache.usergrid.persistence.collection.CollectionContext;
import org.apache.usergrid.persistence.collection.exception.CollectionRuntimeException;
+import org.apache.usergrid.persistence.collection.mvcc.entity.CollectionEventBus;
import org.apache.usergrid.persistence.collection.mvcc.entity.MvccLogEntry;
import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccEntityImpl;
import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccLogEntryImpl;
-import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionContext;
-import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionStage;
+import org.apache.usergrid.persistence.collection.mvcc.stage.EventStage;
import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy;
import org.apache.usergrid.persistence.collection.service.UUIDService;
import org.apache.usergrid.persistence.model.entity.Entity;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
+import com.google.common.eventbus.Subscribe;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.netflix.astyanax.MutationBatch;
@@ -30,36 +31,37 @@ import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
* new write in the data store for a checkpoint and recovery
*/
@Singleton
-public class StartDelete implements ExecutionStage {
+public class StartDelete implements EventStage<DeleteStart> {
private static final Logger LOG = LoggerFactory.getLogger( StartDelete.class );
+ private final CollectionEventBus eventBus;
private final MvccLogEntrySerializationStrategy logStrategy;
private final UUIDService uuidService;
/** Create a new stage with the current context */
@Inject
- public StartDelete( final MvccLogEntrySerializationStrategy logStrategy, final UUIDService uuidService ) {
-
+ public StartDelete( final CollectionEventBus eventBus, final MvccLogEntrySerializationStrategy logStrategy,
+ final UUIDService uuidService ) {
+ Preconditions.checkNotNull( eventBus, "eventBus is required" );
Preconditions.checkNotNull( logStrategy, "logStrategy is required" );
Preconditions.checkNotNull( uuidService, "uuidService is required" );
+ this.eventBus = eventBus;
this.logStrategy = logStrategy;
this.uuidService = uuidService;
+
+ this.eventBus.register( this );
}
- /**
- * Create the entity Id and inject it, as well as set the timestamp versions
- *
- * @param executionContext The context of the current write operation
- */
@Override
- public void performStage( final ExecutionContext executionContext ) {
+ @Subscribe
+ public void performStage( final DeleteStart event ) {
- final UUID entityId = executionContext.getMessage( UUID.class );
+ final UUID entityId = event.getData();
final UUID version = uuidService.newTimeUUID();
@@ -68,12 +70,11 @@ public class StartDelete implements ExecutionStage {
Preconditions.checkNotNull( version, "Entity version is required in this stage" );
-
- final CollectionContext collectionContext = executionContext.getCollectionContext();
+ final CollectionContext collectionContext = event.getCollectionContext();
- final MvccLogEntry startEntry = new MvccLogEntryImpl( entityId, version, org.apache.usergrid.persistence
- .collection.mvcc.entity.Stage.ACTIVE );
+ final MvccLogEntry startEntry = new MvccLogEntryImpl( entityId, version,
+ org.apache.usergrid.persistence.collection.mvcc.entity.Stage.ACTIVE );
MutationBatch write = logStrategy.write( collectionContext, startEntry );
@@ -90,7 +91,6 @@ public class StartDelete implements ExecutionStage {
//create the mvcc entity for the next stage
final MvccEntityImpl nextStage = new MvccEntityImpl( entityId, version, Optional.<Entity>absent() );
- executionContext.setMessage( nextStage );
- executionContext.proceed();
+ eventBus.post( new DeleteCommit( collectionContext, nextStage, event.getResult() ) );
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/read/EventLoad.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/read/EventLoad.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/read/EventLoad.java
new file mode 100644
index 0000000..6f3fab9
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/read/EventLoad.java
@@ -0,0 +1,17 @@
+package org.apache.usergrid.persistence.collection.mvcc.stage.impl.read;
+
+
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.collection.CollectionContext;
+import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionEvent;
+import org.apache.usergrid.persistence.collection.mvcc.stage.Result;
+
+
+/** @author tnine */
+public class EventLoad extends CollectionEvent<UUID> {
+ public EventLoad( final CollectionContext context, final UUID data, final Result result ) {
+
+ super( context, data, result );
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/read/Load.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/read/Load.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/read/Load.java
index 00a2d43..a2acf3e 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/read/Load.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/read/Load.java
@@ -8,22 +8,21 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.usergrid.persistence.collection.CollectionContext;
+import org.apache.usergrid.persistence.collection.mvcc.entity.CollectionEventBus;
import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
-import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionContext;
-import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionStage;
+import org.apache.usergrid.persistence.collection.mvcc.stage.EventStage;
import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
import org.apache.usergrid.persistence.collection.service.UUIDService;
import org.apache.usergrid.persistence.model.entity.Entity;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
+import com.google.common.eventbus.Subscribe;
import com.google.inject.Inject;
-/**
- * This stage is a load stage to load a single entity
- */
-public class Load implements ExecutionStage {
+/** This stage is a load stage to load a single entity */
+public class Load implements EventStage<EventLoad> {
private static final Logger LOG = LoggerFactory.getLogger( Load.class );
@@ -33,24 +32,26 @@ public class Load implements ExecutionStage {
@Inject
- public Load( final UUIDService uuidService, final MvccEntitySerializationStrategy entitySerializationStrategy ) {
+ public Load(final CollectionEventBus eventBus, final UUIDService uuidService, final MvccEntitySerializationStrategy entitySerializationStrategy ) {
Preconditions.checkNotNull( entitySerializationStrategy, "entitySerializationStrategy is required" );
Preconditions.checkNotNull( uuidService, "uuidService is required" );
this.uuidService = uuidService;
this.entitySerializationStrategy = entitySerializationStrategy;
+ eventBus.register( this );
}
@Override
- public void performStage( final ExecutionContext executionContext ) {
- final UUID entityId = executionContext.getMessage( UUID.class );
+ @Subscribe
+ public void performStage( final EventLoad event ) {
+ final UUID entityId = event.getData();
Preconditions.checkNotNull( entityId, "Entity id required in the read stage" );
- final CollectionContext collectionContext = executionContext.getCollectionContext();
+ final CollectionContext collectionContext = event.getCollectionContext();
//generate a version that represents now
final UUID versionMax = uuidService.newTimeUUID();
@@ -58,25 +59,21 @@ public class Load implements ExecutionStage {
List<MvccEntity> results = entitySerializationStrategy.load( collectionContext, entityId, versionMax, 1 );
//nothing to do, we didn't get a result back
- if(results.size() != 1){
- executionContext.setMessage( null );
- executionContext.proceed();
+ if ( results.size() != 1 ) {
return;
}
- final Optional<Entity> targetVersion = results.get(0).getEntity();
+ final Optional<Entity> targetVersion = results.get( 0 ).getEntity();
//this entity has been marked as cleared. The version exists, but does not have entity data
- if(!targetVersion.isPresent()){
+ if ( !targetVersion.isPresent() ) {
//TODO, a lazy async repair/cleanup here?
- executionContext.setMessage( null );
- executionContext.proceed();
return;
}
- executionContext.setMessage( targetVersion.get() );
- executionContext.proceed();
+ //this feels like a hack. Not sure I like this
+ event.getResult().addResult( targetVersion.get() );
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/read/PipelineLoad.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/read/PipelineLoad.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/read/PipelineLoad.java
deleted file mode 100644
index 0d24b27..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/read/PipelineLoad.java
+++ /dev/null
@@ -1,23 +0,0 @@
-package org.apache.usergrid.persistence.collection.mvcc.stage.impl.read;
-
-
-import java.lang.annotation.Retention;
-import java.lang.annotation.Target;
-
-import com.google.inject.BindingAnnotation;
-
-import static java.lang.annotation.ElementType.FIELD;
-import static java.lang.annotation.ElementType.METHOD;
-import static java.lang.annotation.ElementType.PARAMETER;
-import static java.lang.annotation.RetentionPolicy.RUNTIME;
-
-
-/**
- * Marks the delete pipeline
- *
- * @author tnine
- */
-@BindingAnnotation
-@Target({ FIELD, PARAMETER, METHOD })
-@Retention(RUNTIME)
-public @interface PipelineLoad {}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/Commit.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/Commit.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/Commit.java
index 4780ff1..65af8cf 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/Commit.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/Commit.java
@@ -8,22 +8,23 @@ import org.slf4j.LoggerFactory;
import org.apache.usergrid.persistence.collection.CollectionContext;
import org.apache.usergrid.persistence.collection.exception.CollectionRuntimeException;
+import org.apache.usergrid.persistence.collection.mvcc.entity.CollectionEventBus;
import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
import org.apache.usergrid.persistence.collection.mvcc.entity.MvccLogEntry;
import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccLogEntryImpl;
-import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionStage;
-import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionContext;
+import org.apache.usergrid.persistence.collection.mvcc.stage.EventStage;
import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy;
import com.google.common.base.Preconditions;
+import com.google.common.eventbus.Subscribe;
import com.google.inject.Inject;
import com.netflix.astyanax.MutationBatch;
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
/** This phase should invoke any finalization, and mark the entity as committed in the data store before returning */
-public class Commit implements ExecutionStage {
+public class Commit implements EventStage<EventCommit> {
private static final Logger LOG = LoggerFactory.getLogger( Commit.class );
@@ -33,20 +34,22 @@ public class Commit implements ExecutionStage {
@Inject
- public Commit( final MvccLogEntrySerializationStrategy logEntrySerializationStrategy,
+ public Commit( final CollectionEventBus eventBus, final MvccLogEntrySerializationStrategy logEntrySerializationStrategy,
final MvccEntitySerializationStrategy entitySerializationStrategy ) {
Preconditions.checkNotNull( logEntrySerializationStrategy, "logEntrySerializationStrategy is required" );
- Preconditions.checkNotNull( entitySerializationStrategy, "entitySerializationStrategy is required" );
+ Preconditions.checkNotNull( entitySerializationStrategy, "entitySerializationStrategy is required" );
this.logEntrySerializationStrategy = logEntrySerializationStrategy;
this.entitySerializationStrategy = entitySerializationStrategy;
+ eventBus.register( this );
}
@Override
- public void performStage( final ExecutionContext executionContext ) {
- final MvccEntity entity = executionContext.getMessage( MvccEntity.class );
+ @Subscribe
+ public void performStage( final EventCommit event ) {
+ final MvccEntity entity = event.getData();
Preconditions.checkNotNull( entity, "Entity is required in the new stage of the mvcc write" );
@@ -57,11 +60,11 @@ public class Commit implements ExecutionStage {
Preconditions.checkNotNull( version, "Entity version is required in this stage" );
- final CollectionContext collectionContext = executionContext.getCollectionContext();
+ final CollectionContext collectionContext = event.getCollectionContext();
- final MvccLogEntry startEntry = new MvccLogEntryImpl( entityId, version, org.apache.usergrid.persistence
- .collection.mvcc.entity.Stage.COMMITTED );
+ final MvccLogEntry startEntry = new MvccLogEntryImpl( entityId, version,
+ org.apache.usergrid.persistence.collection.mvcc.entity.Stage.COMMITTED );
MutationBatch logMutation = logEntrySerializationStrategy.write( collectionContext, startEntry );
@@ -80,11 +83,10 @@ public class Commit implements ExecutionStage {
throw new CollectionRuntimeException( "Failed to execute write asynchronously ", e );
}
- /**
- * We're done executing.
- */
- executionContext.proceed();
-
- //TODO connect to post processors via listener
+ //add the mvccEntity to the result
+ event.getResult().addResult( entity );
}
+
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/Create.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/Create.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/Create.java
index cbd7d9b..971e9f4 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/Create.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/Create.java
@@ -9,14 +9,15 @@ import org.slf4j.LoggerFactory;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.usergrid.persistence.collection.exception.CollectionRuntimeException;
-import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionStage;
-import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionContext;
+import org.apache.usergrid.persistence.collection.mvcc.entity.CollectionEventBus;
+import org.apache.usergrid.persistence.collection.mvcc.stage.EventStage;
import org.apache.usergrid.persistence.collection.service.TimeService;
import org.apache.usergrid.persistence.collection.service.UUIDService;
import org.apache.usergrid.persistence.collection.util.Verify;
import org.apache.usergrid.persistence.model.entity.Entity;
import com.google.common.base.Preconditions;
+import com.google.common.eventbus.Subscribe;
import com.google.inject.Inject;
import com.google.inject.Singleton;
@@ -26,35 +27,37 @@ import com.google.inject.Singleton;
* present, and this should set the entityId, version, created, and updated dates
*/
@Singleton
-public class Create implements ExecutionStage {
+public class Create implements EventStage<EventCreate> {
private static final Logger LOG = LoggerFactory.getLogger( Create.class );
+ private final CollectionEventBus eventBus;
private final TimeService timeService;
private final UUIDService uuidService;
@Inject
- public Create( final TimeService timeService, final UUIDService uuidService ) {
+ public Create( final CollectionEventBus eventBus, final TimeService timeService, final UUIDService uuidService ) {
+
+ Preconditions.checkNotNull( eventBus, "eventBus is required" );
Preconditions.checkNotNull( timeService, "timeService is required" );
Preconditions.checkNotNull( uuidService, "uuidService is required" );
+ this.eventBus = eventBus;
this.timeService = timeService;
this.uuidService = uuidService;
+ this.eventBus.register( this );
}
- /**
- * Create the entity Id and inject it, as well as set the timestamp versions
- *
- * @param executionContext The context of the current write operation
- */
@Override
- public void performStage( final ExecutionContext executionContext ) {
+ @Subscribe
+ public void performStage( final EventCreate event ) {
+ Preconditions.checkNotNull(event, "event is required" );
- final Entity entity = executionContext.getMessage( Entity.class );
+ final Entity entity = event.getData();
Preconditions.checkNotNull( entity, "Entity is required in the new stage of the mvcc write" );
@@ -78,8 +81,6 @@ public class Create implements ExecutionStage {
entity.setCreated( created );
entity.setUpdated( created );
- //set the updated entity for the next stage
- executionContext.setMessage( entity );
- executionContext.proceed();
+ eventBus.post( new EventStart( event.getCollectionContext(), entity, event.getResult() ) );
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/EventCommit.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/EventCommit.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/EventCommit.java
new file mode 100644
index 0000000..0091770
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/EventCommit.java
@@ -0,0 +1,16 @@
+package org.apache.usergrid.persistence.collection.mvcc.stage.impl.write;
+
+
+import org.apache.usergrid.persistence.collection.CollectionContext;
+import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
+import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionEvent;
+import org.apache.usergrid.persistence.collection.mvcc.stage.Result;
+
+
+public class EventCommit extends CollectionEvent<MvccEntity> {
+
+
+ protected EventCommit( final CollectionContext context, final MvccEntity data, final Result result ) {
+ super( context, data, result );
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/EventCreate.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/EventCreate.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/EventCreate.java
new file mode 100644
index 0000000..d5a609f
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/EventCreate.java
@@ -0,0 +1,17 @@
+package org.apache.usergrid.persistence.collection.mvcc.stage.impl.write;
+
+
+import org.apache.usergrid.persistence.collection.CollectionContext;
+import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionEvent;
+import org.apache.usergrid.persistence.collection.mvcc.stage.Result;
+import org.apache.usergrid.persistence.model.entity.Entity;
+
+
+/** @author tnine */
+public class EventCreate extends CollectionEvent<Entity> {
+
+
+ public EventCreate( final CollectionContext context, final Entity data, final Result result ) {
+ super( context, data, result );
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/EventStart.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/EventStart.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/EventStart.java
new file mode 100644
index 0000000..87e2488
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/EventStart.java
@@ -0,0 +1,17 @@
+package org.apache.usergrid.persistence.collection.mvcc.stage.impl.write;
+
+
+import org.apache.usergrid.persistence.collection.CollectionContext;
+import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionEvent;
+import org.apache.usergrid.persistence.collection.mvcc.stage.Result;
+import org.apache.usergrid.persistence.model.entity.Entity;
+
+
+/** @author tnine */
+public class EventStart extends CollectionEvent<Entity> {
+
+ public EventStart( final CollectionContext context, final Entity data, final Result result ) {
+
+ super( context, data, result );
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/EventUpdate.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/EventUpdate.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/EventUpdate.java
new file mode 100644
index 0000000..68fb6ab
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/EventUpdate.java
@@ -0,0 +1,16 @@
+package org.apache.usergrid.persistence.collection.mvcc.stage.impl.write;
+
+
+import org.apache.usergrid.persistence.collection.CollectionContext;
+import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionEvent;
+import org.apache.usergrid.persistence.collection.mvcc.stage.Result;
+import org.apache.usergrid.persistence.model.entity.Entity;
+
+
+/** @author tnine */
+public class EventUpdate extends CollectionEvent<Entity> {
+
+ public EventUpdate( final CollectionContext context, final Entity data, final Result result ) {
+ super( context, data, result );
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/EventVerify.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/EventVerify.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/EventVerify.java
new file mode 100644
index 0000000..f078aa5
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/EventVerify.java
@@ -0,0 +1,17 @@
+package org.apache.usergrid.persistence.collection.mvcc.stage.impl.write;
+
+
+import org.apache.usergrid.persistence.collection.CollectionContext;
+import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
+import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionEvent;
+import org.apache.usergrid.persistence.collection.mvcc.stage.Result;
+
+
+/** @author tnine */
+public class EventVerify extends CollectionEvent<MvccEntity> {
+
+
+ public EventVerify( final CollectionContext context, final MvccEntity data, final Result result ) {
+ super( context, data, result );
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/PipelineCreate.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/PipelineCreate.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/PipelineCreate.java
deleted file mode 100644
index f3af972..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/PipelineCreate.java
+++ /dev/null
@@ -1,23 +0,0 @@
-package org.apache.usergrid.persistence.collection.mvcc.stage.impl.write;
-
-
-import java.lang.annotation.Retention;
-import java.lang.annotation.Target;
-
-import com.google.inject.BindingAnnotation;
-
-import static java.lang.annotation.ElementType.FIELD;
-import static java.lang.annotation.ElementType.METHOD;
-import static java.lang.annotation.ElementType.PARAMETER;
-import static java.lang.annotation.RetentionPolicy.RUNTIME;
-
-
-/**
- * Marks the create pipeline
- *
- * @author tnine
- */
-@BindingAnnotation
-@Target({ FIELD, PARAMETER, METHOD })
-@Retention(RUNTIME)
-public @interface PipelineCreate {}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/PipelineUpdate.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/PipelineUpdate.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/PipelineUpdate.java
deleted file mode 100644
index 85bc56d..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/PipelineUpdate.java
+++ /dev/null
@@ -1,23 +0,0 @@
-package org.apache.usergrid.persistence.collection.mvcc.stage.impl.write;
-
-
-import java.lang.annotation.Retention;
-import java.lang.annotation.Target;
-
-import com.google.inject.BindingAnnotation;
-
-import static java.lang.annotation.ElementType.FIELD;
-import static java.lang.annotation.ElementType.METHOD;
-import static java.lang.annotation.ElementType.PARAMETER;
-import static java.lang.annotation.RetentionPolicy.RUNTIME;
-
-
-/**
- * Marks the create pipeline
- *
- * @author tnine
- */
-@BindingAnnotation
-@Target( { FIELD, PARAMETER, METHOD } )
-@Retention( RUNTIME )
-public @interface PipelineUpdate {}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/StartWrite.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/StartWrite.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/StartWrite.java
index d6e7d49..ac153f8 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/StartWrite.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/StartWrite.java
@@ -8,15 +8,16 @@ import org.slf4j.LoggerFactory;
import org.apache.usergrid.persistence.collection.CollectionContext;
import org.apache.usergrid.persistence.collection.exception.CollectionRuntimeException;
+import org.apache.usergrid.persistence.collection.mvcc.entity.CollectionEventBus;
import org.apache.usergrid.persistence.collection.mvcc.entity.MvccLogEntry;
import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccEntityImpl;
import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccLogEntryImpl;
-import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionContext;
-import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionStage;
+import org.apache.usergrid.persistence.collection.mvcc.stage.EventStage;
import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy;
import org.apache.usergrid.persistence.model.entity.Entity;
import com.google.common.base.Preconditions;
+import com.google.common.eventbus.Subscribe;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.netflix.astyanax.MutationBatch;
@@ -28,32 +29,32 @@ import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
* new write in the data store for a checkpoint and recovery
*/
@Singleton
-public class StartWrite implements ExecutionStage {
+public class StartWrite implements EventStage<EventStart> {
private static final Logger LOG = LoggerFactory.getLogger( StartWrite.class );
+ private final CollectionEventBus eventBus;
private final MvccLogEntrySerializationStrategy logStrategy;
/** Create a new stage with the current context */
@Inject
- public StartWrite( final MvccLogEntrySerializationStrategy logStrategy ) {
+ public StartWrite( final CollectionEventBus eventBus,
+ final MvccLogEntrySerializationStrategy logStrategy ) {
+ Preconditions.checkNotNull( eventBus, "eventBus is required" );
Preconditions.checkNotNull( logStrategy, "logStrategy is required" );
-
+ this.eventBus = eventBus;
this.logStrategy = logStrategy;
+
+ this.eventBus.register( this );
}
- /**
- * Create the entity Id and inject it, as well as set the timestamp versions
- *
- * @param executionContext The context of the current write operation
- */
@Override
- public void performStage( final ExecutionContext executionContext ) {
-
- final Entity entity = executionContext.getMessage( Entity.class );
+ @Subscribe
+ public void performStage( final EventStart event ) {
+ final Entity entity = event.getData();
Preconditions.checkNotNull( entity, "Entity is required in the new stage of the mvcc write" );
@@ -64,12 +65,11 @@ public class StartWrite implements ExecutionStage {
Preconditions.checkNotNull( version, "Entity version is required in this stage" );
-
- final CollectionContext collectionContext = executionContext.getCollectionContext();
+ final CollectionContext collectionContext = event.getCollectionContext();
- final MvccLogEntry startEntry = new MvccLogEntryImpl( entityId, version, org.apache.usergrid.persistence
- .collection.mvcc.entity.Stage.ACTIVE );
+ final MvccLogEntry startEntry = new MvccLogEntryImpl( entityId, version,
+ org.apache.usergrid.persistence.collection.mvcc.entity.Stage.ACTIVE );
MutationBatch write = logStrategy.write( collectionContext, startEntry );
@@ -86,7 +86,9 @@ public class StartWrite implements ExecutionStage {
//create the mvcc entity for the next stage
final MvccEntityImpl nextStage = new MvccEntityImpl( entityId, version, entity );
- executionContext.setMessage( nextStage );
- executionContext.proceed();
+ eventBus.post( new EventVerify( collectionContext, nextStage, event.getResult() ) );
}
+
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/Update.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/Update.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/Update.java
index 0175b42..a095fb3 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/Update.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/Update.java
@@ -6,13 +6,14 @@ import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionContext;
-import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionStage;
+import org.apache.usergrid.persistence.collection.mvcc.entity.CollectionEventBus;
+import org.apache.usergrid.persistence.collection.mvcc.stage.EventStage;
import org.apache.usergrid.persistence.collection.service.TimeService;
import org.apache.usergrid.persistence.collection.service.UUIDService;
import org.apache.usergrid.persistence.model.entity.Entity;
import com.google.common.base.Preconditions;
+import com.google.common.eventbus.Subscribe;
import com.google.inject.Inject;
import com.google.inject.Singleton;
@@ -22,33 +23,35 @@ import com.google.inject.Singleton;
* been set correctly
*/
@Singleton
-public class Update implements ExecutionStage {
+public class Update implements EventStage<EventUpdate> {
private static final Logger LOG = LoggerFactory.getLogger( Update.class );
private final TimeService timeService;
private final UUIDService uuidService;
+ private final CollectionEventBus eventBus;
@Inject
- public Update( final TimeService timeService, final UUIDService uuidService ) {
+ public Update( final CollectionEventBus eventBus, final TimeService timeService, final UUIDService uuidService ) {
+ Preconditions.checkNotNull( eventBus, "eventBus is required" );
Preconditions.checkNotNull( timeService, "timeService is required" );
Preconditions.checkNotNull( uuidService, "uuidService is required" );
+ this.eventBus = eventBus;
this.timeService = timeService;
this.uuidService = uuidService;
+
+ this.eventBus.register( this );
+
}
- /**
- * Create the entity Id and inject it, as well as set the timestamp versions
- *
- * @param executionContext The context of the current write operation
- */
@Override
- public void performStage( final ExecutionContext executionContext ) {
+ @Subscribe
+ public void performStage( final EventUpdate event ) {
- final Entity entity = executionContext.getMessage( Entity.class );
+ final Entity entity = event.getData();
Preconditions.checkNotNull( entity, "Entity is required in the new stage of the mvcc write" );
@@ -64,7 +67,7 @@ public class Update implements ExecutionStage {
entity.setVersion( version );
entity.setUpdated( updated );
- executionContext.setMessage( entity );
- executionContext.proceed();
+ //fire the start event
+ eventBus.post(new EventStart( event.getCollectionContext(), entity, event.getResult()) );
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/Verify.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/Verify.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/Verify.java
index 884c59b..5a05869 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/Verify.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/Verify.java
@@ -1,25 +1,31 @@
package org.apache.usergrid.persistence.collection.mvcc.stage.impl.write;
-import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionStage;
-import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionContext;
+import org.apache.usergrid.persistence.collection.mvcc.entity.CollectionEventBus;
+import org.apache.usergrid.persistence.collection.mvcc.stage.EventStage;
+import com.google.inject.Inject;
import com.google.inject.Singleton;
/** This phase should execute any verification on the MvccEntity */
@Singleton
-public class Verify implements ExecutionStage {
+public class Verify implements EventStage<EventVerify> {
+ private final CollectionEventBus eventBus;
- public Verify() {
+ @Inject
+ public Verify( final CollectionEventBus eventBus ) {
+ this.eventBus = eventBus;
+ this.eventBus.register( this );
}
+
@Override
- public void performStage( final ExecutionContext executionContext ) {
- //TODO no op for now, just continue to the next stage. Verification logic goes in here
+ public void performStage( final EventVerify event ) {
+ //no op, verification needs to happen here
- executionContext.proceed();
+ eventBus.post( new EventCommit(event.getCollectionContext(), event.getData(), event.getResult()) );
}
}