You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2015/10/15 16:18:17 UTC
[27/50] [abbrv] usergrid git commit: add additional logging
add additional logging
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/365f6dc2
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/365f6dc2
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/365f6dc2
Branch: refs/heads/usergrid-1007-shiro-cache
Commit: 365f6dc2ac36720b221d043ef0c28503c2a9513c
Parents: d57b4fe
Author: Shawn Feldman <sf...@apache.org>
Authored: Thu Oct 8 14:39:15 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Thu Oct 8 14:39:15 2015 -0600
----------------------------------------------------------------------
.../asyncevents/AmazonAsyncEventService.java | 52 +++++++++++---------
1 file changed, 28 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/365f6dc2/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index 957ee68..e3aca06 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -250,7 +250,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
}
- private Observable<QueueMessage> handleMessages( final List<QueueMessage> messages ) {
+ private List<QueueMessage> handleMessages( final List<QueueMessage> messages ) {
if (logger.isDebugEnabled()) {
logger.debug("handleMessages with {} message", messages.size());
}
@@ -305,9 +305,9 @@ public class AmazonAsyncEventService implements AsyncEventService {
});
//filter for success, send to the index(optional), ack
- return masterObservable
+ return (List<QueueMessage>) masterObservable
//take the max
- .buffer(indexProcessorFig.getBufferTime(), TimeUnit.MILLISECONDS, bufferSize)
+ .buffer(bufferSize)
//map them to index results and return them
.flatMap(indexEventResults -> {
IndexOperationMessage combined = new IndexOperationMessage();
@@ -330,7 +330,9 @@ public class AmazonAsyncEventService implements AsyncEventService {
//return the queue messages to ack
.map(result -> result.getQueueMessage().get());
- });
+ })
+ .doOnError(t -> logger.error("Failed to process queuemessages",t))
+ .toBlocking().lastOrDefault(null);
}
@@ -520,7 +522,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
private void startWorker() {
synchronized (mutex) {
- Observable<QueueMessage> consumer =
+ Observable<List<QueueMessage>> consumer =
Observable.create(new Observable.OnSubscribe<List<QueueMessage>>() {
@Override
public void call(final Subscriber<? super List<QueueMessage>> subscriber) {
@@ -561,26 +563,28 @@ public class AmazonAsyncEventService implements AsyncEventService {
}
})
//this won't block our read loop, just reads and proceeds
- .flatMap(messages ->
- {
- final int bufferSize = messages.size();
- return handleMessages(messages)
- .buffer(indexProcessorFig.getBufferTime(), TimeUnit.MILLISECONDS, bufferSize) //TODO how to ack multiple messages via buffer
- .doOnNext(messagesToAck -> {
- if (messagesToAck.size() == 0) {
- return;
- }
- try {
- //ack each message, but only if we didn't error.
- ack(messagesToAck);
- } catch (Exception e) {
- logger.error("failed to ack messages to sqs", messagesToAck.get(0).getMessageId(), e);
- //do not rethrow so we can process all of them
- }
- })
- .flatMap(messagesToAck -> Observable.from(messagesToAck));
+ .map(messages ->
+ {
+ if (messages == null || messages.size() == 0) {
+ return null;
}
- );
+
+ try {
+
+ List<QueueMessage> messagesToAck = handleMessages(messages);
+
+ if (messagesToAck == null || messagesToAck.size() == 0) {
+ return messagesToAck;
+ }
+ //ack each message, but only if we didn't error.
+ ack(messagesToAck);
+ return messagesToAck;
+ } catch (Exception e) {
+ logger.error("failed to ack messages to sqs", messages.get(0).getMessageId(), e);
+ return null;
+ //do not rethrow so we can process all of them
+ }
+ });
//start in the background