You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by mr...@apache.org on 2016/06/14 20:57:48 UTC
usergrid git commit: Update event handling to better handle case when
no index requests are returned from event processing.
Repository: usergrid
Updated Branches:
refs/heads/release-2.1.1 3df07791c -> 7af4f8454
Update event handling to better handle case when no index requests are returned from event processing.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/7af4f845
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/7af4f845
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/7af4f845
Branch: refs/heads/release-2.1.1
Commit: 7af4f8454ff9f2270835a2983fa41c390d4602f5
Parents: 3df0779
Author: Michael Russo <mr...@apigee.com>
Authored: Tue Jun 14 13:57:14 2016 -0700
Committer: Michael Russo <mr...@apigee.com>
Committed: Tue Jun 14 13:57:14 2016 -0700
----------------------------------------------------------------------
.../asyncevents/AsyncEventServiceImpl.java | 31 +++++++++++++++-----
1 file changed, 24 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/7af4f845/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
index fa175ab..8d050fe 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
@@ -355,6 +355,11 @@ public class AsyncEventServiceImpl implements AsyncEventService {
throw new Exception("Unknown EventType for message: "+ message.getStringBody().trim());
}
+ if( single.isEmpty() ){
+ logger.warn("No index operation messages came back from event processing for msg {} ",
+ message.getStringBody().trim());
+ }
+
// if no exception happens and the QueueMessage is returned in these results, it will get ack'd
return new IndexEventResult(Optional.of(single), Optional.of(message), thisEvent.getCreationTime());
@@ -370,8 +375,16 @@ public class AsyncEventServiceImpl implements AsyncEventService {
} catch (Exception e) {
+ // NPEs don't have a detail message, so add something for our log statement to identify better
+ final String errorMessage;
+ if( e instanceof NullPointerException ) {
+ errorMessage = "NullPointerException";
+ }else{
+ errorMessage = e.getMessage();
+ }
+
// if the event fails to process, log and return empty message result so it doesn't get ack'd
- logger.error("{}. Failed to process message: {}", e.getMessage(), message.getStringBody().trim() );
+ logger.error("{}. Failed to process message: {}", errorMessage, message.getStringBody().trim() );
return new IndexEventResult(Optional.absent(), Optional.absent(), thisEvent.getCreationTime());
}
});
@@ -427,7 +440,8 @@ public class AsyncEventServiceImpl implements AsyncEventService {
final EntityIndexOperation entityIndexOperation = new EntityIndexOperation( applicationScope, entityId, updatedAfter);
- return eventBuilder.buildEntityIndex( entityIndexOperation ).toBlocking().lastOrDefault(null);
+ // default this observable's return to empty index operation message if nothing is emitted
+ return eventBuilder.buildEntityIndex( entityIndexOperation ).toBlocking().lastOrDefault(new IndexOperationMessage());
}
@@ -453,9 +467,10 @@ public class AsyncEventServiceImpl implements AsyncEventService {
final EntityCollectionManager ecm = entityCollectionManagerFactory.createCollectionManager( edgeIndexEvent.getApplicationScope() );
+ // default this observable's return to empty index operation message if nothing is emitted
return ecm.load( edgeIndexEvent.getEntityId() )
.flatMap( loadedEntity -> eventBuilder.buildNewEdge(edgeIndexEvent.getApplicationScope(), loadedEntity, edgeIndexEvent.getEdge()) )
- .toBlocking().lastOrDefault(null);
+ .toBlocking().lastOrDefault(new IndexOperationMessage());
}
@@ -487,7 +502,8 @@ public class AsyncEventServiceImpl implements AsyncEventService {
logger.debug("Deleting in app scope {} with edge {}", applicationScope, edge);
}
- return eventBuilder.buildDeleteEdge(applicationScope, edge).toBlocking().lastOrDefault(null);
+ // default this observable's return to empty index operation message if nothing is emitted
+ return eventBuilder.buildDeleteEdge(applicationScope, edge).toBlocking().lastOrDefault(new IndexOperationMessage());
}
@@ -600,9 +616,9 @@ public class AsyncEventServiceImpl implements AsyncEventService {
final Id entityId = deIndexOldVersionsEvent.getEntityIdScope().getId();
final UUID markedVersion = deIndexOldVersionsEvent.getMarkedVersion();
+ // default this observable's return to empty index operation message if nothing is emitted
return eventBuilder.deIndexOldVersions( applicationScope, entityId, markedVersion )
- .toBlocking().lastOrDefault(null);
-
+ .toBlocking().lastOrDefault(new IndexOperationMessage());
}
@@ -675,7 +691,8 @@ public class AsyncEventServiceImpl implements AsyncEventService {
entityDeleteResults.getCompactedNode().toBlocking().lastOrDefault(null);
- return entityDeleteResults.getIndexObservable().toBlocking().lastOrDefault(null);
+ // default this observable's return to empty index operation message if nothing is emitted
+ return entityDeleteResults.getIndexObservable().toBlocking().lastOrDefault(new IndexOperationMessage());
}