You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2015/10/15 16:17:55 UTC

[05/50] [abbrv] usergrid git commit: fix observable class cast exception

fix observable class cast exception


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/d6bf2fac
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/d6bf2fac
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/d6bf2fac

Branch: refs/heads/usergrid-1007-shiro-cache
Commit: d6bf2fac1b5b7ee994d13c7efce007d1f0dc6de2
Parents: 1a1d42e
Author: Shawn Feldman <sf...@apache.org>
Authored: Mon Oct 5 18:36:05 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Mon Oct 5 18:36:05 2015 -0600

----------------------------------------------------------------------
 .../asyncevents/AmazonAsyncEventService.java    | 37 +++++++++++---------
 1 file changed, 20 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/d6bf2fac/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index bf29c5a..abb76c2 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -233,8 +233,12 @@ public class AmazonAsyncEventService implements AsyncEventService {
         }
 
         Observable<IndexEventResult> masterObservable = Observable.from(messages).flatMap(message -> {
-            final AsyncEvent event = (AsyncEvent) message.getBody();
-
+            AsyncEvent event = null;
+            try{
+                event = (AsyncEvent) message.getBody();
+            }catch (ClassCastException cce){
+                logger.error("Failed to deserialize message body",cce);
+            }
             logger.debug("Processing {} event", event);
 
             if (event == null) {
@@ -259,10 +263,10 @@ public class AmazonAsyncEventService implements AsyncEventService {
                     logger.error("Unknown EventType: {}", event);
                     return Observable.just(new IndexEventResult(message, Optional.<IndexOperationMessage>absent(), true));
                 }
-            }catch (Exception e){
-                logger.error("Failed to index entity", e,message);
+            } catch (Exception e) {
+                logger.error("Failed to index entity", e, message);
                 return Observable.just(new IndexEventResult(message, Optional.<IndexOperationMessage>absent(), false));
-            }finally {
+            } finally {
                 messageCycle.update(System.currentTimeMillis() - event.getCreationTime());
 
             }
@@ -270,25 +274,25 @@ public class AmazonAsyncEventService implements AsyncEventService {
 
         return masterObservable
             //remove unsuccessful
-            .filter( indexEventResult -> indexEventResult.shouldProcess()  )
+            .filter(indexEventResult -> indexEventResult.shouldProcess())
             //take the max
             .buffer( MAX_TAKE )
             //map them to index results and return them
-            .flatMap( indexEventResults -> {
+            .flatMap(indexEventResults -> {
                 IndexOperationMessage combined = new IndexOperationMessage();
                 indexEventResults.stream().forEach(
-                    indexEventResult ->{
-                        if(indexEventResult.getIndexOperationMessage().isPresent()) {
+                    indexEventResult -> {
+                        if (indexEventResult.getIndexOperationMessage().isPresent()) {
                             combined.ingest(indexEventResult.getIndexOperationMessage().get());
                         }
-                    } );
+                    });
 
 
                 //ack after successful completion of the operation.
                 return indexProducer.put(combined)
                     .flatMap(operationResult -> Observable.from(indexEventResults));
 
-            } );
+            });
 
     }
 
@@ -376,8 +380,8 @@ public class AmazonAsyncEventService implements AsyncEventService {
 
         final EntityCollectionManager ecm = entityCollectionManagerFactory.createCollectionManager( applicationScope );
 
-        final Observable<IndexOperationMessage> edgeIndexObservable = ecm.load(edgeIndexEvent.getEntityId()).flatMap( entity -> eventBuilder.buildNewEdge(
-            applicationScope, entity, edge ) );
+        final Observable<IndexOperationMessage> edgeIndexObservable = ecm.load(edgeIndexEvent.getEntityId()).flatMap(entity -> eventBuilder.buildNewEdge(
+            applicationScope, entity, edge));
         return edgeIndexObservable;
     }
 
@@ -442,8 +446,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
             entityDeleteResults = eventBuilder.buildEntityDelete( applicationScope, entityId );
 
 
-        final Observable merged = Observable.merge( entityDeleteResults.getEntitiesCompacted(),
-            entityDeleteResults.getIndexObservable() );
+        final Observable<IndexOperationMessage> merged = entityDeleteResults.getEntitiesCompacted().flatMap(mvccLogEntries -> entityDeleteResults.getIndexObservable()) ;
         return merged;
     }
 
@@ -537,12 +540,12 @@ public class AmazonAsyncEventService implements AsyncEventService {
                             .map(messages ->
                                     handleMessages(messages)
                                         .map(indexEventResult -> {
-                                            ack( indexEventResult.getQueueMessage() );
+                                            ack(indexEventResult.getQueueMessage());
                                             return indexEventResult;
                                         })
                                         .toBlocking().lastOrDefault(null)
                             )//ack each message, but only if we didn't error.  If we did, we'll want to log it and
-                            .subscribeOn( Schedulers.newThread() );
+                            .subscribeOn(Schedulers.newThread());
 
             //start in the background