You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/10/06 22:35:06 UTC

[8/9] usergrid git commit: move subscriber

move subscriber


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/7dceb563
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/7dceb563
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/7dceb563

Branch: refs/heads/2.1-release
Commit: 7dceb563eeb836a7dd09280b07af78c69ebb1793
Parents: 17586ec
Author: Shawn Feldman <sf...@apache.org>
Authored: Tue Oct 6 08:39:21 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Tue Oct 6 08:39:21 2015 -0600

----------------------------------------------------------------------
 .../asyncevents/AmazonAsyncEventService.java    |  5 ++-
 .../core/rx/ExceptionBehaviorTest.java          | 32 +++++++++++++++++++-
 2 files changed, 33 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/7dceb563/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index e16de05..14d37b5 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -547,12 +547,11 @@ public class AmazonAsyncEventService implements AsyncEventService {
                                             //ack each message, but only if we didn't error.
                                             ack(message);
                                         })
-                            )
-                            .subscribeOn(Schedulers.newThread());
+                            );
 
             //start in the background
 
-            final Subscription subscription = consumer.subscribe();
+            final Subscription subscription = consumer.subscribeOn(Schedulers.newThread()).subscribe();
 
             subscriptions.add(subscription);
         }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/7dceb563/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/rx/ExceptionBehaviorTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/rx/ExceptionBehaviorTest.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/rx/ExceptionBehaviorTest.java
index be10d0a..8e4f4c4 100644
--- a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/rx/ExceptionBehaviorTest.java
+++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/rx/ExceptionBehaviorTest.java
@@ -22,6 +22,7 @@ import org.junit.Test;
 
 import rx.Observable;
 import rx.Observer;
+import rx.schedulers.Schedulers;
 
 
 /**
@@ -31,13 +32,21 @@ public class ExceptionBehaviorTest {
 
     //this test shows toBlocking re-throws exceptions correctly
     @Test( expected = TestException.class )
-    public void throwOnBlocking() {
+    public void throwOnBlockingFirst() {
 
         Observable.range( 0, 1 ).map( integer -> {
             throw new TestException( "I throw and exception" );
         } ).toBlocking().first();
     }
 
+    @Test( expected = TestException.class )
+    public void throwOnBlockingLast() {
+
+        Observable.range( 0, 1 ).map( integer -> {
+            throw new TestException( "I throw and exception" );
+        } ).toBlocking().last();
+    }
+
 //
 //    /**
 //     * This shows that no re-throw happens on subscribe.  This is as designed, but not as expected
@@ -67,6 +76,27 @@ public class ExceptionBehaviorTest {
         exceptionObserver.checkResult();
     }
 
+    /**
+     *  Tests working with observers
+     */
+    @Test( expected = TestException.class )
+    public void throwOnSubscribeObservableNewThread() throws Exception {
+
+        final ReThrowObserver exceptionObserver = new ReThrowObserver();
+
+        Observable.range( 0, 1 ).map(integer -> {
+            throw new TestException("I throw and exception");
+        })
+            .doOnError(t -> exceptionObserver.onError(t))
+            .subscribeOn(Schedulers.newThread())
+            .subscribe(exceptionObserver);
+
+        for(int i =0; i<5; i++) {
+            exceptionObserver.checkResult();
+            Thread.sleep(200);
+        }
+    }
+
 
     private static final class TestException extends RuntimeException {