You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2015/10/15 16:18:03 UTC

[13/50] [abbrv] usergrid git commit: commit stash

commit stash


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/7c5a864d
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/7c5a864d
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/7c5a864d

Branch: refs/heads/asf-site
Commit: 7c5a864dd1ce22ada5456ced1a6c10f9a5533b1f
Parents: 7dceb56
Author: Shawn Feldman <sf...@apache.org>
Authored: Tue Oct 6 14:34:33 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Tue Oct 6 14:34:33 2015 -0600

----------------------------------------------------------------------
 .../asyncevents/AmazonAsyncEventService.java    | 71 ++++++++++++++------
 .../index/impl/EsIndexProducerImpl.java         |  6 +-
 2 files changed, 53 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/7c5a864d/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index 14d37b5..1bc70cd 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -23,6 +23,7 @@ package org.apache.usergrid.corepersistence.asyncevents;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 import com.google.common.base.Optional;
@@ -226,6 +227,28 @@ public class AmazonAsyncEventService implements AsyncEventService {
 
     }
 
+    /**
+     * Ack message in SQS
+     */
+    public void ack(final List<QueueMessage> messages) {
+
+        final Timer.Context timer = this.ackTimer.time();
+
+        try{
+            queue.commitMessages(messages);
+
+            //decrement our in-flight counter
+            inFlight.decrementAndGet();
+
+        }catch(Exception e){
+            throw new RuntimeException("Unable to ack messages", e);
+        }finally {
+            timer.stop();
+        }
+
+
+    }
+
 
     private Observable<QueueMessage> handleMessages( final List<QueueMessage> messages ) {
         if (logger.isDebugEnabled()) {
@@ -243,7 +266,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
 
             if (event == null) {
                 logger.error("AsyncEvent type or event is null!");
-                return Observable.just(new IndexEventResult(message, Optional.<IndexOperationMessage>absent(), true));
+                return Observable.just(new IndexEventResult(Optional.fromNullable(message), Optional.<IndexOperationMessage>absent()));
             }
             try {
                 //merge each operation to a master observable;
@@ -258,23 +281,24 @@ public class AmazonAsyncEventService implements AsyncEventService {
                 } else if (event instanceof InitializeApplicationIndexEvent) {
                     //does not return observable
                     handleInitializeApplicationIndex(message);
-                    return Observable.just(new IndexEventResult(message, Optional.<IndexOperationMessage>absent(), false));
+                    return Observable.just(new IndexEventResult(Optional.absent(), Optional.<IndexOperationMessage>absent()));
                 } else {
-                    logger.error("Unknown EventType: {}", event);
-                    return Observable.just(new IndexEventResult(message, Optional.<IndexOperationMessage>absent(), true));
+                    logger.error("Unknown EventType: {}", event);//TODO: print json instead
+                    return Observable.just(new IndexEventResult(Optional.fromNullable(message), Optional.<IndexOperationMessage>absent()));
                 }
             } catch (Exception e) {
                 logger.error("Failed to index entity", e, message);
-                return Observable.just(new IndexEventResult(message, Optional.<IndexOperationMessage>absent(), false));
+                return Observable.just(new IndexEventResult(Optional.absent(), Optional.<IndexOperationMessage>absent()));
             } finally {
                 messageCycle.update(System.currentTimeMillis() - event.getCreationTime());
 
             }
         });
 
+        //filter for success, send to the index(optional), ack
         return masterObservable
             //remove unsuccessful
-            .filter(indexEventResult -> indexEventResult.shouldProcess())
+            .filter(indexEventResult -> indexEventResult.getQueueMessage().isPresent())
             //take the max
             .buffer( MAX_TAKE )
             //map them to index results and return them
@@ -290,8 +314,8 @@ public class AmazonAsyncEventService implements AsyncEventService {
 
                 //ack after successful completion of the operation.
                 return indexProducer.put(combined)
-                    .flatMap(operationResult -> Observable.from(indexEventResults))
-                    .map(result -> result.getQueueMessage());
+                    .flatMap(indexOperationMessage -> Observable.from(indexEventResults))
+                    .map(result -> result.getQueueMessage().get());
 
             });
 
