You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2015/10/15 16:18:17 UTC

[27/50] [abbrv] usergrid git commit: add additional logging

add additional logging


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/365f6dc2
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/365f6dc2
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/365f6dc2

Branch: refs/heads/usergrid-1007-shiro-cache
Commit: 365f6dc2ac36720b221d043ef0c28503c2a9513c
Parents: d57b4fe
Author: Shawn Feldman <sf...@apache.org>
Authored: Thu Oct 8 14:39:15 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Thu Oct 8 14:39:15 2015 -0600

----------------------------------------------------------------------
 .../asyncevents/AmazonAsyncEventService.java    | 52 +++++++++++---------
 1 file changed, 28 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/365f6dc2/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index 957ee68..e3aca06 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -250,7 +250,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
     }
 
 
-    private Observable<QueueMessage> handleMessages( final List<QueueMessage> messages ) {
+    private List<QueueMessage> handleMessages( final List<QueueMessage> messages ) {
         if (logger.isDebugEnabled()) {
             logger.debug("handleMessages with {} message", messages.size());
         }
@@ -305,9 +305,9 @@ public class AmazonAsyncEventService implements AsyncEventService {
         });
 
         //filter for success, send to the index(optional), ack
-        return masterObservable
+        return (List<QueueMessage>) masterObservable
             //take the max
-            .buffer(indexProcessorFig.getBufferTime(), TimeUnit.MILLISECONDS, bufferSize)
+            .buffer(bufferSize)
             //map them to index results and return them
             .flatMap(indexEventResults -> {
                 IndexOperationMessage combined = new IndexOperationMessage();
@@ -330,7 +330,9 @@ public class AmazonAsyncEventService implements AsyncEventService {
                     //return the queue messages to ack
                     .map(result -> result.getQueueMessage().get());
 
-            });
+            })
+            .doOnError(t -> logger.error("Failed to process queuemessages",t))
+            .toBlocking().lastOrDefault(null);
 
     }
 
@@ -520,7 +522,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
     private void startWorker() {
         synchronized (mutex) {
 
-            Observable<QueueMessage> consumer =
+            Observable<List<QueueMessage>> consumer =
                     Observable.create(new Observable.OnSubscribe<List<QueueMessage>>() {
                         @Override
                         public void call(final Subscriber<? super List<QueueMessage>> subscriber) {
@@ -561,26 +563,28 @@ public class AmazonAsyncEventService implements AsyncEventService {
                         }
                     })
                             //this won't block our read loop, just reads and proceeds
-                            .flatMap(messages ->
-                                {
-                                    final int bufferSize = messages.size();
-                                    return handleMessages(messages)
-                                        .buffer(indexProcessorFig.getBufferTime(), TimeUnit.MILLISECONDS, bufferSize) //TODO how to ack multiple messages via buffer
-                                        .doOnNext(messagesToAck -> {
-                                            if (messagesToAck.size() == 0) {
-                                                return;
-                                            }
-                                            try {
-                                                //ack each message, but only if we didn't error.
-                                                ack(messagesToAck);
-                                            } catch (Exception e) {
-                                                logger.error("failed to ack messages to sqs", messagesToAck.get(0).getMessageId(), e);
-                                                //do not rethrow so we can process all of them
-                                            }
-                                        })
-                                        .flatMap(messagesToAck -> Observable.from(messagesToAck));
+                            .map(messages ->
+                            {
+                                if (messages == null || messages.size() == 0) {
+                                    return null;
                                 }
-                            );
+
+                                try {
+
+                                    List<QueueMessage> messagesToAck = handleMessages(messages);
+
+                                    if (messagesToAck == null || messagesToAck.size() == 0) {
+                                        return messagesToAck;
+                                    }
+                                    //ack each message, but only if we didn't error.
+                                    ack(messagesToAck);
+                                    return messagesToAck;
+                                } catch (Exception e) {
+                                    logger.error("failed to ack messages to sqs", messages.get(0).getMessageId(), e);
+                                    return null;
+                                    //do not rethrow so we can process all of them
+                                }
+                            });
 
             //start in the background