You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2014/03/25 16:21:06 UTC

[07/43] git commit: Upgraded queue system and processing

Upgraded queue system and processing

Updated RX to 0.17-RC7

Rolled custom Hystrix to test RX integration


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/51a9ffd9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/51a9ffd9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/51a9ffd9

Branch: refs/heads/two-dot-o
Commit: 51a9ffd9218bdaf4884709fd04bae9c046dc67ec
Parents: c45f7ee
Author: Todd Nine <tn...@apigee.com>
Authored: Fri Mar 7 16:59:06 2014 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Fri Mar 7 16:59:06 2014 -0700

----------------------------------------------------------------------
 stack/corepersistence/collection/pom.xml        |  21 +--
 .../collection/hystrix/CassandraCommand.java    |   4 +-
 .../impl/EntityCollectionManagerImpl.java       |   6 +-
 .../mvcc/stage/delete/MarkCommit.java           |   2 +-
 .../collection/mvcc/stage/delete/MarkStart.java |   2 +-
 .../collection/mvcc/stage/load/Load.java        |   2 +-
 .../mvcc/stage/write/WriteCommit.java           |   2 +-
 .../mvcc/stage/write/WriteOptimisticVerify.java |   2 +-
 .../collection/mvcc/stage/write/WriteStart.java |   2 +-
 .../mvcc/stage/write/WriteUniqueVerify.java     |   5 +-
 .../collection/rx/CassandraThreadScheduler.java |   2 +-
 .../rx/CassandraThreadSchedulerTest.java        |  44 +++----
 .../persistence/collection/rx/ParallelTest.java |   8 +-
 .../usergrid/persistence/graph/GraphFig.java    |  14 ++
 .../graph/consistency/AsyncProcessor.java       |  50 +++++---
 .../graph/consistency/AsyncProcessorImpl.java   |  60 ++++++---
 .../graph/consistency/CompleteListener.java     |  14 ++
 .../graph/consistency/TimeoutTask.java          |  61 +++++++++
 .../graph/impl/EdgeDeleteListener.java          |   6 +-
 .../persistence/graph/impl/EdgeManagerImpl.java |   2 +-
 .../graph/impl/EdgeWriteListener.java           |   2 +-
 .../graph/impl/NodeDeleteListener.java          |   6 +-
 .../impl/parse/ObservableIterator.java          |  18 ++-
 .../graph/consistency/AsyncProcessorTest.java   | 127 +++++++++++++++----
 24 files changed, 337 insertions(+), 125 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/51a9ffd9/stack/corepersistence/collection/pom.xml
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/pom.xml b/stack/corepersistence/collection/pom.xml
index db3ce3a..00db576 100644
--- a/stack/corepersistence/collection/pom.xml
+++ b/stack/corepersistence/collection/pom.xml
@@ -159,17 +159,17 @@
     <!-- RX java -->
 
     <dependency>
-      <groupId>com.netflix.rxjava</groupId>
-      <artifactId>rxjava-core</artifactId>
-      <version>0.16.1</version>
+        <groupId>com.netflix.rxjava</groupId>
+        <artifactId>rxjava-core</artifactId>
+        <version>0.17.0-RC7</version>
     </dependency>
 
 
-    <dependency>
-      <groupId>com.netflix.rxjava</groupId>
-      <artifactId>rxjava-contrib</artifactId>
-      <version>0.14.6</version>
-    </dependency>
+    <!--<dependency>-->
+      <!--<groupId>com.netflix.rxjava</groupId>-->
+      <!--<artifactId>rxjava-contrib</artifactId>-->
+      <!--<version>0.14.6</version>-->
+    <!--</dependency>-->
 
     <dependency>
       <groupId>com.google.inject.extensions</groupId>
@@ -201,10 +201,13 @@
       <version>${log4j.version}</version>
     </dependency>
 
+    <!--Remove custom build once this patch is complete
+    https://github.com/Netflix/Hystrix/pull/209-->
+
     <dependency>
       <groupId>com.netflix.hystrix</groupId>
       <artifactId>hystrix-core</artifactId>
