You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/10/06 16:40:26 UTC
usergrid git commit: move subscriber
Repository: usergrid
Updated Branches:
refs/heads/review-observable 17586ecb5 -> 7dceb563e
move subscriber
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/7dceb563
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/7dceb563
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/7dceb563
Branch: refs/heads/review-observable
Commit: 7dceb563eeb836a7dd09280b07af78c69ebb1793
Parents: 17586ec
Author: Shawn Feldman <sf...@apache.org>
Authored: Tue Oct 6 08:39:21 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Tue Oct 6 08:39:21 2015 -0600
----------------------------------------------------------------------
.../asyncevents/AmazonAsyncEventService.java | 5 ++-
.../core/rx/ExceptionBehaviorTest.java | 32 +++++++++++++++++++-
2 files changed, 33 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/7dceb563/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index e16de05..14d37b5 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -547,12 +547,11 @@ public class AmazonAsyncEventService implements AsyncEventService {
//ack each message, but only if we didn't error.
ack(message);
})
- )
- .subscribeOn(Schedulers.newThread());
+ );
//start in the background
- final Subscription subscription = consumer.subscribe();
+ final Subscription subscription = consumer.subscribeOn(Schedulers.newThread()).subscribe();
subscriptions.add(subscription);
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/7dceb563/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/rx/ExceptionBehaviorTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/rx/ExceptionBehaviorTest.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/rx/ExceptionBehaviorTest.java
index be10d0a..8e4f4c4 100644
--- a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/rx/ExceptionBehaviorTest.java
+++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/rx/ExceptionBehaviorTest.java
@@ -22,6 +22,7 @@ import org.junit.Test;
import rx.Observable;
import rx.Observer;
+import rx.schedulers.Schedulers;
/**
@@ -31,13 +32,21 @@ public class ExceptionBehaviorTest {
//this test shows toBlocking re-throws exceptions correctly
@Test( expected = TestException.class )
- public void throwOnBlocking() {
+ public void throwOnBlockingFirst() {
Observable.range( 0, 1 ).map( integer -> {
throw new TestException( "I throw and exception" );
} ).toBlocking().first();
}
+ @Test( expected = TestException.class )
+ public void throwOnBlockingLast() {
+
+ Observable.range( 0, 1 ).map( integer -> {
+ throw new TestException( "I throw and exception" );
+ } ).toBlocking().last();
+ }
+
//
// /**
// * This shows that no re-throw happens on subscribe. This is as designed, but not as expected
@@ -67,6 +76,27 @@ public class ExceptionBehaviorTest {
exceptionObserver.checkResult();
}
+ /**
+ * Tests working with observers
+ */
+ @Test( expected = TestException.class )
+ public void throwOnSubscribeObservableNewThread() throws Exception {
+
+ final ReThrowObserver exceptionObserver = new ReThrowObserver();
+
+ Observable.range( 0, 1 ).map(integer -> {
+ throw new TestException("I throw and exception");
+ })
+ .doOnError(t -> exceptionObserver.onError(t))
+ .subscribeOn(Schedulers.newThread())
+ .subscribe(exceptionObserver);
+
+ for(int i =0; i<5; i++) {
+ exceptionObserver.checkResult();
+ Thread.sleep(200);
+ }
+ }
+
private static final class TestException extends RuntimeException {