You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2015/10/15 16:18:32 UTC

[42/50] [abbrv] usergrid git commit: remove observable

remove observable


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/dbf37e48
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/dbf37e48
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/dbf37e48

Branch: refs/heads/usergrid-1007-shiro-cache
Commit: dbf37e48caee2ca7fb5a2468f4800ad4bdf196ea
Parents: 9cbe283
Author: Shawn Feldman <sf...@apache.org>
Authored: Tue Oct 13 11:14:21 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Tue Oct 13 11:14:21 2015 -0600

----------------------------------------------------------------------
 .../asyncevents/AmazonAsyncEventService.java    | 114 +++++++++----------
 1 file changed, 57 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/dbf37e48/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index f1a02ce..95126c6 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -25,6 +25,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import com.google.common.base.Optional;
 import org.apache.usergrid.persistence.index.impl.IndexProducer;
@@ -248,7 +249,6 @@ public class AmazonAsyncEventService implements AsyncEventService {
 
     }
 
-
     /**
      * calls the event handlers and returns a result with information on whether it needs to be ack'd and whether it needs to be indexed
      * @param messages
@@ -259,71 +259,71 @@ public class AmazonAsyncEventService implements AsyncEventService {
             logger.debug("callEventHandlers with {} message", messages.size());
         }
 
-        Observable<IndexEventResult> masterObservable = Observable.from(messages).map(message -> {
-            AsyncEvent event = null;
-            try {
-                event = (AsyncEvent) message.getBody();
-            } catch (ClassCastException cce) {
-                logger.error("Failed to deserialize message body", cce);
-            }
-
-            if (event == null) {
-                logger.error("AsyncEvent type or event is null!");
-                return new IndexEventResult(Optional.fromNullable(message), Optional.<IndexOperationMessage>absent(), System.currentTimeMillis());
-            }
-
-            final AsyncEvent thisEvent = event;
-            if (logger.isDebugEnabled()) {
-                logger.debug("Processing {} event", event);
-            }
-
-            try {
-                Observable<IndexOperationMessage> indexoperationObservable;
-                //merge each operation to a master observable;
-                if (event instanceof EdgeDeleteEvent) {
-                    indexoperationObservable = handleEdgeDelete(message);
-                } else if (event instanceof EdgeIndexEvent) {
-                    indexoperationObservable = handleEdgeIndex(message);
-                } else if (event instanceof EntityDeleteEvent) {
-                    indexoperationObservable = handleEntityDelete(message);
-                } else if (event instanceof EntityIndexEvent) {
-                    indexoperationObservable = handleEntityIndexUpdate(message);
-                } else if (event instanceof InitializeApplicationIndexEvent) {
-                    //does not return observable
-                    handleInitializeApplicationIndex(event, message);
-                    indexoperationObservable = Observable.just(new IndexOperationMessage());
-                } else {
-                    throw new Exception("Unknown EventType");//TODO: print json instead
+        Stream<IndexEventResult> indexEventResults = messages.stream()
+            .map(message -> {
+                AsyncEvent event = null;
+                try {
+                    event = (AsyncEvent) message.getBody();
+                } catch (ClassCastException cce) {
+                    logger.error("Failed to deserialize message body", cce);
                 }
 
-                //collect all of the
-                IndexOperationMessage indexOperationMessage =
-                    indexoperationObservable
-                        .collect(() -> new IndexOperationMessage(), (collector, single) -> collector.ingest(single))
-                        .toBlocking().lastOrDefault(null);
+                if (event == null) {
+                    logger.error("AsyncEvent type or event is null!");
+                    return new IndexEventResult(Optional.fromNullable(message), Optional.<IndexOperationMessage>absent(), System.currentTimeMillis());
+                }
 
-                if (indexOperationMessage == null || indexOperationMessage.isEmpty()) {
-                    logger.info("Received empty index sequence message:({}), body:({}) ",
-                        message.getMessageId(),message.getStringBody());
+                final AsyncEvent thisEvent = event;
+                if (logger.isDebugEnabled()) {
+                    logger.debug("Processing {} event", event);
                 }
 
-                //return type that can be indexed and ack'd later
-                return new IndexEventResult(Optional.fromNullable(message), Optional.fromNullable(indexOperationMessage), thisEvent.getCreationTime());
-            } catch (Exception e) {
-                logger.error("Failed to index message: " + message.getMessageId(), message.getStringBody() ,e);
-                return new IndexEventResult(Optional.absent(), Optional.<IndexOperationMessage>absent(), event.getCreationTime());
-            }
-        });
-        //resolve the list and return it.
-        final List<IndexEventResult> indexEventResults = masterObservable
-            .collect(() -> new ArrayList<IndexEventResult>(), (list,indexEventResult) -> list.add(indexEventResult) )
-            .toBlocking().lastOrDefault(null);
+                try {
+                    //check for empty sets if this is true
+                    boolean validateEmptySets = true;
+                    Observable<IndexOperationMessage> indexoperationObservable;
+                    //select the handler observable that matches this event's type
+                    if (event instanceof EdgeDeleteEvent) {
+                        indexoperationObservable = handleEdgeDelete(message);
+                    } else if (event instanceof EdgeIndexEvent) {
+                        indexoperationObservable = handleEdgeIndex(message);
+                    } else if (event instanceof EntityDeleteEvent) {
+                        indexoperationObservable = handleEntityDelete(message);
+                    } else if (event instanceof EntityIndexEvent) {
+                        indexoperationObservable = handleEntityIndexUpdate(message);
+                    } else if (event instanceof InitializeApplicationIndexEvent) {
+                        //does not return observable
+                        handleInitializeApplicationIndex(event, message);
+                        indexoperationObservable = Observable.just(new IndexOperationMessage());
+                        validateEmptySets = false; //do not check this one for an empty set b/c it will be empty.
+                    } else {
+                        throw new Exception("Unknown EventType");//TODO: print json instead
+                    }
+
+                    //collect all of the emitted index operations into a single IndexOperationMessage
+                    IndexOperationMessage indexOperationMessage =
+                        indexoperationObservable
+                            .collect(() -> new IndexOperationMessage(), (collector, single) -> collector.ingest(single))
+                            .toBlocking().lastOrDefault(null);
+
+                    if (validateEmptySets && (indexOperationMessage == null || indexOperationMessage.isEmpty())) {
+                        logger.error("Received empty index sequence message:({}), body:({}) ",
+                            message.getMessageId(), message.getStringBody());
+                        throw new Exception("Received empty index sequence.");
+                    }
+
+                    //return type that can be indexed and ack'd later
+                    return new IndexEventResult(Optional.fromNullable(message), Optional.fromNullable(indexOperationMessage), thisEvent.getCreationTime());
+                } catch (Exception e) {
+                    logger.error("Failed to index message: " + message.getMessageId(), message.getStringBody(), e);
+                    return new IndexEventResult(Optional.absent(), Optional.<IndexOperationMessage>absent(), event.getCreationTime());
+                }
+            });
 
 
-        return indexEventResults;
+        return indexEventResults.collect(Collectors.toList());
     }
 
-
     @Override
     public void queueInitializeApplicationIndex( final ApplicationScope applicationScope) {
         IndexLocationStrategy indexLocationStrategy = indexLocationStrategyFactory.getIndexLocationStrategy(