You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2015/10/15 16:17:55 UTC
[05/50] [abbrv] usergrid git commit: fix observable class cast exception
fix observable class cast exception
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/d6bf2fac
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/d6bf2fac
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/d6bf2fac
Branch: refs/heads/usergrid-1007-shiro-cache
Commit: d6bf2fac1b5b7ee994d13c7efce007d1f0dc6de2
Parents: 1a1d42e
Author: Shawn Feldman <sf...@apache.org>
Authored: Mon Oct 5 18:36:05 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Mon Oct 5 18:36:05 2015 -0600
----------------------------------------------------------------------
.../asyncevents/AmazonAsyncEventService.java | 37 +++++++++++---------
1 file changed, 20 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/d6bf2fac/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index bf29c5a..abb76c2 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -233,8 +233,12 @@ public class AmazonAsyncEventService implements AsyncEventService {
}
Observable<IndexEventResult> masterObservable = Observable.from(messages).flatMap(message -> {
- final AsyncEvent event = (AsyncEvent) message.getBody();
-
+ AsyncEvent event = null;
+ try{
+ event = (AsyncEvent) message.getBody();
+ }catch (ClassCastException cce){
+ logger.error("Failed to deserialize message body",cce);
+ }
logger.debug("Processing {} event", event);
if (event == null) {
@@ -259,10 +263,10 @@ public class AmazonAsyncEventService implements AsyncEventService {
logger.error("Unknown EventType: {}", event);
return Observable.just(new IndexEventResult(message, Optional.<IndexOperationMessage>absent(), true));
}
- }catch (Exception e){
- logger.error("Failed to index entity", e,message);
+ } catch (Exception e) {
+ logger.error("Failed to index entity", e, message);
return Observable.just(new IndexEventResult(message, Optional.<IndexOperationMessage>absent(), false));
- }finally {
+ } finally {
messageCycle.update(System.currentTimeMillis() - event.getCreationTime());
}
@@ -270,25 +274,25 @@ public class AmazonAsyncEventService implements AsyncEventService {
return masterObservable
//remove unsuccessful
- .filter( indexEventResult -> indexEventResult.shouldProcess() )
+ .filter(indexEventResult -> indexEventResult.shouldProcess())
//take the max
.buffer( MAX_TAKE )
//map them to index results and return them
- .flatMap( indexEventResults -> {
+ .flatMap(indexEventResults -> {
IndexOperationMessage combined = new IndexOperationMessage();
indexEventResults.stream().forEach(
- indexEventResult ->{
- if(indexEventResult.getIndexOperationMessage().isPresent()) {
+ indexEventResult -> {
+ if (indexEventResult.getIndexOperationMessage().isPresent()) {
combined.ingest(indexEventResult.getIndexOperationMessage().get());
}
- } );
+ });
//ack after successful completion of the operation.
return indexProducer.put(combined)
.flatMap(operationResult -> Observable.from(indexEventResults));
- } );
+ });
}
@@ -376,8 +380,8 @@ public class AmazonAsyncEventService implements AsyncEventService {
final EntityCollectionManager ecm = entityCollectionManagerFactory.createCollectionManager( applicationScope );
- final Observable<IndexOperationMessage> edgeIndexObservable = ecm.load(edgeIndexEvent.getEntityId()).flatMap( entity -> eventBuilder.buildNewEdge(
- applicationScope, entity, edge ) );
+ final Observable<IndexOperationMessage> edgeIndexObservable = ecm.load(edgeIndexEvent.getEntityId()).flatMap(entity -> eventBuilder.buildNewEdge(
+ applicationScope, entity, edge));
return edgeIndexObservable;
}
@@ -442,8 +446,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
entityDeleteResults = eventBuilder.buildEntityDelete( applicationScope, entityId );
- final Observable merged = Observable.merge( entityDeleteResults.getEntitiesCompacted(),
- entityDeleteResults.getIndexObservable() );
+ final Observable<IndexOperationMessage> merged = entityDeleteResults.getEntitiesCompacted().flatMap(mvccLogEntries -> entityDeleteResults.getIndexObservable()) ;
return merged;
}
@@ -537,12 +540,12 @@ public class AmazonAsyncEventService implements AsyncEventService {
.map(messages ->
handleMessages(messages)
.map(indexEventResult -> {
- ack( indexEventResult.getQueueMessage() );
+ ack(indexEventResult.getQueueMessage());
return indexEventResult;
})
.toBlocking().lastOrDefault(null)
)//ack each message, but only if we didn't error. If we did, we'll want to log it and
- .subscribeOn( Schedulers.newThread() );
+ .subscribeOn(Schedulers.newThread());
//start in the background