-      <version>1.3.8</version>
+      <version>1.3.14-SNAPSHOT</version>
     </dependency>
 
   </dependencies>

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/51a9ffd9/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/hystrix/CassandraCommand.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/hystrix/CassandraCommand.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/hystrix/CassandraCommand.java
index c87bf33..731933a 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/hystrix/CassandraCommand.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/hystrix/CassandraCommand.java
@@ -24,7 +24,7 @@ import com.netflix.hystrix.HystrixCommand;
 import com.netflix.hystrix.HystrixCommandGroupKey;
 
 import rx.Observable;
-import rx.concurrency.Schedulers;
+import rx.schedulers.Schedulers;
 
 
 /**
@@ -69,6 +69,6 @@ public class CassandraCommand<R> extends HystrixCommand<R> {
      */
     private static <R> Observable<R> toObservable( R readValue ) {
         //create a new command and ensure it's observed on the correct thread scheduler
-        return new CassandraCommand<R>( readValue ).toObservable( Schedulers.threadPoolForIO() );
+        return new CassandraCommand<R>( readValue ).toObservable( Schedulers.io() );
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/51a9ffd9/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
index 4391d09..ef95a75 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
@@ -47,9 +47,9 @@ import com.google.inject.assistedinject.Assisted;
 
 import rx.Observable;
 import rx.Scheduler;
-import rx.util.functions.Func1;
-import rx.util.functions.Func2;
-import rx.util.functions.FuncN;
+import rx.functions.Func1;
+import rx.functions.Func2;
+import rx.functions.FuncN;
 
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/51a9ffd9/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommit.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommit.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommit.java
index 9c1a432..06c875e 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommit.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommit.java
@@ -41,7 +41,7 @@ import com.google.inject.Singleton;
 import com.netflix.astyanax.MutationBatch;
 import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 
-import rx.util.functions.Func1;
+import rx.functions.Func1;
 
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/51a9ffd9/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkStart.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkStart.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkStart.java
index 4b0fae5..e718f4b 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkStart.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkStart.java
@@ -45,7 +45,7 @@ import com.google.inject.Singleton;
 import com.netflix.astyanax.MutationBatch;
 import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 
-import rx.util.functions.Func1;
+import rx.functions.Func1;
 
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/51a9ffd9/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/load/Load.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/load/Load.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/load/Load.java
index 20abc5a..e0d0f77 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/load/Load.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/load/Load.java
@@ -39,7 +39,7 @@ import com.google.common.base.Preconditions;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 
-import rx.util.functions.Func1;
+import rx.functions.Func1;
 
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/51a9ffd9/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java
index bcec305..4c90e69 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java
@@ -41,7 +41,7 @@ import com.netflix.astyanax.MutationBatch;
 import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 import org.apache.usergrid.persistence.model.field.Field;
 
-import rx.util.functions.Func1;
+import rx.functions.Func1;
 
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/51a9ffd9/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteOptimisticVerify.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteOptimisticVerify.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteOptimisticVerify.java
index e42019c..7a70dfc 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteOptimisticVerify.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteOptimisticVerify.java
@@ -8,7 +8,7 @@ import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionIoEvent;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 
-import rx.util.functions.Func1;
+import rx.functions.Func1;
 
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/51a9ffd9/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteStart.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteStart.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteStart.java
index 01fee57..a0ff1b1 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteStart.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteStart.java
@@ -24,7 +24,7 @@ import com.google.inject.Singleton;
 import com.netflix.astyanax.MutationBatch;
 import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 
-import rx.util.functions.Func1;
+import rx.functions.Func1;
 
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/51a9ffd9/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
index 55280c8..fe53b13 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
@@ -40,8 +40,8 @@ import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 
 import rx.Observable;
 import rx.Scheduler;
-import rx.util.functions.Func1;
-import rx.util.functions.FuncN;
+import rx.functions.Func1;
+import rx.functions.FuncN;
 
 
 /**
@@ -168,6 +168,7 @@ public class WriteUniqueVerify implements Func1<CollectionIoEvent<MvccEntity>, O
         };
 
 
+
         return Observable.zip( fields, zipFunction );
 
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/51a9ffd9/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/rx/CassandraThreadScheduler.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/rx/CassandraThreadScheduler.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/rx/CassandraThreadScheduler.java
index 90f3b2d..a5baf76 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/rx/CassandraThreadScheduler.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/rx/CassandraThreadScheduler.java
@@ -35,7 +35,7 @@ import com.google.inject.Provider;
 import com.google.inject.name.Named;
 
 import rx.Scheduler;
-import rx.concurrency.Schedulers;
+import rx.schedulers.Schedulers;
 
 
 public class CassandraThreadScheduler implements Provider<Scheduler> {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/51a9ffd9/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/CassandraThreadSchedulerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/CassandraThreadSchedulerTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/CassandraThreadSchedulerTest.java
index 45a175e..cf399c3 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/CassandraThreadSchedulerTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/CassandraThreadSchedulerTest.java
@@ -18,6 +18,7 @@ import org.apache.usergrid.persistence.collection.guice.TestCollectionModule;
 import com.google.inject.Inject;
 
 import rx.Scheduler;
+import rx.functions.Action1;
 import rx.util.functions.Action0;
 
 import static org.junit.Assert.assertTrue;
@@ -63,13 +64,11 @@ public class CassandraThreadSchedulerTest {
         //schedule and we should fail
 
         try {
-
-            rxScheduler.schedule( new Action0() {
-                @Override
-                public void call() {
-                    //no op
-                }
-            } );
+            rxScheduler.schedule( new Action1<Scheduler.Inner>() {
+                            @Override
+                            public void call( final Scheduler.Inner inner ) {
+                            }
+                            });
 
             fail( "This should have thrown an exception" );
         }
@@ -127,9 +126,9 @@ public class CassandraThreadSchedulerTest {
 
         try {
 
-            rxScheduler.schedule( new Action0() {
-                @Override
-                public void call() {
+            rxScheduler.schedule(  new Action1<Scheduler.Inner>() {
+                            @Override
+                            public void call( final Scheduler.Inner inner ) {
                     //no op
                 }
             } );
@@ -158,9 +157,9 @@ public class CassandraThreadSchedulerTest {
 
         try {
 
-            rxScheduler.schedule( new Action0() {
-                @Override
-                public void call() {
+            rxScheduler.schedule(  new Action1<Scheduler.Inner>() {
+                            @Override
+                            public void call( final Scheduler.Inner inner ) {
                     //no op
                 }
             } );
@@ -215,9 +214,9 @@ public class CassandraThreadSchedulerTest {
 
         try {
 
-            rxScheduler.schedule( new Action0() {
-                @Override
-                public void call() {
+            rxScheduler.schedule(  new Action1<Scheduler.Inner>() {
+                            @Override
+                            public void call( final Scheduler.Inner inner ) {
                     //no op
                 }
             } );
@@ -244,9 +243,9 @@ public class CassandraThreadSchedulerTest {
 
         try {
 
-            rxScheduler.schedule( new Action0() {
-                @Override
-                public void call() {
+            rxScheduler.schedule(  new Action1<Scheduler.Inner>() {
+                            @Override
+                            public void call( final Scheduler.Inner inner ) {
                     //no op
                 }
             } );
@@ -295,9 +294,10 @@ public class CassandraThreadSchedulerTest {
         final CountDownLatch latch = new CountDownLatch( totalCount );
 
         for ( int i = 0; i < totalCount; i++ ) {
-            final Action0 action = new Action0() {
-                @Override
-                public void call() {
+
+            final Action1<Scheduler.Inner> action = new  Action1<Scheduler.Inner>() {
+                                       @Override
+                                       public void call( final Scheduler.Inner inner ) {
                     try {
                         final String threadName = Thread.currentThread().getName();
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/51a9ffd9/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/ParallelTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/ParallelTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/ParallelTest.java
index ac252fb..434ea26 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/ParallelTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/ParallelTest.java
@@ -38,9 +38,9 @@ import com.netflix.hystrix.HystrixCommandGroupKey;
 
 import rx.Observable;
 import rx.Scheduler;
-import rx.concurrency.Schedulers;
-import rx.util.functions.Func1;
-import rx.util.functions.FuncN;
+import rx.functions.Func1;
+import rx.functions.FuncN;
+import rx.schedulers.Schedulers;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -82,7 +82,7 @@ public class ParallelTest {
         //        final Scheduler scheduler = Schedulers.threadPoolForComputation();
 
         //use the I/O scheduler to allow enough thread, otherwise our pool will be the same size as the # of cores
-        final Scheduler scheduler = Schedulers.threadPoolForIO();
+        final Scheduler scheduler = Schedulers.io();
 
         //set our size equal
         ConfigurationManager.getConfigInstance().setProperty( THREAD_POOL_SIZE, size );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/51a9ffd9/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
index bbf83be..3f87d14 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
@@ -15,6 +15,11 @@ public interface GraphFig extends GuicyFig {
 
     public static final String SCAN_PAGE_SIZE = "usergrid.graph.scan.page.size";
 
+
+    public static final String TIMEOUT_SIZE = "usergrid.graph.timeout.page.size";
+
+    public static final String TIMEOUT_TASK_TIME = "usergrid.graph.timeout.task.time";
+
     public static final String READ_CL = "usergrid.graph.read.cl";
 
     public static final String WRITE_CL = "usergrid.graph.write.cl";
@@ -37,6 +42,15 @@ public interface GraphFig extends GuicyFig {
     @Key( WRITE_TIMEOUT )
     long getWriteTimeout();
 
+    @Default( "100" )
+    @Key( TIMEOUT_SIZE )
+    int getTimeoutReadSize();
+
+    @Default("500")
+    @Key( TIMEOUT_TASK_TIME )
+    long getTaskLoopTime();
+
+
 
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/51a9ffd9/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessor.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessor.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessor.java
index 150aa6b..016b45d 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessor.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessor.java
@@ -1,48 +1,62 @@
 package org.apache.usergrid.persistence.graph.consistency;
 
 
+import java.util.Collection;
+
+
 /**
- *  Used to fork lazy repair and other types of operations.
- *
+ * Used to fork lazy repair and other types of operations.
  */
 public interface AsyncProcessor<T> {
 
 
     /**
-     * The processor implementation is responsible for guaranteeing the events fire in the runtime environment.
-     * This could be local or clustered, consult the documentation on the implementation.  Note that events published
-     * here could possibly be double published if the operation reaches it's timeout before completion.  As a result, every
-     * receiver of the event should operate in an idempotent way.  Note that the event will fire at a time >= the timeout time.
-     * Firing immediately should not be assumed.
+     * The processor implementation is responsible for guaranteeing the events fire in the runtime environment. This
+     * could be local or clustered, consult the documentation on the implementation.  Note that events published here
+     * could possibly be double published if the operation reaches it's timeout before completion.  As a result, every
+     * receiver of the event should operate in an idempotent way.  Note that the event will fire at a time >= the
+     * timeout time. Firing immediately should not be assumed.
      *
      * @param event The event to be scheduled for verification
-     * @param timeout  The time in milliseconds we should wait before the event should fire
+     * @param timeout The time in milliseconds we should wait before the event should fire
      */
     public AsynchronousMessage<T> setVerification( T event, long timeout );
 
 
     /**
-     * Start processing the event immediately asynchronously.  In the event an exception is thrown, the AsynchronousMessage should be re-tried.
-     * It is up to the implementer to commit the event so that it does not fire again.  This should never throw exceptions.
+     * Start processing the event immediately asynchronously.  In the event an exception is thrown, the
+     * AsynchronousMessage should be re-tried. It is up to the implementer to commit the event so that it does not fire
+     * again.  This should never throw exceptions.
      *
      * @param event The event to start
      */
-    public void start(AsynchronousMessage<T> event);
+    public void start( AsynchronousMessage<T> event );
+
+
+    /**
+     * Get all events that have passed their timeout
+     *
+     * @param maxCount The maximum count
+     * @param timeout The timeout to set when retrieving these timeouts to ensure they aren't lost
+     *
+     * @return A collection of asynchronous messages that have passed their timeout.  This could be due to process
+     *         failure node loss etc.  No assumptions regarding the state of the message should be assumed when they are
+     *         returned.
+     */
+    public Collection<AsynchronousMessage<T>> getTimeouts( int maxCount, long timeout );
 
     /**
      * Add the error listener to the list of listeners
-     * @param listener
      */
     public void addErrorListener( ErrorListener<T> listener );
 
     /**
      * Add the listener to this instance
-     * @param listener
      */
-    public void addListener(MessageListener<T, T> listener);
-
-
-
-
+    public void addListener( MessageListener<T, T> listener );
 
+    /**
+     * Add a complete listener that is invoked when the listener has been invoked
+     */
+    public void addCompleteListener( CompleteListener<T> listener );
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/51a9ffd9/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorImpl.java
index 20c40e2..d962a88 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorImpl.java
@@ -2,45 +2,59 @@ package org.apache.usergrid.persistence.graph.consistency;
 
 
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.usergrid.persistence.graph.GraphFig;
+
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
-import com.netflix.hystrix.HystrixCommandGroupKey;
 
 import rx.Observable;
 import rx.Scheduler;
-import rx.util.functions.Action0;
-import rx.util.functions.Action1;
-import rx.util.functions.FuncN;
+import rx.functions.Action0;
+import rx.functions.Action1;
+import rx.functions.FuncN;
+import rx.schedulers.Schedulers;
 
 
 /**
- * The implementation of asynchronous processing.
- * This is intentionally kept as a 1 processor to 1 event type mapping
+ * The implementation of asynchronous processing. This is intentionally kept as a 1 processor to 1 event type mapping
  * This way reflection is not used, event dispatching is easier, and has compile time checking
  */
 @Singleton
 public class AsyncProcessorImpl<T> implements AsyncProcessor<T> {
 
-    private static final HystrixCommandGroupKey GRAPH_REPAIR = HystrixCommandGroupKey.Factory.asKey( "Graph_Repair" );
-
-    private final TimeoutQueue<T> queue;
-    private final Scheduler scheduler;
-    private final List<MessageListener<T, T>> listeners = new ArrayList<MessageListener<T, T>>();
+    /**
+     * TODO, run this with hystrix
+     */
 
     private static final Logger LOG = LoggerFactory.getLogger( AsyncProcessor.class );
 
-    private List<ErrorListener> errorListeners = new ArrayList<ErrorListener>();
+    protected final TimeoutQueue<T> queue;
+    protected final Scheduler scheduler;
+    protected final GraphFig graphFig;
+    protected final List<MessageListener<T, T>> listeners = new ArrayList<MessageListener<T, T>>();
+
+
+    protected List<ErrorListener<T>> errorListeners = new ArrayList<ErrorListener<T>>();
+    protected List<CompleteListener<T>> completeListeners = new ArrayList<CompleteListener<T>>();
 
 
     @Inject
-    public AsyncProcessorImpl( final TimeoutQueue<T> queue, final Scheduler scheduler ) {
+    public AsyncProcessorImpl( final TimeoutQueue<T> queue, final Scheduler scheduler, final GraphFig graphFig ) {
         this.queue = queue;
         this.scheduler = scheduler;
+        this.graphFig = graphFig;
+
+        //we purposefully use a new thread.  We don't want to use one of the I/O threads to run this task
+        //in the event the scheduler is full, we'll end up rejecting the reschedule of this task
+        Schedulers.newThread().schedulePeriodically( new TimeoutTask<T>(this, graphFig), graphFig.getTaskLoopTime(),  graphFig.getTaskLoopTime(), TimeUnit.MILLISECONDS );
     }
 
 
@@ -52,8 +66,6 @@ public class AsyncProcessorImpl<T> implements AsyncProcessor<T> {
 
     @Override
     public void start( final AsynchronousMessage<T> event ) {
-
-
         final T data = event.getEvent();
         /**
          * Execute all listeners in parallel
@@ -83,6 +95,10 @@ public class AsyncProcessorImpl<T> implements AsyncProcessor<T> {
             @Override
             public void call() {
                 queue.remove( event );
+
+                for ( CompleteListener<T> listener : completeListeners ) {
+                    listener.onComplete( event );
+                }
             }
         } ).subscribe( new Action1<AsynchronousMessage<T>>() {
             @Override
@@ -94,6 +110,12 @@ public class AsyncProcessorImpl<T> implements AsyncProcessor<T> {
 
 
     @Override
+    public Collection<AsynchronousMessage<T>> getTimeouts( final int maxCount, final long timeout ) {
+        return queue.take( maxCount, timeout );
+    }
+
+
+    @Override
     public void addListener( final MessageListener<T, T> listener ) {
         this.listeners.add( listener );
     }
@@ -105,4 +127,12 @@ public class AsyncProcessorImpl<T> implements AsyncProcessor<T> {
     public void addErrorListener( ErrorListener<T> listener ) {
         this.errorListeners.add( listener );
     }
+
+
+    @Override
+    public void addCompleteListener( final CompleteListener<T> listener ) {
+        this.completeListeners.add( listener );
+    }
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/51a9ffd9/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/CompleteListener.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/CompleteListener.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/CompleteListener.java
new file mode 100644
index 0000000..c415005
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/CompleteListener.java
@@ -0,0 +1,14 @@
+package org.apache.usergrid.persistence.graph.consistency;
+
+
+/**
+ * Internal listener for errors, really only used for testing.  Can be used to hook into error state
+ */
+public interface CompleteListener<T> {
+
+    /**
+     * Invoked when an event is complete
+     * @param event
+     */
+    void onComplete( AsynchronousMessage<T> event );
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/51a9ffd9/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/TimeoutTask.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/TimeoutTask.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/TimeoutTask.java
new file mode 100644
index 0000000..6607724
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/TimeoutTask.java
@@ -0,0 +1,61 @@
+package org.apache.usergrid.persistence.graph.consistency;
+
+
+import java.util.Iterator;
+
+import org.apache.usergrid.persistence.graph.GraphFig;
+
+import rx.Scheduler;
+import rx.functions.Action1;
+
+
+/**
+ *
+ *
+ */
+public class TimeoutTask<T> implements Action1<Scheduler.Inner> {
+
+    private final AsyncProcessor<T> processor;
+    private final GraphFig graphFig;
+
+
+    public TimeoutTask( final AsyncProcessor<T> processor, final GraphFig graphFig ) {
+        this.processor = processor;
+        this.graphFig = graphFig;
+    }
+
+
+    @Override
+    public void call( final Scheduler.Inner inner ) {
+
+        /**
+         * We purposefully loop through a tight loop.  If we have anything to process, we need to do so
+         * Once we run out of items to process, this thread will sleep and the timer will fire
+         */
+        while(!inner.isUnsubscribed()) {
+
+            Iterator<AsynchronousMessage<T>> timeouts = getTimeouts();
+
+            /**
+             * We're done, just exit
+             */
+            if(!timeouts.hasNext()){
+                return;
+            }
+
+            while ( timeouts.hasNext() ) {
+                processor.start( timeouts.next() );
+            }
+
+        }
+    }
+
+
+    /**
+     * Get the timeouts
+     * @return
+     */
+    private Iterator<AsynchronousMessage<T>> getTimeouts() {
+        return processor.getTimeouts( graphFig.getTimeoutReadSize(), graphFig.getWriteTimeout() * 2 ).iterator();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/51a9ffd9/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeDeleteListener.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeDeleteListener.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeDeleteListener.java
index 824b05c..6feded3 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeDeleteListener.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeDeleteListener.java
@@ -23,9 +23,9 @@ import com.netflix.astyanax.MutationBatch;
 import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 
 import rx.Observable;
-import rx.util.functions.Action1;
-import rx.util.functions.Func1;
-import rx.util.functions.Func5;
+import rx.functions.Action1;
+import rx.functions.Func1;
+import rx.functions.Func5;
 
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/51a9ffd9/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeManagerImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeManagerImpl.java
index 663215a..b84bd8e 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeManagerImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeManagerImpl.java
@@ -56,7 +56,7 @@ import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 
 import rx.Observable;
 import rx.Scheduler;
-import rx.util.functions.Func1;
+import rx.functions.Func1;
 
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/51a9ffd9/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeWriteListener.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeWriteListener.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeWriteListener.java
index 428085e..271375a 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeWriteListener.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeWriteListener.java
@@ -22,7 +22,7 @@ import com.netflix.astyanax.MutationBatch;
 import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 
 import rx.Observable;
-import rx.util.functions.Func1;
+import rx.functions.Func1;
 
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/51a9ffd9/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java
index 08cc942..aabc572 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java
@@ -20,9 +20,10 @@ import org.apache.usergrid.persistence.model.entity.Id;
 import com.google.common.base.Optional;
 import com.google.inject.Inject;
 
+import rx.Notification;
 import rx.Observable;
-import rx.util.functions.Action1;
-import rx.util.functions.Func1;
+import rx.functions.Action1;
+import rx.functions.Func1;
 
 
 /**
@@ -75,6 +76,7 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, EdgeEv
                                 // delete them. We might want to batch this up for efficiency
                                 return loadEdgesToTarget( scope,
                                         new SimpleSearchByEdgeType( node, edgeType, uuidOptional.get(), null ) )
+
                                         .doOnEach( new Action1<MarkedEdge>() {
                                             @Override
                                             public void call( final MarkedEdge markedEdge ) {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/51a9ffd9/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/parse/ObservableIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/parse/ObservableIterator.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/parse/ObservableIterator.java
index 58ff6c7..2274868 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/parse/ObservableIterator.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/parse/ObservableIterator.java
@@ -5,6 +5,7 @@ import java.util.Iterator;
 
 import rx.Observable;
 import rx.Observer;
+import rx.Subscriber;
 import rx.Subscription;
 import rx.subscriptions.Subscriptions;
 
@@ -14,11 +15,11 @@ import rx.subscriptions.Subscriptions;
  * This is used in favor of "Observable.just" when the initial fetch of the iterator will require I/O.  This allows
  * us to wrap the iterator in a deferred invocation to avoid the blocking on construction.
  */
-public abstract class ObservableIterator<T> implements Observable.OnSubscribeFunc<T> {
+public abstract class ObservableIterator<T> implements Observable.OnSubscribe<T> {
 
 
     @Override
-    public Subscription onSubscribe( final Observer<? super T> observer ) {
+    public void call( final Subscriber<? super T> subscriber ) {
 
 
         try {
@@ -26,25 +27,22 @@ public abstract class ObservableIterator<T> implements Observable.OnSubscribeFun
             Iterator<T> itr = getIterator();
 
 
-            //TODO T.N. when > 0.17 comes out, we need to implement the check with each loop as described here https://github.com/Netflix/RxJava/issues/802
-            while ( itr.hasNext()) {
-                observer.onNext( itr.next() );
+            //while we have items to emit and our subscriber is subscribed, we want to keep emitting items
+            while ( itr.hasNext() && !subscriber.isUnsubscribed()) {
+                subscriber.onNext( itr.next() );
             }
 
-            observer.onCompleted();
+            subscriber.onCompleted();
         }
 
         //if any error occurs, we need to notify the observer so it can perform it's own error handling
         catch ( Throwable t ) {
-            observer.onError( t );
+            subscriber.onError( t );
         }
 
-        return Subscriptions.empty();
     }
 
 
-
-
     /**
      * Return the iterator to feed data to
      */

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/51a9ffd9/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorTest.java
index fc53705..9822cc5 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorTest.java
@@ -20,6 +20,8 @@
 package org.apache.usergrid.persistence.graph.consistency;
 
 
+import java.util.Collection;
+import java.util.Collections;
 import java.util.Stack;
 import java.util.concurrent.CountDownLatch;
 
@@ -29,7 +31,7 @@ import org.mockito.stubbing.Answer;
 
 import rx.Observable;
 import rx.concurrency.Schedulers;
-import rx.util.functions.Action1;
+import rx.functions.Action1;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -70,7 +72,7 @@ public class AsyncProcessorTest {
         final TimeoutQueue queue = mock( TimeoutQueue.class );
 
 
-        AsyncProcessor asyncProcessor = constructProcessor( queue, null );
+        AsyncProcessor asyncProcessor = constructProcessor( queue );
 
 
         //mock up the queue
@@ -89,6 +91,8 @@ public class AsyncProcessorTest {
 
         final TestListener listener = new TestListener();
 
+
+
         final TestEvent event = new TestEvent();
 
 
@@ -108,9 +112,17 @@ public class AsyncProcessorTest {
         final TimeoutQueue queue = mock( TimeoutQueue.class );
 
 
-        final AsyncProcessor asyncProcessor = constructProcessor( queue, listener );
+        final AsyncProcessor asyncProcessor = constructProcessor( queue );
+
+        asyncProcessor.addListener( listener );
+
+
+        final CountDownLatch latch = new CountDownLatch( 2 );
+
+        final TestCompleteListener completeListener = new TestCompleteListener(latch);
+
+        asyncProcessor.addCompleteListener( completeListener );
 
-        final CountDownLatch latch = new CountDownLatch( 1 );
 
         //mock up the ack to allow us to block the test until the async confirm fires
         when( queue.remove( asynchronousMessage ) ).thenAnswer( new Answer<Boolean>() {
@@ -132,11 +144,15 @@ public class AsyncProcessorTest {
         final TestEvent firedEvent = listener.events.peek();
 
         assertSame( event, firedEvent );
+
+        final TestEvent completeEvent = completeListener.events.peek();
+
+        assertSame(event, completeEvent);
     }
 
 
-//    @Test( timeout = 5000 )
-        @Test
+    //    @Test( timeout = 5000 )
+    @Test
     public void verifyErrorExecution() throws InterruptedException {
 
         final AsynchronousErrorListener listener = new AsynchronousErrorListener();
@@ -163,7 +179,9 @@ public class AsyncProcessorTest {
         final TimeoutQueue queue = mock( TimeoutQueue.class );
 
 
-        final AsyncProcessorImpl asyncProcessor = constructProcessor( queue, listener );
+        final AsyncProcessorImpl asyncProcessor = constructProcessor( queue );
+
+        asyncProcessor.addListener( listener );
 
         final CountDownLatch latch = new CountDownLatch( 1 );
 
@@ -177,7 +195,6 @@ public class AsyncProcessorTest {
                 invoked[1] = true;
                 latch.countDown();
             }
-
         } );
 
         //throw an error if remove is called.  This shouldn't happen
@@ -209,13 +226,56 @@ public class AsyncProcessorTest {
     }
 
 
+    @Test
+    public void verifyTimeout() {
+
+
+        final long timeout = 500;
+        final TestEvent event = new TestEvent();
+
+
+        final AsynchronousMessage<TestEvent> asynchronousMessage = new AsynchronousMessage<TestEvent>() {
+            @Override
+            public TestEvent getEvent() {
+                return event;
+            }
+
+
+            @Override
+            public long getTimeout() {
+                return timeout;
+            }
+        };
+
+        final TimeoutQueue queue = mock( TimeoutQueue.class );
+
+
+        when(queue.take( 1, 10000l )).thenReturn( Collections.singletonList(asynchronousMessage ));
+
+        AsyncProcessor<TestEvent> processor = constructProcessor( queue );
+
+
+        Collection<AsynchronousMessage<TestEvent>> timeouts =  processor.getTimeouts( 1, 10000l );
+
+        assertEquals(1, timeouts.size());
+
+        AsynchronousMessage<TestEvent> returned = timeouts.iterator().next();
+
+        assertSame(asynchronousMessage, returned);
+
+
+
+    }
+
+
+
     /**
      * Construct the async processor
      */
-    public <T> AsyncProcessorImpl<T> constructProcessor( TimeoutQueue<T> queue , MessageListener<T, T> listener) {
+    public <T> AsyncProcessorImpl<T> constructProcessor( TimeoutQueue<T> queue ) {
+
+        AsyncProcessorImpl<T> processor = new AsyncProcessorImpl( queue, Schedulers.threadPoolForIO() );
 
-        AsyncProcessorImpl<T> processor =  new AsyncProcessorImpl( queue,Schedulers.threadPoolForIO() );
-        processor.addListener( listener );
 
         return processor;
     }
@@ -245,12 +305,29 @@ public class AsyncProcessorTest {
         @Override
         public Observable<TestEvent> receive( final TestEvent event ) {
 
-            return Observable.from( event ).doOnEach(new Action1<TestEvent>() {
-               @Override
-               public void call( final TestEvent testEvent ) {
-                   events.push( testEvent );
-               }
-           });
+            return Observable.from( event ).doOnEach( new Action1<TestEvent>() {
+                @Override
+                public void call( final TestEvent testEvent ) {
+                    events.push( testEvent );
+                }
+            } );
+        }
+    }
+
+
+    public static class TestCompleteListener implements CompleteListener<TestEvent> {
+
+        private final CountDownLatch latch;
+        private final Stack<TestEvent> events = new Stack<TestEvent>();
+
+
+        public TestCompleteListener( final CountDownLatch latch ) {this.latch = latch;}
+
+
+        @Override
+        public void onComplete( final AsynchronousMessage<TestEvent> event ) {
+            events.push( event.getEvent() );
+            latch.countDown();
         }
     }
 
@@ -265,15 +342,13 @@ public class AsyncProcessorTest {
 
         @Override
         public Observable<TestEvent> receive( final TestEvent event ) {
-            return Observable.from( event ).doOnEach(new Action1<TestEvent>() {
-                          @Override
-                          public void call( final TestEvent testEvent ) {
-                              events.push( testEvent );
-                              throw new RuntimeException( "Test Exception thrown.  Failed to process event" );
-                          }
-                      });
+            return Observable.from( event ).doOnEach( new Action1<TestEvent>() {
+                @Override
+                public void call( final TestEvent testEvent ) {
+                    events.push( testEvent );
+                    throw new RuntimeException( "Test Exception thrown.  Failed to process event" );
+                }
+            } );
         }
-
-
     }
 }