You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2015/10/21 22:12:17 UTC

[06/50] usergrid git commit: remove observable

remove observable


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/32d35e7d
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/32d35e7d
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/32d35e7d

Branch: refs/heads/jacoco
Commit: 32d35e7d5e605005b3f530e3a1e390d394c29c0a
Parents: f6409ce
Author: Shawn Feldman <sf...@apache.org>
Authored: Tue Oct 13 10:42:58 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Tue Oct 13 10:42:58 2015 -0600

----------------------------------------------------------------------
 .../asyncevents/AmazonAsyncEventService.java    | 109 ++++++++++---------
 1 file changed, 55 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/32d35e7d/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index 6f41563..0fef974 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -259,65 +259,66 @@ public class AmazonAsyncEventService implements AsyncEventService {
             logger.debug("callEventHandlers with {} message", messages.size());
         }
 
-        Observable<IndexEventResult> masterObservable = Observable.from(messages).map(message -> {
-            AsyncEvent event = null;
-            try {
-                event = (AsyncEvent) message.getBody();
-            } catch (ClassCastException cce) {
-                logger.error("Failed to deserialize message body", cce);
-            }
-
-            if (event == null) {
-                logger.error("AsyncEvent type or event is null!");
-                return new IndexEventResult(Optional.fromNullable(message), Optional.<IndexOperationMessage>absent(), System.currentTimeMillis());
-            }
-
-            final AsyncEvent thisEvent = event;
-            if (logger.isDebugEnabled()) {
-                logger.debug("Processing {} event", event);
-            }
-
-            try {
-                Observable<IndexOperationMessage> indexoperationObservable;
-                //merge each operation to a master observable;
-                if (event instanceof EdgeDeleteEvent) {
-                    indexoperationObservable = handleEdgeDelete(message);
-                } else if (event instanceof EdgeIndexEvent) {
-                    indexoperationObservable = handleEdgeIndex(message);
-                } else if (event instanceof EntityDeleteEvent) {
-                    indexoperationObservable = handleEntityDelete(message);
-                } else if (event instanceof EntityIndexEvent) {
-                    indexoperationObservable = handleEntityIndexUpdate(message);
-                } else if (event instanceof InitializeApplicationIndexEvent) {
-                    //does not return observable
-                    handleInitializeApplicationIndex(event, message);
-                    indexoperationObservable = Observable.just(new IndexOperationMessage());
-                } else {
-                    throw new Exception("Unknown EventType");//TODO: print json instead
+        List<IndexEventResult> indexEventResults = messages.stream()
+            .map(message -> {
+                AsyncEvent event = null;
+                try {
+                    event = (AsyncEvent) message.getBody();
+                } catch (ClassCastException cce) {
+                    logger.error("Failed to deserialize message body", cce);
                 }
 
-                //collect all of the
-                IndexOperationMessage indexOperationMessage =
-                    indexoperationObservable
-                        .collect(() -> new IndexOperationMessage(), (collector, single) -> collector.ingest(single))
-                        .toBlocking().lastOrDefault(null);
+                if (event == null) {
+                    logger.error("AsyncEvent type or event is null!");
+                    return new IndexEventResult(Optional.fromNullable(message), Optional.<IndexOperationMessage>absent(), System.currentTimeMillis());
+                }
 
-                if (indexOperationMessage == null || indexOperationMessage.isEmpty()) {
-                    logger.info("Received empty index sequence message:({}), body:({}) ",
-                        message.getMessageId(),message.getStringBody());
+                final AsyncEvent thisEvent = event;
+                if (logger.isDebugEnabled()) {
+                    logger.debug("Processing {} event", event);
                 }
 
-                //return type that can be indexed and ack'd later
-                return new IndexEventResult(Optional.fromNullable(message), Optional.fromNullable(indexOperationMessage), thisEvent.getCreationTime());
-            } catch (Exception e) {
-                logger.error("Failed to index message: " + message.getMessageId(), message.getStringBody() ,e);
-                return new IndexEventResult(Optional.absent(), Optional.<IndexOperationMessage>absent(), event.getCreationTime());
-            }
-        });
-        //resolve the list and return it.
-        final List<IndexEventResult> indexEventResults = masterObservable
-            .collect(() -> new ArrayList<IndexEventResult>(), (list,indexEventResult) -> list.add(indexEventResult) )
-            .toBlocking().lastOrDefault(null);
+                try {
+                    //check for empty sets
+                    boolean validateEmptySets = true;
+                    Observable<IndexOperationMessage> indexoperationObservable;
+                    //select the handler observable that matches the event type
+                    if (event instanceof EdgeDeleteEvent) {
+                        indexoperationObservable = handleEdgeDelete(message);
+                    } else if (event instanceof EdgeIndexEvent) {
+                        indexoperationObservable = handleEdgeIndex(message);
+                    } else if (event instanceof EntityDeleteEvent) {
+                        indexoperationObservable = handleEntityDelete(message);
+                    } else if (event instanceof EntityIndexEvent) {
+                        indexoperationObservable = handleEntityIndexUpdate(message);
+                    } else if (event instanceof InitializeApplicationIndexEvent) {
+                        //does not return observable
+                        handleInitializeApplicationIndex(event, message);
+                        indexoperationObservable = Observable.just(new IndexOperationMessage());
+                        validateEmptySets = false;
+                    } else {
+                        throw new Exception("Unknown EventType");//TODO: print json instead
+                    }
+
+                    //collect all of the index operations emitted by the handler into a single message
+                    IndexOperationMessage indexOperationMessage =
+                        indexoperationObservable
+                            .collect(() -> new IndexOperationMessage(), (collector, single) -> collector.ingest(single))
+                            .toBlocking().lastOrDefault(null);
+
+                    if (validateEmptySets && (indexOperationMessage == null || indexOperationMessage.isEmpty())) {
+                        logger.error("Received empty index sequence message:({}), body:({}) ",
+                            message.getMessageId(), message.getStringBody());
+                    }
+
+                    //return type that can be indexed and ack'd later
+                    return new IndexEventResult(Optional.fromNullable(message), Optional.fromNullable(indexOperationMessage), thisEvent.getCreationTime());
+                } catch (Exception e) {
+                    logger.error("Failed to index message: " + message.getMessageId(), message.getStringBody(), e);
+                    return new IndexEventResult(Optional.absent(), Optional.<IndexOperationMessage>absent(), event.getCreationTime());
+                }
+            })
+            .collect(Collectors.toList());
 
 
         return indexEventResults;