You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2015/10/15 16:18:05 UTC

[15/50] [abbrv] usergrid git commit: cleanup observable, add realistic timer, make sure messages that error are not ack'd

cleanup observable, add realistic timer, make sure messages that error are not ack'd


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/9da0179f
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/9da0179f
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/9da0179f

Branch: refs/heads/usergrid-1007-shiro-cache
Commit: 9da0179f17d636d44ee8c4121c909632582aafe7
Parents: 7c5a864
Author: Shawn Feldman <sf...@apache.org>
Authored: Tue Oct 6 18:29:56 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Tue Oct 6 18:29:56 2015 -0600

----------------------------------------------------------------------
 .../asyncevents/AmazonAsyncEventService.java    | 67 ++++++++++----------
 1 file changed, 33 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/9da0179f/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index 1bc70cd..0d7553e 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -262,36 +262,44 @@ public class AmazonAsyncEventService implements AsyncEventService {
             }catch (ClassCastException cce){
                 logger.error("Failed to deserialize message body",cce);
             }
-            logger.debug("Processing {} event", event);
 
             if (event == null) {
                 logger.error("AsyncEvent type or event is null!");
-                return Observable.just(new IndexEventResult(Optional.fromNullable(message), Optional.<IndexOperationMessage>absent()));
+                return Observable.just(new IndexEventResult(Optional.fromNullable(message), Optional.<IndexOperationMessage>absent(),System.currentTimeMillis()));
             }
+
+            final AsyncEvent thisEvent = event;
+            if(logger.isDebugEnabled()) {
+                logger.debug("Processing {} event", event);
+            }
+
             try {
+                Observable<IndexOperationMessage> indexoperationObservable;
                 //merge each operation to a master observable;
                 if (event instanceof EdgeDeleteEvent) {
-                    return handleIndexOperation(message, queueMessage -> handleEdgeDelete(queueMessage));
+                    indexoperationObservable = handleEdgeDelete(message);
                 } else if (event instanceof EdgeIndexEvent) {
-                    return handleIndexOperation(message, queueMessage -> handleEdgeIndex(queueMessage));
+                    indexoperationObservable = handleEdgeIndex(message);
                 } else if (event instanceof EntityDeleteEvent) {
-                    return handleIndexOperation(message, queueMessage -> handleEntityDelete(queueMessage));
+                    indexoperationObservable = handleEntityDelete(message);
                 } else if (event instanceof EntityIndexEvent) {
-                    return handleIndexOperation(message, queueMessage -> handleEntityIndexUpdate(queueMessage));
+                    indexoperationObservable = handleEntityIndexUpdate(message);
                 } else if (event instanceof InitializeApplicationIndexEvent) {
                     //does not return observable
-                    handleInitializeApplicationIndex(message);
-                    return Observable.just(new IndexEventResult(Optional.absent(), Optional.<IndexOperationMessage>absent()));
+                    handleInitializeApplicationIndex(event,message);
+                    indexoperationObservable = Observable.just(new IndexOperationMessage());
                 } else {
-                    logger.error("Unknown EventType: {}", event);//TODO: print json instead
-                    return Observable.just(new IndexEventResult(Optional.fromNullable(message), Optional.<IndexOperationMessage>absent()));
+                    throw new Exception("Unknown EventType");//TODO: print json instead
                 }
-            } catch (Exception e) {
-                logger.error("Failed to index entity", e, message);
-                return Observable.just(new IndexEventResult(Optional.absent(), Optional.<IndexOperationMessage>absent()));
-            } finally {
-                messageCycle.update(System.currentTimeMillis() - event.getCreationTime());
 
+                //return type that can be indexed and ack'd later
+                return indexoperationObservable
+                    .map(indexOperationMessage ->
+                            new IndexEventResult(Optional.fromNullable(message), Optional.fromNullable(indexOperationMessage),thisEvent.getCreationTime())
+                    );
+            } catch (Exception e) {
+                logger.error("Failed to index message: " + message.getMessageId(), e, message);
+                return Observable.just(new IndexEventResult(Optional.absent(), Optional.<IndexOperationMessage>absent(), event.getCreationTime()));
             }
         });
 
