You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/10/09 16:54:40 UTC

[1/2] usergrid git commit: add better exception handling

Repository: usergrid
Updated Branches:
  refs/heads/2.1-release b48551722 -> 80324de9e


add better exception handling


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/3cb0a0e0
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/3cb0a0e0
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/3cb0a0e0

Branch: refs/heads/2.1-release
Commit: 3cb0a0e0c5a8a1f1e06d3bd71b9a2bcef333ca80
Parents: b485517
Author: Shawn Feldman <sf...@apache.org>
Authored: Fri Oct 9 08:39:27 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Fri Oct 9 08:39:27 2015 -0600

----------------------------------------------------------------------
 .../asyncevents/AmazonAsyncEventService.java     | 19 ++++++++++++++-----
 1 file changed, 14 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/3cb0a0e0/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index 0ac3860..b384b80 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -307,7 +307,9 @@ public class AmazonAsyncEventService implements AsyncEventService {
             }
         });
         //resolve the list and return it.
-        final List<IndexEventResult> indexEventResults = masterObservable.toList().toBlocking().lastOrDefault(null);
+        final List<IndexEventResult> indexEventResults = masterObservable
+            .collect(() -> new ArrayList<IndexEventResult>(), (list,indexEventResult) -> list.add(indexEventResult) )
+            .toBlocking().lastOrDefault(null);
         //if nothing came back then return null
         if(indexEventResults==null){
             return null;
@@ -318,13 +320,16 @@ public class AmazonAsyncEventService implements AsyncEventService {
         //stream and filer the messages
         List<QueueMessage> messagesToAck = indexEventResults.stream()
             .map(indexEventResult -> {
+                //collect into the index submission
                 if (indexEventResult.getIndexOperationMessage().isPresent()) {
                     combined.ingest(indexEventResult.getIndexOperationMessage().get());
                 }
                 return indexEventResult;
             })
+            //filter out the ones that need to be ack'd
             .filter(indexEventResult -> indexEventResult.getQueueMessage().isPresent())
             .map(indexEventResult -> {
+                //record the cycle time
                 messageCycle.update(System.currentTimeMillis() - indexEventResult.getCreationTime());
                 return indexEventResult;
             })
@@ -333,8 +338,13 @@ public class AmazonAsyncEventService implements AsyncEventService {
             .collect(Collectors.toList());
 
         //send the batch
-        indexProducer.put(combined).toBlocking().lastOrDefault(null);
-
+        //TODO: should retry?
+        try {
+            indexProducer.put(combined).toBlocking().lastOrDefault(null);
+        }catch (Exception e){
+            logger.error("Failed to submit to index producer",messages,e);
+            throw e;
+        }
         return messagesToAck;
     }
 
@@ -584,10 +594,9 @@ public class AmazonAsyncEventService implements AsyncEventService {
                                     }
                                     //ack each message, but only if we didn't error.
                                     ack(messagesToAck);
-                                    //messagesToAck.stream().forEach(message -> ack(message));
                                     return messagesToAck;
                                 } catch (Exception e) {
-                                    logger.error("failed to ack messages to sqs", messages.get(0).getMessageId(), e);
+                                    logger.error("failed to ack messages to sqs", messages, e);
                                     return null;
                                     //do not rethrow so we can process all of them
                                 }


[2/2] usergrid git commit: seperate concerns so indexing and event handling are distinct

Posted by sf...@apache.org.
seperate concerns so indexing and event handling are distinct


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/80324de9
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/80324de9
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/80324de9

Branch: refs/heads/2.1-release
Commit: 80324de9e606ef558f129620a242ce4b0a3a25f9
Parents: 3cb0a0e
Author: Shawn Feldman <sf...@apache.org>
Authored: Fri Oct 9 08:47:25 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Fri Oct 9 08:47:25 2015 -0600

----------------------------------------------------------------------
 .../asyncevents/AmazonAsyncEventService.java    | 95 +++++++++++---------
 1 file changed, 54 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/80324de9/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index b384b80..4ee2094 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -23,7 +23,6 @@ package org.apache.usergrid.corepersistence.asyncevents;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 
@@ -73,7 +72,6 @@ import com.google.inject.Singleton;
 import rx.Observable;
 import rx.Subscriber;
 import rx.Subscription;
-import rx.functions.Func1;
 import rx.schedulers.Schedulers;
 
 
@@ -251,9 +249,14 @@ public class AmazonAsyncEventService implements AsyncEventService {
     }
 
 
-    private List<QueueMessage> handleMessages( final List<QueueMessage> messages ) {
+    /**
+     * calls the event handlers and returns a result with information on whether it needs to be ack'd and whether it needs to be indexed
+     * @param messages
+     * @return
+     */
+    private List<IndexEventResult> callEventHandlers(final List<QueueMessage> messages) {
         if (logger.isDebugEnabled()) {
-            logger.debug("handleMessages with {} message", messages.size());
+            logger.debug("callEventHandlers with {} message", messages.size());
         }
 
         Observable<IndexEventResult> masterObservable = Observable.from(messages).map(message -> {
@@ -310,42 +313,9 @@ public class AmazonAsyncEventService implements AsyncEventService {
         final List<IndexEventResult> indexEventResults = masterObservable
             .collect(() -> new ArrayList<IndexEventResult>(), (list,indexEventResult) -> list.add(indexEventResult) )
             .toBlocking().lastOrDefault(null);
-        //if nothing came back then return null
-        if(indexEventResults==null){
-            return null;
-        }
 
-        final IndexOperationMessage combined = new IndexOperationMessage();
 
-        //stream and filer the messages
-        List<QueueMessage> messagesToAck = indexEventResults.stream()
-            .map(indexEventResult -> {
-                //collect into the index submission
-                if (indexEventResult.getIndexOperationMessage().isPresent()) {
-                    combined.ingest(indexEventResult.getIndexOperationMessage().get());
-                }
-                return indexEventResult;
-            })
-            //filter out the ones that need to be ack'd
-            .filter(indexEventResult -> indexEventResult.getQueueMessage().isPresent())
-            .map(indexEventResult -> {
-                //record the cycle time
-                messageCycle.update(System.currentTimeMillis() - indexEventResult.getCreationTime());
-                return indexEventResult;
-            })
-                //ack after successful completion of the operation.
-            .map(result -> result.getQueueMessage().get())
-            .collect(Collectors.toList());
-
-        //send the batch
-        //TODO: should retry?
-        try {
-            indexProducer.put(combined).toBlocking().lastOrDefault(null);
-        }catch (Exception e){
-            logger.error("Failed to submit to index producer",messages,e);
-            throw e;
-        }
-        return messagesToAck;
+        return indexEventResults;
     }
 
 
@@ -582,9 +552,8 @@ public class AmazonAsyncEventService implements AsyncEventService {
                                 }
 
                                 try {
-
-                                    List<QueueMessage> messagesToAck = handleMessages(messages);
-
+                                    List<IndexEventResult> indexEventResults = callEventHandlers(messages);
+                                    List<QueueMessage> messagesToAck = submitToIndex(indexEventResults);
                                     if (messagesToAck == null || messagesToAck.size() == 0) {
                                         logger.error("No messages came back from the queue operation",messages);
                                         return messagesToAck;
@@ -610,6 +579,50 @@ public class AmazonAsyncEventService implements AsyncEventService {
         }
     }
 
+    /**
+     * Submit results to index and return the queue messages to be ack'd
+     * @param indexEventResults
+     * @return
+     */
+    private List<QueueMessage> submitToIndex( List<IndexEventResult> indexEventResults) {
+        //if nothing came back then return null
+        if(indexEventResults==null){
+            return null;
+        }
+
+        final IndexOperationMessage combined = new IndexOperationMessage();
+
+        //stream and filer the messages
+        List<QueueMessage> messagesToAck = indexEventResults.stream()
+            .map(indexEventResult -> {
+                //collect into the index submission
+                if (indexEventResult.getIndexOperationMessage().isPresent()) {
+                    combined.ingest(indexEventResult.getIndexOperationMessage().get());
+                }
+                return indexEventResult;
+            })
+                //filter out the ones that need to be ack'd
+            .filter(indexEventResult -> indexEventResult.getQueueMessage().isPresent())
+            .map(indexEventResult -> {
+                //record the cycle time
+                messageCycle.update(System.currentTimeMillis() - indexEventResult.getCreationTime());
+                return indexEventResult;
+            })
+                //ack after successful completion of the operation.
+            .map(result -> result.getQueueMessage().get())
+            .collect(Collectors.toList());
+
+        //send the batch
+        //TODO: should retry?
+        try {
+            indexProducer.put(combined).toBlocking().lastOrDefault(null);
+        }catch (Exception e){
+            logger.error("Failed to submit to index producer",e);
+            throw e;
+        }
+        return messagesToAck;
+    }
+
     public void index(final ApplicationScope applicationScope, final Id id, final long updatedSince) {
         //change to id scope to avoid serialization issues
         offer( new EntityIndexEvent( new EntityIdScope( applicationScope, id ), updatedSince ) );