You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by mr...@apache.org on 2016/06/14 20:57:48 UTC

usergrid git commit: Update event handling to better handle the case when no index requests are returned from event processing.

Repository: usergrid
Updated Branches:
  refs/heads/release-2.1.1 3df07791c -> 7af4f8454


Update event handling to better handle the case when no index requests are returned from event processing.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/7af4f845
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/7af4f845
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/7af4f845

Branch: refs/heads/release-2.1.1
Commit: 7af4f8454ff9f2270835a2983fa41c390d4602f5
Parents: 3df0779
Author: Michael Russo <mr...@apigee.com>
Authored: Tue Jun 14 13:57:14 2016 -0700
Committer: Michael Russo <mr...@apigee.com>
Committed: Tue Jun 14 13:57:14 2016 -0700

----------------------------------------------------------------------
 .../asyncevents/AsyncEventServiceImpl.java      | 31 +++++++++++++++-----
 1 file changed, 24 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/7af4f845/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
index fa175ab..8d050fe 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
@@ -355,6 +355,11 @@ public class AsyncEventServiceImpl implements AsyncEventService {
                     throw new Exception("Unknown EventType for message: "+ message.getStringBody().trim());
                 }
 
+                if( single.isEmpty() ){
+                    logger.warn("No index operation messages came back from event processing for msg {} ",
+                        message.getStringBody().trim());
+                }
+
 
                 // if no exception happens and the QueueMessage is returned in these results, it will get ack'd
                 return new IndexEventResult(Optional.of(single), Optional.of(message), thisEvent.getCreationTime());
@@ -370,8 +375,16 @@ public class AsyncEventServiceImpl implements AsyncEventService {
 
             } catch (Exception e) {
 
+                // NPEs don't have a detail message, so substitute a fixed label to make the log statement easier to identify
+                final String errorMessage;
+                if( e instanceof NullPointerException ) {
+                    errorMessage = "NullPointerException";
+                }else{
+                    errorMessage = e.getMessage();
+                }
+
                 // if the event fails to process, log and return empty message result so it doesn't get ack'd
-                logger.error("{}. Failed to process message: {}", e.getMessage(), message.getStringBody().trim() );
+                logger.error("{}. Failed to process message: {}", errorMessage, message.getStringBody().trim() );
                 return new IndexEventResult(Optional.absent(), Optional.absent(), thisEvent.getCreationTime());
             }
         });
@@ -427,7 +440,8 @@ public class AsyncEventServiceImpl implements AsyncEventService {
 
         final EntityIndexOperation entityIndexOperation = new EntityIndexOperation( applicationScope, entityId, updatedAfter);
 
-        return eventBuilder.buildEntityIndex( entityIndexOperation ).toBlocking().lastOrDefault(null);
+        // default this observable's return to empty index operation message if nothing is emitted
+        return eventBuilder.buildEntityIndex( entityIndexOperation ).toBlocking().lastOrDefault(new IndexOperationMessage());
     }
 
 
@@ -453,9 +467,10 @@ public class AsyncEventServiceImpl implements AsyncEventService {
 
         final EntityCollectionManager ecm = entityCollectionManagerFactory.createCollectionManager( edgeIndexEvent.getApplicationScope() );
 
+        // default this observable's return to empty index operation message if nothing is emitted
         return ecm.load( edgeIndexEvent.getEntityId() )
             .flatMap( loadedEntity -> eventBuilder.buildNewEdge(edgeIndexEvent.getApplicationScope(), loadedEntity, edgeIndexEvent.getEdge()) )
-            .toBlocking().lastOrDefault(null);
+            .toBlocking().lastOrDefault(new IndexOperationMessage());
 
     }
 
@@ -487,7 +502,8 @@ public class AsyncEventServiceImpl implements AsyncEventService {
             logger.debug("Deleting in app scope {} with edge {}", applicationScope, edge);
         }
 
-        return eventBuilder.buildDeleteEdge(applicationScope, edge).toBlocking().lastOrDefault(null);
+        // default this observable's return to empty index operation message if nothing is emitted
+        return eventBuilder.buildDeleteEdge(applicationScope, edge).toBlocking().lastOrDefault(new IndexOperationMessage());
 
     }
 
@@ -600,9 +616,9 @@ public class AsyncEventServiceImpl implements AsyncEventService {
         final Id entityId = deIndexOldVersionsEvent.getEntityIdScope().getId();
         final UUID markedVersion = deIndexOldVersionsEvent.getMarkedVersion();
 
+        // default this observable's return to empty index operation message if nothing is emitted
         return eventBuilder.deIndexOldVersions( applicationScope, entityId, markedVersion )
-            .toBlocking().lastOrDefault(null);
-
+            .toBlocking().lastOrDefault(new IndexOperationMessage());
 
     }
 
@@ -675,7 +691,8 @@ public class AsyncEventServiceImpl implements AsyncEventService {
 
         entityDeleteResults.getCompactedNode().toBlocking().lastOrDefault(null);
 
-        return entityDeleteResults.getIndexObservable().toBlocking().lastOrDefault(null);
+        // default this observable's return to empty index operation message if nothing is emitted
+        return entityDeleteResults.getIndexObservable().toBlocking().lastOrDefault(new IndexOperationMessage());
 
     }