You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2015/10/15 16:18:05 UTC
[15/50] [abbrv] usergrid git commit: cleanup observable,
add realistic timer, make sure messages that error are not ack'd
cleanup observable, add realistic timer, make sure messages that error are not ack'd
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/9da0179f
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/9da0179f
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/9da0179f
Branch: refs/heads/usergrid-1007-shiro-cache
Commit: 9da0179f17d636d44ee8c4121c909632582aafe7
Parents: 7c5a864
Author: Shawn Feldman <sf...@apache.org>
Authored: Tue Oct 6 18:29:56 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Tue Oct 6 18:29:56 2015 -0600
----------------------------------------------------------------------
.../asyncevents/AmazonAsyncEventService.java | 67 ++++++++++----------
1 file changed, 33 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/9da0179f/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index 1bc70cd..0d7553e 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -262,36 +262,44 @@ public class AmazonAsyncEventService implements AsyncEventService {
}catch (ClassCastException cce){
logger.error("Failed to deserialize message body",cce);
}
- logger.debug("Processing {} event", event);
if (event == null) {
logger.error("AsyncEvent type or event is null!");
- return Observable.just(new IndexEventResult(Optional.fromNullable(message), Optional.<IndexOperationMessage>absent()));
+ return Observable.just(new IndexEventResult(Optional.fromNullable(message), Optional.<IndexOperationMessage>absent(),System.currentTimeMillis()));
}
+
+ final AsyncEvent thisEvent = event;
+ if(logger.isDebugEnabled()) {
+ logger.debug("Processing {} event", event);
+ }
+
try {
+ Observable<IndexOperationMessage> indexoperationObservable;
//merge each operation to a master observable;
if (event instanceof EdgeDeleteEvent) {
- return handleIndexOperation(message, queueMessage -> handleEdgeDelete(queueMessage));
+ indexoperationObservable = handleEdgeDelete(message);
} else if (event instanceof EdgeIndexEvent) {
- return handleIndexOperation(message, queueMessage -> handleEdgeIndex(queueMessage));
+ indexoperationObservable = handleEdgeIndex(message);
} else if (event instanceof EntityDeleteEvent) {
- return handleIndexOperation(message, queueMessage -> handleEntityDelete(queueMessage));
+ indexoperationObservable = handleEntityDelete(message);
} else if (event instanceof EntityIndexEvent) {
- return handleIndexOperation(message, queueMessage -> handleEntityIndexUpdate(queueMessage));
+ indexoperationObservable = handleEntityIndexUpdate(message);
} else if (event instanceof InitializeApplicationIndexEvent) {
//does not return observable
- handleInitializeApplicationIndex(message);
- return Observable.just(new IndexEventResult(Optional.absent(), Optional.<IndexOperationMessage>absent()));
+ handleInitializeApplicationIndex(event,message);
+ indexoperationObservable = Observable.just(new IndexOperationMessage());
} else {
- logger.error("Unknown EventType: {}", event);//TODO: print json instead
- return Observable.just(new IndexEventResult(Optional.fromNullable(message), Optional.<IndexOperationMessage>absent()));
+ throw new Exception("Unknown EventType");//TODO: print json instead
}
- } catch (Exception e) {
- logger.error("Failed to index entity", e, message);
- return Observable.just(new IndexEventResult(Optional.absent(), Optional.<IndexOperationMessage>absent()));
- } finally {
- messageCycle.update(System.currentTimeMillis() - event.getCreationTime());
+ //return type that can be indexed and ack'd later
+ return indexoperationObservable
+ .map(indexOperationMessage ->
+ new IndexEventResult(Optional.fromNullable(message), Optional.fromNullable(indexOperationMessage),thisEvent.getCreationTime())
+ );
+ } catch (Exception e) {
+ logger.error("Failed to index message: " + message.getMessageId(), e, message);
+ return Observable.just(new IndexEventResult(Optional.absent(), Optional.<IndexOperationMessage>absent(), event.getCreationTime()));
}
});
@@ -315,25 +323,13 @@ public class AmazonAsyncEventService implements AsyncEventService {
//ack after successful completion of the operation.
return indexProducer.put(combined)
.flatMap(indexOperationMessage -> Observable.from(indexEventResults))
+ .doOnNext(indexEventResult -> messageCycle.update(System.currentTimeMillis() - indexEventResult.getCreationTime()))
.map(result -> result.getQueueMessage().get());
});
}
- //transform index operation to
- private Observable<IndexEventResult> handleIndexOperation(QueueMessage queueMessage,
- Func1<QueueMessage, Observable<IndexOperationMessage>> operation
- ){
- try{
- return operation.call(queueMessage)
- .map(indexOperationMessage -> new IndexEventResult(Optional.fromNullable(queueMessage), Optional.fromNullable(indexOperationMessage)));
- }catch (Exception e){
- logger.error("failed to run index",e);
- return Observable.just( new IndexEventResult(Optional.fromNullable(queueMessage), Optional.<IndexOperationMessage>absent()));
- }
- }
-
@Override
public void queueInitializeApplicationIndex( final ApplicationScope applicationScope) {
@@ -479,11 +475,8 @@ public class AmazonAsyncEventService implements AsyncEventService {
}
- public void handleInitializeApplicationIndex(final QueueMessage message) {
+ public void handleInitializeApplicationIndex(final AsyncEvent event, final QueueMessage message) {
Preconditions.checkNotNull(message, "Queue Message cannot be null for handleInitializeApplicationIndex");
-
- final AsyncEvent event = (AsyncEvent) message.getBody();
- Preconditions.checkNotNull( message, "QueueMessage Body cannot be null for handleInitializeApplicationIndex" );
Preconditions.checkArgument(event instanceof InitializeApplicationIndexEvent, String.format("Event Type for handleInitializeApplicationIndex must be APPLICATION_INDEX, got %s", event.getClass()));
final InitializeApplicationIndexEvent initializeApplicationIndexEvent =
@@ -492,7 +485,6 @@ public class AmazonAsyncEventService implements AsyncEventService {
final IndexLocationStrategy indexLocationStrategy = initializeApplicationIndexEvent.getIndexLocationStrategy();
final EntityIndex index = entityIndexFactory.createEntityIndex( indexLocationStrategy );
index.initialize();
- ack( message );
}
/**
@@ -607,15 +599,18 @@ public class AmazonAsyncEventService implements AsyncEventService {
public class IndexEventResult{
private final Optional<QueueMessage> queueMessage;
private final Optional<IndexOperationMessage> indexOperationMessage;
+ private final long creationTime;
- public IndexEventResult(Optional<QueueMessage> queueMessage, Optional<IndexOperationMessage> indexOperationMessage ){
+ public IndexEventResult(Optional<QueueMessage> queueMessage, Optional<IndexOperationMessage> indexOperationMessage, long creationTime){
this.queueMessage = queueMessage;
this.indexOperationMessage = indexOperationMessage;
+ this.creationTime = creationTime;
}
+
public Optional<QueueMessage> getQueueMessage() {
return queueMessage;
}
@@ -623,5 +618,9 @@ public class AmazonAsyncEventService implements AsyncEventService {
public Optional<IndexOperationMessage> getIndexOperationMessage() {
return indexOperationMessage;
}
+
+ public long getCreationTime() {
+ return creationTime;
+ }
}
}