You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/10/13 18:43:06 UTC
usergrid git commit: remove observable
Repository: usergrid
Updated Branches:
refs/heads/remove-inmemory-event-service f6409ce35 -> 32d35e7d5
remove observable
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/32d35e7d
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/32d35e7d
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/32d35e7d
Branch: refs/heads/remove-inmemory-event-service
Commit: 32d35e7d5e605005b3f530e3a1e390d394c29c0a
Parents: f6409ce
Author: Shawn Feldman <sf...@apache.org>
Authored: Tue Oct 13 10:42:58 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Tue Oct 13 10:42:58 2015 -0600
----------------------------------------------------------------------
.../asyncevents/AmazonAsyncEventService.java | 109 ++++++++++---------
1 file changed, 55 insertions(+), 54 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/32d35e7d/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index 6f41563..0fef974 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -259,65 +259,66 @@ public class AmazonAsyncEventService implements AsyncEventService {
logger.debug("callEventHandlers with {} message", messages.size());
}
- Observable<IndexEventResult> masterObservable = Observable.from(messages).map(message -> {
- AsyncEvent event = null;
- try {
- event = (AsyncEvent) message.getBody();
- } catch (ClassCastException cce) {
- logger.error("Failed to deserialize message body", cce);
- }
-
- if (event == null) {
- logger.error("AsyncEvent type or event is null!");
- return new IndexEventResult(Optional.fromNullable(message), Optional.<IndexOperationMessage>absent(), System.currentTimeMillis());
- }
-
- final AsyncEvent thisEvent = event;
- if (logger.isDebugEnabled()) {
- logger.debug("Processing {} event", event);
- }
-
- try {
- Observable<IndexOperationMessage> indexoperationObservable;
- //merge each operation to a master observable;
- if (event instanceof EdgeDeleteEvent) {
- indexoperationObservable = handleEdgeDelete(message);
- } else if (event instanceof EdgeIndexEvent) {
- indexoperationObservable = handleEdgeIndex(message);
- } else if (event instanceof EntityDeleteEvent) {
- indexoperationObservable = handleEntityDelete(message);
- } else if (event instanceof EntityIndexEvent) {
- indexoperationObservable = handleEntityIndexUpdate(message);
- } else if (event instanceof InitializeApplicationIndexEvent) {
- //does not return observable
- handleInitializeApplicationIndex(event, message);
- indexoperationObservable = Observable.just(new IndexOperationMessage());
- } else {
- throw new Exception("Unknown EventType");//TODO: print json instead
+ List<IndexEventResult> indexEventResults = messages.stream()
+ .map(message -> {
+ AsyncEvent event = null;
+ try {
+ event = (AsyncEvent) message.getBody();
+ } catch (ClassCastException cce) {
+ logger.error("Failed to deserialize message body", cce);
}
- //collect all of the
- IndexOperationMessage indexOperationMessage =
- indexoperationObservable
- .collect(() -> new IndexOperationMessage(), (collector, single) -> collector.ingest(single))
- .toBlocking().lastOrDefault(null);
+ if (event == null) {
+ logger.error("AsyncEvent type or event is null!");
+ return new IndexEventResult(Optional.fromNullable(message), Optional.<IndexOperationMessage>absent(), System.currentTimeMillis());
+ }
- if (indexOperationMessage == null || indexOperationMessage.isEmpty()) {
- logger.info("Received empty index sequence message:({}), body:({}) ",
- message.getMessageId(),message.getStringBody());
+ final AsyncEvent thisEvent = event;
+ if (logger.isDebugEnabled()) {
+ logger.debug("Processing {} event", event);
}
- //return type that can be indexed and ack'd later
- return new IndexEventResult(Optional.fromNullable(message), Optional.fromNullable(indexOperationMessage), thisEvent.getCreationTime());
- } catch (Exception e) {
- logger.error("Failed to index message: " + message.getMessageId(), message.getStringBody() ,e);
- return new IndexEventResult(Optional.absent(), Optional.<IndexOperationMessage>absent(), event.getCreationTime());
- }
- });
- //resolve the list and return it.
- final List<IndexEventResult> indexEventResults = masterObservable
- .collect(() -> new ArrayList<IndexEventResult>(), (list,indexEventResult) -> list.add(indexEventResult) )
- .toBlocking().lastOrDefault(null);
+ try {
+ //check for empty sets
+ boolean validateEmptySets = true;
+ Observable<IndexOperationMessage> indexoperationObservable;
+ //merge each operation to a master observable;
+ if (event instanceof EdgeDeleteEvent) {
+ indexoperationObservable = handleEdgeDelete(message);
+ } else if (event instanceof EdgeIndexEvent) {
+ indexoperationObservable = handleEdgeIndex(message);
+ } else if (event instanceof EntityDeleteEvent) {
+ indexoperationObservable = handleEntityDelete(message);
+ } else if (event instanceof EntityIndexEvent) {
+ indexoperationObservable = handleEntityIndexUpdate(message);
+ } else if (event instanceof InitializeApplicationIndexEvent) {
+ //does not return observable
+ handleInitializeApplicationIndex(event, message);
+ indexoperationObservable = Observable.just(new IndexOperationMessage());
+ validateEmptySets = false;
+ } else {
+ throw new Exception("Unknown EventType");//TODO: print json instead
+ }
+
+ //collect all of the index operations into a single message
+ IndexOperationMessage indexOperationMessage =
+ indexoperationObservable
+ .collect(() -> new IndexOperationMessage(), (collector, single) -> collector.ingest(single))
+ .toBlocking().lastOrDefault(null);
+
+ if (validateEmptySets && (indexOperationMessage == null || indexOperationMessage.isEmpty())) {
+ logger.error("Received empty index sequence message:({}), body:({}) ",
+ message.getMessageId(), message.getStringBody());
+ }
+
+ //return type that can be indexed and ack'd later
+ return new IndexEventResult(Optional.fromNullable(message), Optional.fromNullable(indexOperationMessage), thisEvent.getCreationTime());
+ } catch (Exception e) {
+ logger.error("Failed to index message: " + message.getMessageId(), message.getStringBody(), e);
+ return new IndexEventResult(Optional.absent(), Optional.<IndexOperationMessage>absent(), event.getCreationTime());
+ }
+ })
+ .collect(Collectors.toList());
return indexEventResults;