You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2015/10/14 18:54:22 UTC

[26/39] usergrid git commit: add missing subscribe

add missing subscribe


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/0c4c1abf
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/0c4c1abf
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/0c4c1abf

Branch: refs/heads/usergrid-1007-shiro-cache
Commit: 0c4c1abf3bcbdb1a62d1ec74d892698e7e211d40
Parents: a9dc6ba
Author: Shawn Feldman <sf...@apache.org>
Authored: Mon Sep 28 15:13:00 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Mon Sep 28 15:13:00 2015 -0600

----------------------------------------------------------------------
 .../asyncevents/AmazonAsyncEventService.java          | 14 +++++++-------
 1 file changed, 7 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/0c4c1abf/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index 82e6d19..5f681e7 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -243,15 +243,14 @@ public class AmazonAsyncEventService implements AsyncEventService {
                 continue;
             }
 
-
             if (event instanceof EdgeDeleteEvent) {
-               merged = merged.mergeWith(callHandleIndex(queueMessage -> handleEdgeDelete(queueMessage), message));
+               merged = merged.mergeWith(callHandleIndex(message, queueMessage -> handleEdgeDelete(queueMessage)));
             } else if (event instanceof EdgeIndexEvent) {
-               merged = merged.mergeWith(callHandleIndex(queueMessage -> handleEdgeIndex(queueMessage),message));
+               merged = merged.mergeWith(callHandleIndex(message, queueMessage -> handleEdgeIndex(queueMessage)));
             } else if (event instanceof EntityDeleteEvent) {
-                merged = merged.mergeWith( callHandleIndex(queueMessage -> handleEntityDelete(queueMessage),message));
+                merged = merged.mergeWith( callHandleIndex(message, queueMessage -> handleEntityDelete(queueMessage)));
             } else if (event instanceof EntityIndexEvent) {
-                merged = merged.mergeWith(callHandleIndex(queueMessage -> handleEntityIndexUpdate(queueMessage),message));
+                merged = merged.mergeWith(callHandleIndex(message, queueMessage -> handleEntityIndexUpdate(queueMessage)));
             } else if (event instanceof InitializeApplicationIndexEvent) {
                 //does not return observable
                 handleInitializeApplicationIndex(message);
@@ -272,10 +271,11 @@ public class AmazonAsyncEventService implements AsyncEventService {
                 indexProducer.put(combined).subscribe();
                 return Observable.from(indexEventResults);
             })
-            .doOnNext(indexEventResult ->ack(indexEventResult.queueMessage));
+            .doOnNext(indexEventResult ->ack(indexEventResult.queueMessage))
+            .subscribe();
     }
 
-    private Observable<IndexEventResult> callHandleIndex(Func1<QueueMessage,Observable<IndexOperationMessage>> toCall, QueueMessage message){
+    private Observable<IndexEventResult> callHandleIndex(QueueMessage message, Func1<QueueMessage, Observable<IndexOperationMessage>> toCall){
         try{
             IndexOperationMessage indexOperationMessage =  toCall.call(message).toBlocking().lastOrDefault(null);
             return Observable.just(new IndexEventResult(message,Optional.fromNullable(indexOperationMessage),true));