@@ -315,25 +323,13 @@ public class AmazonAsyncEventService implements AsyncEventService {
                 //ack after successful completion of the operation.
                 return indexProducer.put(combined)
                     .flatMap(indexOperationMessage -> Observable.from(indexEventResults))
+                    .doOnNext(indexEventResult -> messageCycle.update(System.currentTimeMillis() - indexEventResult.getCreationTime()))
                     .map(result -> result.getQueueMessage().get());
 
             });
 
     }
 
-    //transform index operation to
-    private Observable<IndexEventResult> handleIndexOperation(QueueMessage queueMessage,
-                                                              Func1<QueueMessage, Observable<IndexOperationMessage>> operation
-    ){
-        try{
-            return operation.call(queueMessage)
-                .map(indexOperationMessage -> new IndexEventResult(Optional.fromNullable(queueMessage), Optional.fromNullable(indexOperationMessage)));
-        }catch (Exception e){
-            logger.error("failed to run index",e);
-            return Observable.just( new IndexEventResult(Optional.fromNullable(queueMessage), Optional.<IndexOperationMessage>absent()));
-        }
-    }
-
 
     @Override
     public void queueInitializeApplicationIndex( final ApplicationScope applicationScope) {
@@ -479,11 +475,8 @@ public class AmazonAsyncEventService implements AsyncEventService {
     }
 
 
-    public void handleInitializeApplicationIndex(final QueueMessage message) {
+    public void handleInitializeApplicationIndex(final AsyncEvent event, final QueueMessage message) {
         Preconditions.checkNotNull(message, "Queue Message cannot be null for handleInitializeApplicationIndex");
-
-        final AsyncEvent event = (AsyncEvent) message.getBody();
-        Preconditions.checkNotNull( message, "QueueMessage Body cannot be null for handleInitializeApplicationIndex" );
         Preconditions.checkArgument(event instanceof InitializeApplicationIndexEvent, String.format("Event Type for handleInitializeApplicationIndex must be APPLICATION_INDEX, got %s", event.getClass()));
 
         final InitializeApplicationIndexEvent initializeApplicationIndexEvent =
@@ -492,7 +485,6 @@ public class AmazonAsyncEventService implements AsyncEventService {
         final IndexLocationStrategy indexLocationStrategy = initializeApplicationIndexEvent.getIndexLocationStrategy();
         final EntityIndex index = entityIndexFactory.createEntityIndex( indexLocationStrategy );
         index.initialize();
-        ack( message );
     }
 
     /**
@@ -607,15 +599,18 @@ public class AmazonAsyncEventService implements AsyncEventService {
     public class IndexEventResult{
         private final Optional<QueueMessage> queueMessage;
         private final Optional<IndexOperationMessage> indexOperationMessage;
+        private final long creationTime;
 
 
-        public IndexEventResult(Optional<QueueMessage> queueMessage, Optional<IndexOperationMessage> indexOperationMessage ){
+        public IndexEventResult(Optional<QueueMessage> queueMessage, Optional<IndexOperationMessage> indexOperationMessage, long creationTime){
 
             this.queueMessage = queueMessage;
             this.indexOperationMessage = indexOperationMessage;
 
+            this.creationTime = creationTime;
         }
 
+
         public Optional<QueueMessage> getQueueMessage() {
             return queueMessage;
         }
@@ -623,5 +618,9 @@ public class AmazonAsyncEventService implements AsyncEventService {
         public Optional<IndexOperationMessage> getIndexOperationMessage() {
             return indexOperationMessage;
         }
+
+        public long getCreationTime() {
+            return creationTime;
+        }
     }
 }