You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/10/06 22:35:02 UTC
[4/9] usergrid git commit: fix subscribe for messages
fix subscribe for messages
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/1a1d42e1
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/1a1d42e1
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/1a1d42e1
Branch: refs/heads/2.1-release
Commit: 1a1d42e1f53cabf433442c17f614f9fcae418a22
Parents: b437f61
Author: Shawn Feldman <sf...@apache.org>
Authored: Mon Oct 5 16:28:04 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Mon Oct 5 16:28:04 2015 -0600
----------------------------------------------------------------------
.../asyncevents/AmazonAsyncEventService.java | 20 ++++++++++++--------
1 file changed, 12 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/1a1d42e1/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index e215d48..bf29c5a 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -285,13 +285,9 @@ public class AmazonAsyncEventService implements AsyncEventService {
//ack after successful completion of the operation.
- return indexProducer.put( combined )
- .flatMap( operationResult -> Observable.from( indexEventResults ) )
- //ack each message, but only if we didn't error. If we did, we'll want to log it and
- .map( indexEventResult -> {
- ack( indexEventResult.queueMessage );
- return indexEventResult;
- } );
+ return indexProducer.put(combined)
+ .flatMap(operationResult -> Observable.from(indexEventResults));
+
} );
}
@@ -538,7 +534,15 @@ public class AmazonAsyncEventService implements AsyncEventService {
}
})
//this won't block our read loop, just reads and proceeds
- .flatMap( messages -> handleMessages( messages ) ).subscribeOn( Schedulers.newThread() );
+ .map(messages ->
+ handleMessages(messages)
+ .map(indexEventResult -> {
+ ack( indexEventResult.getQueueMessage() );
+ return indexEventResult;
+ })
+ .toBlocking().lastOrDefault(null)
+ )//ack each message, but only if we didn't error. If we did, we'll want to log it and
+ .subscribeOn( Schedulers.newThread() );
//start in the background