@@ -303,10 +327,10 @@ public class AmazonAsyncEventService implements AsyncEventService {
     ){
         try{
             return operation.call(queueMessage)
-                .map(indexOperationMessage -> new IndexEventResult(queueMessage, Optional.fromNullable(indexOperationMessage), true));
+                .map(indexOperationMessage -> new IndexEventResult(Optional.fromNullable(queueMessage), Optional.fromNullable(indexOperationMessage)));
         }catch (Exception e){
             logger.error("failed to run index",e);
-            return Observable.just( new IndexEventResult(queueMessage, Optional.<IndexOperationMessage>absent(),false));
+            return Observable.just( new IndexEventResult(Optional.fromNullable(queueMessage), Optional.<IndexOperationMessage>absent()));
         }
     }
 
@@ -543,10 +567,17 @@ public class AmazonAsyncEventService implements AsyncEventService {
                             //this won't block our read loop, just reads and proceeds
                             .flatMap(messages ->
                                     handleMessages(messages)
-                                        .doOnNext(message -> {
-                                            //ack each message, but only if we didn't error.
-                                            ack(message);
+                                        .buffer(MAX_TAKE)
+                                        .doOnNext(messagesToAck -> {
+                                            try {
+                                                //ack each message, but only if we didn't error.
+                                                ack(messagesToAck);
+                                            } catch (Exception e) {
+                                                logger.error("failed to ack messages to sqs", messagesToAck.get(0).getMessageId(), e);
+                                                //do not rethrow so we can process all of them
+                                            }
                                         })
+                                        .flatMap(messagesToAck -> Observable.from(messagesToAck))
                             );
 
             //start in the background
@@ -574,23 +605,19 @@ public class AmazonAsyncEventService implements AsyncEventService {
 
 
     public class IndexEventResult{
-        private final QueueMessage queueMessage;
+        private final Optional<QueueMessage> queueMessage;
         private final Optional<IndexOperationMessage> indexOperationMessage;
-        private final boolean shouldProcess;
 
-        public IndexEventResult(QueueMessage queueMessage, Optional<IndexOperationMessage> indexOperationMessage ,boolean shouldProcess){
+
+        public IndexEventResult(Optional<QueueMessage> queueMessage, Optional<IndexOperationMessage> indexOperationMessage ){
 
             this.queueMessage = queueMessage;
             this.indexOperationMessage = indexOperationMessage;
-            this.shouldProcess = shouldProcess;
-        }
 
-        public QueueMessage getQueueMessage() {
-            return queueMessage;
         }
 
-        public boolean shouldProcess() {
-            return shouldProcess;
+        public Optional<QueueMessage> getQueueMessage() {
+            return queueMessage;
         }
 
         public Optional<IndexOperationMessage> getIndexOperationMessage() {

http://git-wip-us.apache.org/repos/asf/usergrid/blob/7c5a864d/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java
index 409c2bc..e778948 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java
@@ -115,6 +115,7 @@ public class EsIndexProducerImpl implements IndexProducer {
         final Observable<IndexOperation> index = Observable.from(batch.getIndexRequests());
         final Observable<DeIndexOperation> deIndex = Observable.from(batch.getDeIndexRequests());
 
+        //TODO: look at indexing ordering
         final Observable<BatchOperation> batchOps = Observable.merge(index, deIndex);
 
         //buffer into the max size we can send ES and fire them all off until we're completed
@@ -207,9 +208,10 @@ public class EsIndexProducerImpl implements IndexProducer {
         if ( error ) {
             if(errorString.lastIndexOf("rejected execution (queue capacity")>=0){
                 try{
-                    log.warn("Encountered Queue Capacity Exception from ElasticSearch slowing by " +indexFig.getSleepTimeForQueueError());
+                    log.warn("Encountered Queue Capacity Exception from ElasticSearch slowing by "
+                        + indexFig.getSleepTimeForQueueError() );
                     Thread.sleep(indexFig.getSleepTimeForQueueError());
-                }catch (InterruptedException ie){
+                }catch (Exception e){
                     //move on
                 }
             }