You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/10/09 16:54:40 UTC
[1/2] usergrid git commit: add better exception handling
Repository: usergrid
Updated Branches:
refs/heads/2.1-release b48551722 -> 80324de9e
add better exception handling
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/3cb0a0e0
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/3cb0a0e0
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/3cb0a0e0
Branch: refs/heads/2.1-release
Commit: 3cb0a0e0c5a8a1f1e06d3bd71b9a2bcef333ca80
Parents: b485517
Author: Shawn Feldman <sf...@apache.org>
Authored: Fri Oct 9 08:39:27 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Fri Oct 9 08:39:27 2015 -0600
----------------------------------------------------------------------
.../asyncevents/AmazonAsyncEventService.java | 19 ++++++++++++++-----
1 file changed, 14 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3cb0a0e0/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index 0ac3860..b384b80 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -307,7 +307,9 @@ public class AmazonAsyncEventService implements AsyncEventService {
}
});
//resolve the list and return it.
- final List<IndexEventResult> indexEventResults = masterObservable.toList().toBlocking().lastOrDefault(null);
+ final List<IndexEventResult> indexEventResults = masterObservable
+ .collect(() -> new ArrayList<IndexEventResult>(), (list,indexEventResult) -> list.add(indexEventResult) )
+ .toBlocking().lastOrDefault(null);
//if nothing came back then return null
if(indexEventResults==null){
return null;
@@ -318,13 +320,16 @@ public class AmazonAsyncEventService implements AsyncEventService {
//stream and filer the messages
List<QueueMessage> messagesToAck = indexEventResults.stream()
.map(indexEventResult -> {
+ //collect into the index submission
if (indexEventResult.getIndexOperationMessage().isPresent()) {
combined.ingest(indexEventResult.getIndexOperationMessage().get());
}
return indexEventResult;
})
+ //filter out the ones that need to be ack'd
.filter(indexEventResult -> indexEventResult.getQueueMessage().isPresent())
.map(indexEventResult -> {
+ //record the cycle time
messageCycle.update(System.currentTimeMillis() - indexEventResult.getCreationTime());
return indexEventResult;
})
@@ -333,8 +338,13 @@ public class AmazonAsyncEventService implements AsyncEventService {
.collect(Collectors.toList());
//send the batch
- indexProducer.put(combined).toBlocking().lastOrDefault(null);
-
+ //TODO: should retry?
+ try {
+ indexProducer.put(combined).toBlocking().lastOrDefault(null);
+ }catch (Exception e){
+ logger.error("Failed to submit to index producer",messages,e);
+ throw e;
+ }
return messagesToAck;
}
@@ -584,10 +594,9 @@ public class AmazonAsyncEventService implements AsyncEventService {
}
//ack each message, but only if we didn't error.
ack(messagesToAck);
- //messagesToAck.stream().forEach(message -> ack(message));
return messagesToAck;
} catch (Exception e) {
- logger.error("failed to ack messages to sqs", messages.get(0).getMessageId(), e);
+ logger.error("failed to ack messages to sqs", messages, e);
return null;
//do not rethrow so we can process all of them
}
[2/2] usergrid git commit: separate concerns so indexing and event
handling are distinct
Posted by sf...@apache.org.
separate concerns so indexing and event handling are distinct
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/80324de9
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/80324de9
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/80324de9
Branch: refs/heads/2.1-release
Commit: 80324de9e606ef558f129620a242ce4b0a3a25f9
Parents: 3cb0a0e
Author: Shawn Feldman <sf...@apache.org>
Authored: Fri Oct 9 08:47:25 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Fri Oct 9 08:47:25 2015 -0600
----------------------------------------------------------------------
.../asyncevents/AmazonAsyncEventService.java | 95 +++++++++++---------
1 file changed, 54 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/80324de9/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index b384b80..4ee2094 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -23,7 +23,6 @@ package org.apache.usergrid.corepersistence.asyncevents;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
@@ -73,7 +72,6 @@ import com.google.inject.Singleton;
import rx.Observable;
import rx.Subscriber;
import rx.Subscription;
-import rx.functions.Func1;
import rx.schedulers.Schedulers;
@@ -251,9 +249,14 @@ public class AmazonAsyncEventService implements AsyncEventService {
}
- private List<QueueMessage> handleMessages( final List<QueueMessage> messages ) {
+ /**
+ * calls the event handlers and returns a result with information on whether it needs to be ack'd and whether it needs to be indexed
+ * @param messages
+ * @return
+ */
+ private List<IndexEventResult> callEventHandlers(final List<QueueMessage> messages) {
if (logger.isDebugEnabled()) {
- logger.debug("handleMessages with {} message", messages.size());
+ logger.debug("callEventHandlers with {} message", messages.size());
}
Observable<IndexEventResult> masterObservable = Observable.from(messages).map(message -> {
@@ -310,42 +313,9 @@ public class AmazonAsyncEventService implements AsyncEventService {
final List<IndexEventResult> indexEventResults = masterObservable
.collect(() -> new ArrayList<IndexEventResult>(), (list,indexEventResult) -> list.add(indexEventResult) )
.toBlocking().lastOrDefault(null);
- //if nothing came back then return null
- if(indexEventResults==null){
- return null;
- }
- final IndexOperationMessage combined = new IndexOperationMessage();
- //stream and filer the messages
- List<QueueMessage> messagesToAck = indexEventResults.stream()
- .map(indexEventResult -> {
- //collect into the index submission
- if (indexEventResult.getIndexOperationMessage().isPresent()) {
- combined.ingest(indexEventResult.getIndexOperationMessage().get());
- }
- return indexEventResult;
- })
- //filter out the ones that need to be ack'd
- .filter(indexEventResult -> indexEventResult.getQueueMessage().isPresent())
- .map(indexEventResult -> {
- //record the cycle time
- messageCycle.update(System.currentTimeMillis() - indexEventResult.getCreationTime());
- return indexEventResult;
- })
- //ack after successful completion of the operation.
- .map(result -> result.getQueueMessage().get())
- .collect(Collectors.toList());
-
- //send the batch
- //TODO: should retry?
- try {
- indexProducer.put(combined).toBlocking().lastOrDefault(null);
- }catch (Exception e){
- logger.error("Failed to submit to index producer",messages,e);
- throw e;
- }
- return messagesToAck;
+ return indexEventResults;
}
@@ -582,9 +552,8 @@ public class AmazonAsyncEventService implements AsyncEventService {
}
try {
-
- List<QueueMessage> messagesToAck = handleMessages(messages);
-
+ List<IndexEventResult> indexEventResults = callEventHandlers(messages);
+ List<QueueMessage> messagesToAck = submitToIndex(indexEventResults);
if (messagesToAck == null || messagesToAck.size() == 0) {
logger.error("No messages came back from the queue operation",messages);
return messagesToAck;
@@ -610,6 +579,50 @@ public class AmazonAsyncEventService implements AsyncEventService {
}
}
+ /**
+ * Submit results to index and return the queue messages to be ack'd
+ * @param indexEventResults
+ * @return
+ */
+ private List<QueueMessage> submitToIndex( List<IndexEventResult> indexEventResults) {
+ //if nothing came back then return null
+ if(indexEventResults==null){
+ return null;
+ }
+
+ final IndexOperationMessage combined = new IndexOperationMessage();
+
+ //stream and filer the messages
+ List<QueueMessage> messagesToAck = indexEventResults.stream()
+ .map(indexEventResult -> {
+ //collect into the index submission
+ if (indexEventResult.getIndexOperationMessage().isPresent()) {
+ combined.ingest(indexEventResult.getIndexOperationMessage().get());
+ }
+ return indexEventResult;
+ })
+ //filter out the ones that need to be ack'd
+ .filter(indexEventResult -> indexEventResult.getQueueMessage().isPresent())
+ .map(indexEventResult -> {
+ //record the cycle time
+ messageCycle.update(System.currentTimeMillis() - indexEventResult.getCreationTime());
+ return indexEventResult;
+ })
+ //ack after successful completion of the operation.
+ .map(result -> result.getQueueMessage().get())
+ .collect(Collectors.toList());
+
+ //send the batch
+ //TODO: should retry?
+ try {
+ indexProducer.put(combined).toBlocking().lastOrDefault(null);
+ }catch (Exception e){
+ logger.error("Failed to submit to index producer",e);
+ throw e;
+ }
+ return messagesToAck;
+ }
+
public void index(final ApplicationScope applicationScope, final Id id, final long updatedSince) {
//change to id scope to avoid serialization issues
offer( new EntityIndexEvent( new EntityIdScope( applicationScope, id ), updatedSince ) );