You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/11/02 23:56:05 UTC

[14/50] [abbrv] usergrid git commit: Updates the message flow to allow for multiple processor threads per SQS take thread

Updates the message flow to allow for multiple processor threads per SQS take thread


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/76476f17
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/76476f17
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/76476f17

Branch: refs/heads/USERGRID-909
Commit: 76476f17cf8e8be6f01660db3d21110eda8247f5
Parents: 1b43bda
Author: Todd Nine <tn...@apigee.com>
Authored: Tue Oct 27 14:35:34 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Tue Oct 27 14:35:34 2015 -0600

----------------------------------------------------------------------
 .../asyncevents/AmazonAsyncEventService.java    | 68 +++++++++++---------
 .../index/IndexProcessorFig.java                | 13 +++-
 2 files changed, 51 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/76476f17/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index d93e304..6b9abbc 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -679,35 +679,45 @@ public class AmazonAsyncEventService implements AsyncEventService {
                             }
                             while ( true );
                         }
-                    } )
-                            //this won't block our read loop, just reads and proceeds
-                            .map( messages -> {
-                                if ( messages == null || messages.size() == 0 ) {
-                                    return null;
-                                }
-
-                                try {
-                                    List<IndexEventResult> indexEventResults = callEventHandlers( messages );
-                                    List<QueueMessage> messagesToAck = submitToIndex( indexEventResults );
-                                    if ( messagesToAck == null || messagesToAck.size() == 0 ) {
-                                        logger.error( "No messages came back from the queue operation should have seen "
-                                            + messages.size(), messages );
-                                        return messagesToAck;
-                                    }
-                                    if ( messagesToAck.size() < messages.size() ) {
-                                        logger.error( "Missing messages from queue post operation", messages,
-                                            messagesToAck );
-                                    }
-                                    //ack each message, but only if we didn't error.
-                                    ack( messagesToAck );
-                                    return messagesToAck;
-                                }
-                                catch ( Exception e ) {
-                                    logger.error( "failed to ack messages to sqs", e );
-                                    return null;
-                                    //do not rethrow so we can process all of them
-                                }
-                            } );
+                    } )        //this won't block our read loop, just reads and proceeds
+                        .flatMap( sqsMessages -> {
+
+                            //do this on a different scheduler, and introduce concurrency with flatMap for faster processing
+                            return Observable.just( sqsMessages )
+
+                                             .map( messages -> {
+                                                 if ( messages == null || messages.size() == 0 ) {
+                                                     return null;
+                                                 }
+
+                                                 try {
+                                                     List<IndexEventResult> indexEventResults =
+                                                         callEventHandlers( messages );
+                                                     List<QueueMessage> messagesToAck =
+                                                         submitToIndex( indexEventResults );
+                                                     if ( messagesToAck == null || messagesToAck.size() == 0 ) {
+                                                         logger.error(
+                                                             "No messages came back from the queue operation should "
+                                                                 + "have seen "
+                                                                 + messages.size(), messages );
+                                                         return messagesToAck;
+                                                     }
+                                                     if ( messagesToAck.size() < messages.size() ) {
+                                                         logger.error( "Missing messages from queue post operation",
+                                                             messages, messagesToAck );
+                                                     }
+                                                     //ack each message, but only if we didn't error.
+                                                     ack( messagesToAck );
+                                                     return messagesToAck;
+                                                 }
+                                                 catch ( Exception e ) {
+                                                     logger.error( "failed to ack messages to sqs", e );
+                                                     return null;
+                                                     //do not rethrow so we can process all of them
+                                                 }
+                                             } ).subscribeOn( rxTaskScheduler.getAsyncIOScheduler() );
+                            //end flatMap
+                        }, indexProcessorFig.getEventConcurrencyFactor() );
 
             //start in the background
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/76476f17/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
index 7650c62..9d02717 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
@@ -36,6 +36,8 @@ public interface IndexProcessorFig extends GuicyFig {
 
     String ELASTICSEARCH_WORKER_COUNT = "elasticsearch.worker_count";
 
+    String EVENT_CONCURRENCY_FACTOR = "event.concurrency.factor";
+
     String ELASTICSEARCH_QUEUE_IMPL = "elasticsearch.queue_impl";
 
     String INDEX_QUEUE_READ_TIMEOUT = "elasticsearch.queue_read_timeout";
@@ -70,9 +72,18 @@ public interface IndexProcessorFig extends GuicyFig {
     int getIndexQueueVisibilityTimeout();
 
     /**
+     * The maximum number of message batches processed concurrently when handing off messages from the SQS thread
+     */
+    @Default( "20" )
+    @Key( EVENT_CONCURRENCY_FACTOR )
+    int getEventConcurrencyFactor();
+
+
+
+    /**
      * The number of worker threads used to read index write requests from the queue.
      */
-    @Default( "16" )
+    @Default( "8" )
     @Key( ELASTICSEARCH_WORKER_COUNT )
     int getWorkerCount();