You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/10/01 18:28:01 UTC
[26/36] usergrid git commit: add missing subscribe
add missing subscribe
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/0c4c1abf
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/0c4c1abf
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/0c4c1abf
Branch: refs/heads/master
Commit: 0c4c1abf3bcbdb1a62d1ec74d892698e7e211d40
Parents: a9dc6ba
Author: Shawn Feldman <sf...@apache.org>
Authored: Mon Sep 28 15:13:00 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Mon Sep 28 15:13:00 2015 -0600
----------------------------------------------------------------------
.../asyncevents/AmazonAsyncEventService.java | 14 +++++++-------
1 file changed, 7 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/0c4c1abf/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index 82e6d19..5f681e7 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -243,15 +243,14 @@ public class AmazonAsyncEventService implements AsyncEventService {
continue;
}
-
if (event instanceof EdgeDeleteEvent) {
- merged = merged.mergeWith(callHandleIndex(queueMessage -> handleEdgeDelete(queueMessage), message));
+ merged = merged.mergeWith(callHandleIndex(message, queueMessage -> handleEdgeDelete(queueMessage)));
} else if (event instanceof EdgeIndexEvent) {
- merged = merged.mergeWith(callHandleIndex(queueMessage -> handleEdgeIndex(queueMessage),message));
+ merged = merged.mergeWith(callHandleIndex(message, queueMessage -> handleEdgeIndex(queueMessage)));
} else if (event instanceof EntityDeleteEvent) {
- merged = merged.mergeWith( callHandleIndex(queueMessage -> handleEntityDelete(queueMessage),message));
+ merged = merged.mergeWith( callHandleIndex(message, queueMessage -> handleEntityDelete(queueMessage)));
} else if (event instanceof EntityIndexEvent) {
- merged = merged.mergeWith(callHandleIndex(queueMessage -> handleEntityIndexUpdate(queueMessage),message));
+ merged = merged.mergeWith(callHandleIndex(message, queueMessage -> handleEntityIndexUpdate(queueMessage)));
} else if (event instanceof InitializeApplicationIndexEvent) {
//does not return observable
handleInitializeApplicationIndex(message);
@@ -272,10 +271,11 @@ public class AmazonAsyncEventService implements AsyncEventService {
indexProducer.put(combined).subscribe();
return Observable.from(indexEventResults);
})
- .doOnNext(indexEventResult ->ack(indexEventResult.queueMessage));
+ .doOnNext(indexEventResult ->ack(indexEventResult.queueMessage))
+ .subscribe();
}
- private Observable<IndexEventResult> callHandleIndex(Func1<QueueMessage,Observable<IndexOperationMessage>> toCall, QueueMessage message){
+ private Observable<IndexEventResult> callHandleIndex(QueueMessage message, Func1<QueueMessage, Observable<IndexOperationMessage>> toCall){
try{
IndexOperationMessage indexOperationMessage = toCall.call(message).toBlocking().lastOrDefault(null);
return Observable.just(new IndexEventResult(message,Optional.fromNullable(indexOperationMessage),true));