You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2015/10/15 16:17:52 UTC

[02/50] [abbrv] usergrid git commit: fix subscribe for messages

fix subscribe for messages


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/1a1d42e1
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/1a1d42e1
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/1a1d42e1

Branch: refs/heads/asf-site
Commit: 1a1d42e1f53cabf433442c17f614f9fcae418a22
Parents: b437f61
Author: Shawn Feldman <sf...@apache.org>
Authored: Mon Oct 5 16:28:04 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Mon Oct 5 16:28:04 2015 -0600

----------------------------------------------------------------------
 .../asyncevents/AmazonAsyncEventService.java    | 20 ++++++++++++--------
 1 file changed, 12 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/1a1d42e1/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index e215d48..bf29c5a 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -285,13 +285,9 @@ public class AmazonAsyncEventService implements AsyncEventService {
 
 
                 //ack after successful completion of the operation.
-                return indexProducer.put( combined )
-                    .flatMap( operationResult -> Observable.from( indexEventResults ) )
-                    //ack each message, but only if we didn't error.  If we did, we'll want to log it and
-                    .map( indexEventResult -> {
-                                        ack( indexEventResult.queueMessage );
-                                        return indexEventResult;
-                                    } );
+                return indexProducer.put(combined)
+                    .flatMap(operationResult -> Observable.from(indexEventResults));
+
             } );
 
     }
@@ -538,7 +534,15 @@ public class AmazonAsyncEventService implements AsyncEventService {
                         }
                     })
                             //this won't block our read loop, just reads and proceeds
-                            .flatMap( messages -> handleMessages( messages ) ).subscribeOn( Schedulers.newThread() );
+                            .map(messages ->
+                                    handleMessages(messages)
+                                        .map(indexEventResult -> {
+                                            ack( indexEventResult.getQueueMessage() );
+                                            return indexEventResult;
+                                        })
+                                        .toBlocking().lastOrDefault(null)
+                            )//ack each message, but only if we didn't error.  If we did, we'll want to log it and
+                            .subscribeOn( Schedulers.newThread() );
 
             //start in the background