You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2014/03/08 01:20:52 UTC
[3/3] git commit: Upgraded queue system and processing
Upgraded queue system and processing
Updated RX to 0.17-RC7
Rolled custom Hystrix to test RX integration
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/51a9ffd9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/51a9ffd9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/51a9ffd9
Branch: refs/heads/asyncqueue
Commit: 51a9ffd9218bdaf4884709fd04bae9c046dc67ec
Parents: c45f7ee
Author: Todd Nine <tn...@apigee.com>
Authored: Fri Mar 7 16:59:06 2014 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Fri Mar 7 16:59:06 2014 -0700
----------------------------------------------------------------------
stack/corepersistence/collection/pom.xml | 21 +--
.../collection/hystrix/CassandraCommand.java | 4 +-
.../impl/EntityCollectionManagerImpl.java | 6 +-
.../mvcc/stage/delete/MarkCommit.java | 2 +-
.../collection/mvcc/stage/delete/MarkStart.java | 2 +-
.../collection/mvcc/stage/load/Load.java | 2 +-
.../mvcc/stage/write/WriteCommit.java | 2 +-
.../mvcc/stage/write/WriteOptimisticVerify.java | 2 +-
.../collection/mvcc/stage/write/WriteStart.java | 2 +-
.../mvcc/stage/write/WriteUniqueVerify.java | 5 +-
.../collection/rx/CassandraThreadScheduler.java | 2 +-
.../rx/CassandraThreadSchedulerTest.java | 44 +++----
.../persistence/collection/rx/ParallelTest.java | 8 +-
.../usergrid/persistence/graph/GraphFig.java | 14 ++
.../graph/consistency/AsyncProcessor.java | 50 +++++---
.../graph/consistency/AsyncProcessorImpl.java | 60 ++++++---
.../graph/consistency/CompleteListener.java | 14 ++
.../graph/consistency/TimeoutTask.java | 61 +++++++++
.../graph/impl/EdgeDeleteListener.java | 6 +-
.../persistence/graph/impl/EdgeManagerImpl.java | 2 +-
.../graph/impl/EdgeWriteListener.java | 2 +-
.../graph/impl/NodeDeleteListener.java | 6 +-
.../impl/parse/ObservableIterator.java | 18 ++-
.../graph/consistency/AsyncProcessorTest.java | 127 +++++++++++++++----
24 files changed, 337 insertions(+), 125 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/51a9ffd9/stack/corepersistence/collection/pom.xml
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/pom.xml b/stack/corepersistence/collection/pom.xml
index db3ce3a..00db576 100644
--- a/stack/corepersistence/collection/pom.xml
+++ b/stack/corepersistence/collection/pom.xml
@@ -159,17 +159,17 @@
<!-- RX java -->
<dependency>
- <groupId>com.netflix.rxjava</groupId>
- <artifactId>rxjava-core</artifactId>
- <version>0.16.1</version>
+ <groupId>com.netflix.rxjava</groupId>
+ <artifactId>rxjava-core</artifactId>
+ <version>0.17.0-RC7</version>
</dependency>
- <dependency>
- <groupId>com.netflix.rxjava</groupId>
- <artifactId>rxjava-contrib</artifactId>
- <version>0.14.6</version>
- </dependency>
+ <!--<dependency>-->
+ <!--<groupId>com.netflix.rxjava</groupId>-->
+ <!--<artifactId>rxjava-contrib</artifactId>-->
+ <!--<version>0.14.6</version>-->
+ <!--</dependency>-->
<dependency>
<groupId>com.google.inject.extensions</groupId>
@@ -201,10 +201,13 @@
<version>${log4j.version}</version>
</dependency>
+ <!--Remove custom build once this patch is complete
+ https://github.com/Netflix/Hystrix/pull/209-->
+
<dependency>
<groupId>com.netflix.hystrix</groupId>
<artifactId>hystrix-core</artifactId>
- <version>1.3.8</version>
+ <version>1.3.14-SNAPSHOT</version>
</dependency>
</dependencies>
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/51a9ffd9/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/hystrix/CassandraCommand.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/hystrix/CassandraCommand.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/hystrix/CassandraCommand.java
index c87bf33..731933a 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/hystrix/CassandraCommand.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/hystrix/CassandraCommand.java
@@ -24,7 +24,7 @@ import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import rx.Observable;
-import rx.concurrency.Schedulers;
+import rx.schedulers.Schedulers;
/**
@@ -69,6 +69,6 @@ public class CassandraCommand<R> extends HystrixCommand<R> {
*/
private static <R> Observable<R> toObservable( R readValue ) {
//create a new command and ensure it's observed on the correct thread scheduler
- return new CassandraCommand<R>( readValue ).toObservable( Schedulers.threadPoolForIO() );
+ return new CassandraCommand<R>( readValue ).toObservable( Schedulers.io() );
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/51a9ffd9/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
index 4391d09..ef95a75 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
@@ -47,9 +47,9 @@ import com.google.inject.assistedinject.Assisted;
import rx.Observable;
import rx.Scheduler;
-import rx.util.functions.Func1;
-import rx.util.functions.Func2;
-import rx.util.functions.FuncN;
+import rx.functions.Func1;
+import rx.functions.Func2;
+import rx.functions.FuncN;
/**
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/51a9ffd9/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommit.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommit.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommit.java
index 9c1a432..06c875e 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommit.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommit.java
@@ -41,7 +41,7 @@ import com.google.inject.Singleton;
import com.netflix.astyanax.MutationBatch;
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
-import rx.util.functions.Func1;
+import rx.functions.Func1;
/**
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/51a9ffd9/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkStart.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkStart.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkStart.java
index 4b0fae5..e718f4b 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkStart.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkStart.java
@@ -45,7 +45,7 @@ import com.google.inject.Singleton;
import com.netflix.astyanax.MutationBatch;
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
-import rx.util.functions.Func1;
+import rx.functions.Func1;
/**
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/51a9ffd9/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/load/Load.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/load/Load.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/load/Load.java
index 20abc5a..e0d0f77 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/load/Load.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/load/Load.java
@@ -39,7 +39,7 @@ import com.google.common.base.Preconditions;
import com.google.inject.Inject;
import com.google.inject.Singleton;
-import rx.util.functions.Func1;
+import rx.functions.Func1;
/**
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/51a9ffd9/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java
index bcec305..4c90e69 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java
@@ -41,7 +41,7 @@ import com.netflix.astyanax.MutationBatch;
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
import org.apache.usergrid.persistence.model.field.Field;
-import rx.util.functions.Func1;
+import rx.functions.Func1;
/**
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/51a9ffd9/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteOptimisticVerify.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteOptimisticVerify.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteOptimisticVerify.java
index e42019c..7a70dfc 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteOptimisticVerify.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteOptimisticVerify.java
@@ -8,7 +8,7 @@ import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionIoEvent;
import com.google.inject.Inject;
import com.google.inject.Singleton;
-import rx.util.functions.Func1;
+import rx.functions.Func1;
/**
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/51a9ffd9/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteStart.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteStart.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteStart.java
index 01fee57..a0ff1b1 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteStart.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteStart.java
@@ -24,7 +24,7 @@ import com.google.inject.Singleton;
import com.netflix.astyanax.MutationBatch;
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
-import rx.util.functions.Func1;
+import rx.functions.Func1;
/**
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/51a9ffd9/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
index 55280c8..fe53b13 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
@@ -40,8 +40,8 @@ import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
import rx.Observable;
import rx.Scheduler;
-import rx.util.functions.Func1;
-import rx.util.functions.FuncN;
+import rx.functions.Func1;
+import rx.functions.FuncN;
/**
@@ -168,6 +168,7 @@ public class WriteUniqueVerify implements Func1<CollectionIoEvent<MvccEntity>, O
};
+
return Observable.zip( fields, zipFunction );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/51a9ffd9/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/rx/CassandraThreadScheduler.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/rx/CassandraThreadScheduler.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/rx/CassandraThreadScheduler.java
index 90f3b2d..a5baf76 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/rx/CassandraThreadScheduler.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/rx/CassandraThreadScheduler.java
@@ -35,7 +35,7 @@ import com.google.inject.Provider;
import com.google.inject.name.Named;
import rx.Scheduler;
-import rx.concurrency.Schedulers;
+import rx.schedulers.Schedulers;
public class CassandraThreadScheduler implements Provider<Scheduler> {
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/51a9ffd9/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/CassandraThreadSchedulerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/CassandraThreadSchedulerTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/CassandraThreadSchedulerTest.java
index 45a175e..cf399c3 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/CassandraThreadSchedulerTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/CassandraThreadSchedulerTest.java
@@ -18,6 +18,7 @@ import org.apache.usergrid.persistence.collection.guice.TestCollectionModule;
import com.google.inject.Inject;
import rx.Scheduler;
+import rx.functions.Action1;
import rx.util.functions.Action0;
import static org.junit.Assert.assertTrue;
@@ -63,13 +64,11 @@ public class CassandraThreadSchedulerTest {
//schedule and we should fail
try {
-
- rxScheduler.schedule( new Action0() {
- @Override
- public void call() {
- //no op
- }
- } );
+ rxScheduler.schedule( new Action1<Scheduler.Inner>() {
+ @Override
+ public void call( final Scheduler.Inner inner ) {
+ }
+ });
fail( "This should have thrown an exception" );
}
@@ -127,9 +126,9 @@ public class CassandraThreadSchedulerTest {
try {
- rxScheduler.schedule( new Action0() {
- @Override
- public void call() {
+ rxScheduler.schedule( new Action1<Scheduler.Inner>() {
+ @Override
+ public void call( final Scheduler.Inner inner ) {
//no op
}
} );
@@ -158,9 +157,9 @@ public class CassandraThreadSchedulerTest {
try {
- rxScheduler.schedule( new Action0() {
- @Override
- public void call() {
+ rxScheduler.schedule( new Action1<Scheduler.Inner>() {
+ @Override
+ public void call( final Scheduler.Inner inner ) {
//no op
}
} );
@@ -215,9 +214,9 @@ public class CassandraThreadSchedulerTest {
try {
- rxScheduler.schedule( new Action0() {
- @Override
- public void call() {
+ rxScheduler.schedule( new Action1<Scheduler.Inner>() {
+ @Override
+ public void call( final Scheduler.Inner inner ) {
//no op
}
} );
@@ -244,9 +243,9 @@ public class CassandraThreadSchedulerTest {
try {
- rxScheduler.schedule( new Action0() {
- @Override
- public void call() {
+ rxScheduler.schedule( new Action1<Scheduler.Inner>() {
+ @Override
+ public void call( final Scheduler.Inner inner ) {
//no op
}
} );
@@ -295,9 +294,10 @@ public class CassandraThreadSchedulerTest {
final CountDownLatch latch = new CountDownLatch( totalCount );
for ( int i = 0; i < totalCount; i++ ) {
- final Action0 action = new Action0() {
- @Override
- public void call() {
+
+ final Action1<Scheduler.Inner> action = new Action1<Scheduler.Inner>() {
+ @Override
+ public void call( final Scheduler.Inner inner ) {
try {
final String threadName = Thread.currentThread().getName();
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/51a9ffd9/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/ParallelTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/ParallelTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/ParallelTest.java
index ac252fb..434ea26 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/ParallelTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/ParallelTest.java
@@ -38,9 +38,9 @@ import com.netflix.hystrix.HystrixCommandGroupKey;
import rx.Observable;
import rx.Scheduler;
-import rx.concurrency.Schedulers;
-import rx.util.functions.Func1;
-import rx.util.functions.FuncN;
+import rx.functions.Func1;
+import rx.functions.FuncN;
+import rx.schedulers.Schedulers;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -82,7 +82,7 @@ public class ParallelTest {
// final Scheduler scheduler = Schedulers.threadPoolForComputation();
//use the I/O scheduler to allow enough thread, otherwise our pool will be the same size as the # of cores
- final Scheduler scheduler = Schedulers.threadPoolForIO();
+ final Scheduler scheduler = Schedulers.io();
//set our size equal
ConfigurationManager.getConfigInstance().setProperty( THREAD_POOL_SIZE, size );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/51a9ffd9/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
index bbf83be..3f87d14 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
@@ -15,6 +15,11 @@ public interface GraphFig extends GuicyFig {
public static final String SCAN_PAGE_SIZE = "usergrid.graph.scan.page.size";
+
+ public static final String TIMEOUT_SIZE = "usergrid.graph.timeout.page.size";
+
+ public static final String TIMEOUT_TASK_TIME = "usergrid.graph.timeout.task.time";
+
public static final String READ_CL = "usergrid.graph.read.cl";
public static final String WRITE_CL = "usergrid.graph.write.cl";
@@ -37,6 +42,15 @@ public interface GraphFig extends GuicyFig {
@Key( WRITE_TIMEOUT )
long getWriteTimeout();
+ @Default( "100" )
+ @Key( TIMEOUT_SIZE )
+ int getTimeoutReadSize();
+
+ @Default("500")
+ @Key( TIMEOUT_TASK_TIME )
+ long getTaskLoopTime();
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/51a9ffd9/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessor.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessor.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessor.java
index 150aa6b..016b45d 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessor.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessor.java
@@ -1,48 +1,62 @@
package org.apache.usergrid.persistence.graph.consistency;
+import java.util.Collection;
+
+
/**
- * Used to fork lazy repair and other types of operations.
- *
+ * Used to fork lazy repair and other types of operations.
*/
public interface AsyncProcessor<T> {
/**
- * The processor implementation is responsible for guaranteeing the events fire in the runtime environment.
- * This could be local or clustered, consult the documentation on the implementation. Note that events published
- * here could possibly be double published if the operation reaches it's timeout before completion. As a result, every
- * receiver of the event should operate in an idempotent way. Note that the event will fire at a time >= the timeout time.
- * Firing immediately should not be assumed.
+ * The processor implementation is responsible for guaranteeing the events fire in the runtime environment. This
+ * could be local or clustered, consult the documentation on the implementation. Note that events published here
+ * could possibly be double published if the operation reaches its timeout before completion. As a result, every
+ * receiver of the event should operate in an idempotent way. Note that the event will fire at a time >= the
+ * timeout time. Firing immediately should not be assumed.
*
* @param event The event to be scheduled for verification
- * @param timeout The time in milliseconds we should wait before the event should fire
+ * @param timeout The time in milliseconds we should wait before the event should fire
*/
public AsynchronousMessage<T> setVerification( T event, long timeout );
/**
- * Start processing the event immediately asynchronously. In the event an exception is thrown, the AsynchronousMessage should be re-tried.
- * It is up to the implementer to commit the event so that it does not fire again. This should never throw exceptions.
+ * Start processing the event immediately asynchronously. In the event an exception is thrown, the
+ * AsynchronousMessage should be re-tried. It is up to the implementer to commit the event so that it does not fire
+ * again. This should never throw exceptions.
*
* @param event The event to start
*/
- public void start(AsynchronousMessage<T> event);
+ public void start( AsynchronousMessage<T> event );
+
+
+ /**
+ * Get all events that have passed their timeout
+ *
+ * @param maxCount The maximum count
+ * @param timeout The timeout to set when retrieving these timeouts to ensure they aren't lost
+ *
+ * @return A collection of asynchronous messages that have passed their timeout. This could be due to process
+ * failure, node loss, etc. No assumptions should be made regarding the state of the messages when they are
+ * returned.
+ */
+ public Collection<AsynchronousMessage<T>> getTimeouts( int maxCount, long timeout );
/**
* Add the error listener to the list of listeners
- * @param listener
*/
public void addErrorListener( ErrorListener<T> listener );
/**
* Add the listener to this instance
- * @param listener
*/
- public void addListener(MessageListener<T, T> listener);
-
-
-
-
+ public void addListener( MessageListener<T, T> listener );
+ /**
+ * Add a complete listener that is invoked once an event has finished processing and been removed from the queue
+ */
+ public void addCompleteListener( CompleteListener<T> listener );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/51a9ffd9/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorImpl.java
index 20c40e2..d962a88 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorImpl.java
@@ -2,45 +2,59 @@ package org.apache.usergrid.persistence.graph.consistency;
import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.usergrid.persistence.graph.GraphFig;
+
import com.google.inject.Inject;
import com.google.inject.Singleton;
-import com.netflix.hystrix.HystrixCommandGroupKey;
import rx.Observable;
import rx.Scheduler;
-import rx.util.functions.Action0;
-import rx.util.functions.Action1;
-import rx.util.functions.FuncN;
+import rx.functions.Action0;
+import rx.functions.Action1;
+import rx.functions.FuncN;
+import rx.schedulers.Schedulers;
/**
- * The implementation of asynchronous processing.
- * This is intentionally kept as a 1 processor to 1 event type mapping
+ * The implementation of asynchronous processing. This is intentionally kept as a 1 processor to 1 event type mapping
* This way reflection is not used, event dispatching is easier, and has compile time checking
*/
@Singleton
public class AsyncProcessorImpl<T> implements AsyncProcessor<T> {
- private static final HystrixCommandGroupKey GRAPH_REPAIR = HystrixCommandGroupKey.Factory.asKey( "Graph_Repair" );
-
- private final TimeoutQueue<T> queue;
- private final Scheduler scheduler;
- private final List<MessageListener<T, T>> listeners = new ArrayList<MessageListener<T, T>>();
+ /**
+ * TODO, run this with hystrix
+ */
private static final Logger LOG = LoggerFactory.getLogger( AsyncProcessor.class );
- private List<ErrorListener> errorListeners = new ArrayList<ErrorListener>();
+ protected final TimeoutQueue<T> queue;
+ protected final Scheduler scheduler;
+ protected final GraphFig graphFig;
+ protected final List<MessageListener<T, T>> listeners = new ArrayList<MessageListener<T, T>>();
+
+
+ protected List<ErrorListener<T>> errorListeners = new ArrayList<ErrorListener<T>>();
+ protected List<CompleteListener<T>> completeListeners = new ArrayList<CompleteListener<T>>();
@Inject
- public AsyncProcessorImpl( final TimeoutQueue<T> queue, final Scheduler scheduler ) {
+ public AsyncProcessorImpl( final TimeoutQueue<T> queue, final Scheduler scheduler, final GraphFig graphFig ) {
this.queue = queue;
this.scheduler = scheduler;
+ this.graphFig = graphFig;
+
+ //we purposefully use a new thread. We don't want to use one of the I/O threads to run this task:
+ //in the event the scheduler is full, we'd end up rejecting the reschedule of this task
+ Schedulers.newThread().schedulePeriodically( new TimeoutTask<T>(this, graphFig), graphFig.getTaskLoopTime(), graphFig.getTaskLoopTime(), TimeUnit.MILLISECONDS );
}
@@ -52,8 +66,6 @@ public class AsyncProcessorImpl<T> implements AsyncProcessor<T> {
@Override
public void start( final AsynchronousMessage<T> event ) {
-
-
final T data = event.getEvent();
/**
* Execute all listeners in parallel
@@ -83,6 +95,10 @@ public class AsyncProcessorImpl<T> implements AsyncProcessor<T> {
@Override
public void call() {
queue.remove( event );
+
+ for ( CompleteListener<T> listener : completeListeners ) {
+ listener.onComplete( event );
+ }
}
} ).subscribe( new Action1<AsynchronousMessage<T>>() {
@Override
@@ -94,6 +110,12 @@ public class AsyncProcessorImpl<T> implements AsyncProcessor<T> {
@Override
+ public Collection<AsynchronousMessage<T>> getTimeouts( final int maxCount, final long timeout ) {
+ return queue.take( maxCount, timeout );
+ }
+
+
+ @Override
public void addListener( final MessageListener<T, T> listener ) {
this.listeners.add( listener );
}
@@ -105,4 +127,12 @@ public class AsyncProcessorImpl<T> implements AsyncProcessor<T> {
public void addErrorListener( ErrorListener<T> listener ) {
this.errorListeners.add( listener );
}
+
+
+ @Override
+ public void addCompleteListener( final CompleteListener<T> listener ) {
+ this.completeListeners.add( listener );
+ }
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/51a9ffd9/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/CompleteListener.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/CompleteListener.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/CompleteListener.java
new file mode 100644
index 0000000..c415005
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/CompleteListener.java
@@ -0,0 +1,14 @@
+package org.apache.usergrid.persistence.graph.consistency;
+
+
+/**
+ * Internal listener for event completion, really only used for testing. Can be used to hook into the completed state
+ */
+public interface CompleteListener<T> {
+
+ /**
+ * Invoked when an event is complete
+ * @param event
+ */
+ void onComplete( AsynchronousMessage<T> event );
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/51a9ffd9/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/TimeoutTask.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/TimeoutTask.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/TimeoutTask.java
new file mode 100644
index 0000000..6607724
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/TimeoutTask.java
@@ -0,0 +1,61 @@
+package org.apache.usergrid.persistence.graph.consistency;
+
+
+import java.util.Iterator;
+
+import org.apache.usergrid.persistence.graph.GraphFig;
+
+import rx.Scheduler;
+import rx.functions.Action1;
+
+
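+/**
+ * Scheduler action that fetches timed-out messages from the AsyncProcessor and starts them again.
+ * It keeps looping until the scheduler is unsubscribed or no more timeouts are returned.
+ */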
+public class TimeoutTask<T> implements Action1<Scheduler.Inner> {
+
+ private final AsyncProcessor<T> processor;
+ private final GraphFig graphFig;
+
+
+ public TimeoutTask( final AsyncProcessor<T> processor, final GraphFig graphFig ) {
+ this.processor = processor;
+ this.graphFig = graphFig;
+ }
+
+
+ @Override
+ public void call( final Scheduler.Inner inner ) {
+
+ /**
+ * We purposefully run in a tight loop: if we have anything to process, we need to do so.
+ * Once we run out of items to process, we exit and wait for the timer to fire again.
+ */
+ while(!inner.isUnsubscribed()) {
+
+ Iterator<AsynchronousMessage<T>> timeouts = getTimeouts();
+
+ /**
+ * We're done, just exit
+ */
+ if(!timeouts.hasNext()){
+ return;
+ }
+
+ while ( timeouts.hasNext() ) {
+ processor.start( timeouts.next() );
+ }
+
+ }
+ }
+
+
+ /**
+ * Get the timeouts
+ * @return
+ */
+ private Iterator<AsynchronousMessage<T>> getTimeouts() {
+ return processor.getTimeouts( graphFig.getTimeoutReadSize(), graphFig.getWriteTimeout() * 2 ).iterator();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/51a9ffd9/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeDeleteListener.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeDeleteListener.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeDeleteListener.java
index 824b05c..6feded3 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeDeleteListener.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeDeleteListener.java
@@ -23,9 +23,9 @@ import com.netflix.astyanax.MutationBatch;
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
import rx.Observable;
-import rx.util.functions.Action1;
-import rx.util.functions.Func1;
-import rx.util.functions.Func5;
+import rx.functions.Action1;
+import rx.functions.Func1;
+import rx.functions.Func5;
/**
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/51a9ffd9/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeManagerImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeManagerImpl.java
index 663215a..b84bd8e 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeManagerImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeManagerImpl.java
@@ -56,7 +56,7 @@ import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
import rx.Observable;
import rx.Scheduler;
-import rx.util.functions.Func1;
+import rx.functions.Func1;
/**
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/51a9ffd9/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeWriteListener.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeWriteListener.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeWriteListener.java
index 428085e..271375a 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeWriteListener.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeWriteListener.java
@@ -22,7 +22,7 @@ import com.netflix.astyanax.MutationBatch;
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
import rx.Observable;
-import rx.util.functions.Func1;
+import rx.functions.Func1;
/**
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/51a9ffd9/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java
index 08cc942..aabc572 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java
@@ -20,9 +20,10 @@ import org.apache.usergrid.persistence.model.entity.Id;
import com.google.common.base.Optional;
import com.google.inject.Inject;
+import rx.Notification;
import rx.Observable;
-import rx.util.functions.Action1;
-import rx.util.functions.Func1;
+import rx.functions.Action1;
+import rx.functions.Func1;
/**
@@ -75,6 +76,7 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, EdgeEv
// delete them. We might want to batch this up for efficiency
return loadEdgesToTarget( scope,
new SimpleSearchByEdgeType( node, edgeType, uuidOptional.get(), null ) )
+
.doOnEach( new Action1<MarkedEdge>() {
@Override
public void call( final MarkedEdge markedEdge ) {
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/51a9ffd9/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/parse/ObservableIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/parse/ObservableIterator.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/parse/ObservableIterator.java
index 58ff6c7..2274868 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/parse/ObservableIterator.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/parse/ObservableIterator.java
@@ -5,6 +5,7 @@ import java.util.Iterator;
import rx.Observable;
import rx.Observer;
+import rx.Subscriber;
import rx.Subscription;
import rx.subscriptions.Subscriptions;
@@ -14,11 +15,11 @@ import rx.subscriptions.Subscriptions;
* This is used in favor of "Observable.just" when the initial fetch of the iterator will require I/O. This allows
* us to wrap the iterator in a deferred invocation to avoid the blocking on construction.
*/
-public abstract class ObservableIterator<T> implements Observable.OnSubscribeFunc<T> {
+public abstract class ObservableIterator<T> implements Observable.OnSubscribe<T> {
@Override
- public Subscription onSubscribe( final Observer<? super T> observer ) {
+ public void call( final Subscriber<? super T> subscriber ) {
try {
@@ -26,25 +27,22 @@ public abstract class ObservableIterator<T> implements Observable.OnSubscribeFun
Iterator<T> itr = getIterator();
- //TODO T.N. when > 0.17 comes out, we need to implement the check with each loop as described here https://github.com/Netflix/RxJava/issues/802
- while ( itr.hasNext()) {
- observer.onNext( itr.next() );
+ //while we have items to emit and our subscriber is subscribed, we want to keep emitting items
+ while ( itr.hasNext() && !subscriber.isUnsubscribed()) {
+ subscriber.onNext( itr.next() );
}
- observer.onCompleted();
+ subscriber.onCompleted();
}
//if any error occurs, we need to notify the observer so it can perform it's own error handling
catch ( Throwable t ) {
- observer.onError( t );
+ subscriber.onError( t );
}
- return Subscriptions.empty();
}
-
-
/**
* Return the iterator to feed data to
*/
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/51a9ffd9/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorTest.java
index fc53705..9822cc5 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorTest.java
@@ -20,6 +20,8 @@
package org.apache.usergrid.persistence.graph.consistency;
+import java.util.Collection;
+import java.util.Collections;
import java.util.Stack;
import java.util.concurrent.CountDownLatch;
@@ -29,7 +31,7 @@ import org.mockito.stubbing.Answer;
import rx.Observable;
import rx.concurrency.Schedulers;
-import rx.util.functions.Action1;
+import rx.functions.Action1;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -70,7 +72,7 @@ public class AsyncProcessorTest {
final TimeoutQueue queue = mock( TimeoutQueue.class );
- AsyncProcessor asyncProcessor = constructProcessor( queue, null );
+ AsyncProcessor asyncProcessor = constructProcessor( queue );
//mock up the queue
@@ -89,6 +91,8 @@ public class AsyncProcessorTest {
final TestListener listener = new TestListener();
+
+
final TestEvent event = new TestEvent();
@@ -108,9 +112,17 @@ public class AsyncProcessorTest {
final TimeoutQueue queue = mock( TimeoutQueue.class );
- final AsyncProcessor asyncProcessor = constructProcessor( queue, listener );
+ final AsyncProcessor asyncProcessor = constructProcessor( queue );
+
+ asyncProcessor.addListener( listener );
+
+
+ final CountDownLatch latch = new CountDownLatch( 2 );
+
+ final TestCompleteListener completeListener = new TestCompleteListener(latch);
+
+ asyncProcessor.addCompleteListener( completeListener );
- final CountDownLatch latch = new CountDownLatch( 1 );
//mock up the ack to allow us to block the test until the async confirm fires
when( queue.remove( asynchronousMessage ) ).thenAnswer( new Answer<Boolean>() {
@@ -132,11 +144,15 @@ public class AsyncProcessorTest {
final TestEvent firedEvent = listener.events.peek();
assertSame( event, firedEvent );
+
+ final TestEvent completeEvent = completeListener.events.peek();
+
+ assertSame(event, completeEvent);
}
-// @Test( timeout = 5000 )
- @Test
+ // @Test( timeout = 5000 )
+ @Test
public void verifyErrorExecution() throws InterruptedException {
final AsynchronousErrorListener listener = new AsynchronousErrorListener();
@@ -163,7 +179,9 @@ public class AsyncProcessorTest {
final TimeoutQueue queue = mock( TimeoutQueue.class );
- final AsyncProcessorImpl asyncProcessor = constructProcessor( queue, listener );
+ final AsyncProcessorImpl asyncProcessor = constructProcessor( queue );
+
+ asyncProcessor.addListener( listener );
final CountDownLatch latch = new CountDownLatch( 1 );
@@ -177,7 +195,6 @@ public class AsyncProcessorTest {
invoked[1] = true;
latch.countDown();
}
-
} );
//throw an error if remove is called. This shouldn't happen
@@ -209,13 +226,56 @@ public class AsyncProcessorTest {
}
+ @Test
+ public void verifyTimeout() {
+
+
+ final long timeout = 500;
+ final TestEvent event = new TestEvent();
+
+
+ final AsynchronousMessage<TestEvent> asynchronousMessage = new AsynchronousMessage<TestEvent>() {
+ @Override
+ public TestEvent getEvent() {
+ return event;
+ }
+
+
+ @Override
+ public long getTimeout() {
+ return timeout;
+ }
+ };
+
+ final TimeoutQueue queue = mock( TimeoutQueue.class );
+
+
+ when(queue.take( 1, 10000l )).thenReturn( Collections.singletonList(asynchronousMessage ));
+
+ AsyncProcessor<TestEvent> processor = constructProcessor( queue );
+
+
+ Collection<AsynchronousMessage<TestEvent>> timeouts = processor.getTimeouts( 1, 10000l );
+
+ assertEquals(1, timeouts.size());
+
+ AsynchronousMessage<TestEvent> returned = timeouts.iterator().next();
+
+ assertSame(asynchronousMessage, returned);
+
+
+
+ }
+
+
+
/**
* Construct the async processor
*/
- public <T> AsyncProcessorImpl<T> constructProcessor( TimeoutQueue<T> queue , MessageListener<T, T> listener) {
+ public <T> AsyncProcessorImpl<T> constructProcessor( TimeoutQueue<T> queue ) {
+
+ AsyncProcessorImpl<T> processor = new AsyncProcessorImpl( queue, Schedulers.threadPoolForIO() );
- AsyncProcessorImpl<T> processor = new AsyncProcessorImpl( queue,Schedulers.threadPoolForIO() );
- processor.addListener( listener );
return processor;
}
@@ -245,12 +305,29 @@ public class AsyncProcessorTest {
@Override
public Observable<TestEvent> receive( final TestEvent event ) {
- return Observable.from( event ).doOnEach(new Action1<TestEvent>() {
- @Override
- public void call( final TestEvent testEvent ) {
- events.push( testEvent );
- }
- });
+ return Observable.from( event ).doOnEach( new Action1<TestEvent>() {
+ @Override
+ public void call( final TestEvent testEvent ) {
+ events.push( testEvent );
+ }
+ } );
+ }
+ }
+
+
+ public static class TestCompleteListener implements CompleteListener<TestEvent> {
+
+ private final CountDownLatch latch;
+ private final Stack<TestEvent> events = new Stack<TestEvent>();
+
+
+ public TestCompleteListener( final CountDownLatch latch ) {this.latch = latch;}
+
+
+ @Override
+ public void onComplete( final AsynchronousMessage<TestEvent> event ) {
+ events.push( event.getEvent() );
+ latch.countDown();
}
}
@@ -265,15 +342,13 @@ public class AsyncProcessorTest {
@Override
public Observable<TestEvent> receive( final TestEvent event ) {
- return Observable.from( event ).doOnEach(new Action1<TestEvent>() {
- @Override
- public void call( final TestEvent testEvent ) {
- events.push( testEvent );
- throw new RuntimeException( "Test Exception thrown. Failed to process event" );
- }
- });
+ return Observable.from( event ).doOnEach( new Action1<TestEvent>() {
+ @Override
+ public void call( final TestEvent testEvent ) {
+ events.push( testEvent );
+ throw new RuntimeException( "Test Exception thrown. Failed to process event" );
+ }
+ } );
}
-
-
}
}