You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/11/02 23:56:05 UTC
[14/50] [abbrv] usergrid git commit: Updates the message flow to
allow for multiple processor threads per SQS take thread
Updates the message flow to allow for multiple processor threads per SQS take thread
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/76476f17
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/76476f17
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/76476f17
Branch: refs/heads/USERGRID-909
Commit: 76476f17cf8e8be6f01660db3d21110eda8247f5
Parents: 1b43bda
Author: Todd Nine <tn...@apigee.com>
Authored: Tue Oct 27 14:35:34 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Tue Oct 27 14:35:34 2015 -0600
----------------------------------------------------------------------
.../asyncevents/AmazonAsyncEventService.java | 68 +++++++++++---------
.../index/IndexProcessorFig.java | 13 +++-
2 files changed, 51 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/76476f17/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index d93e304..6b9abbc 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -679,35 +679,45 @@ public class AmazonAsyncEventService implements AsyncEventService {
}
while ( true );
}
- } )
- //this won't block our read loop, just reads and proceeds
- .map( messages -> {
- if ( messages == null || messages.size() == 0 ) {
- return null;
- }
-
- try {
- List<IndexEventResult> indexEventResults = callEventHandlers( messages );
- List<QueueMessage> messagesToAck = submitToIndex( indexEventResults );
- if ( messagesToAck == null || messagesToAck.size() == 0 ) {
- logger.error( "No messages came back from the queue operation should have seen "
- + messages.size(), messages );
- return messagesToAck;
- }
- if ( messagesToAck.size() < messages.size() ) {
- logger.error( "Missing messages from queue post operation", messages,
- messagesToAck );
- }
- //ack each message, but only if we didn't error.
- ack( messagesToAck );
- return messagesToAck;
- }
- catch ( Exception e ) {
- logger.error( "failed to ack messages to sqs", e );
- return null;
- //do not rethrow so we can process all of them
- }
- } );
+ } ) //this won't block our read loop, just reads and proceeds
+ .flatMap( sqsMessages -> {
+
+ //do this on a different schedule, and introduce concurrency with flatmap for faster processing
+ return Observable.just( sqsMessages )
+
+ .map( messages -> {
+ if ( messages == null || messages.size() == 0 ) {
+ return null;
+ }
+
+ try {
+ List<IndexEventResult> indexEventResults =
+ callEventHandlers( messages );
+ List<QueueMessage> messagesToAck =
+ submitToIndex( indexEventResults );
+ if ( messagesToAck == null || messagesToAck.size() == 0 ) {
+ logger.error(
+ "No messages came back from the queue operation should "
+ + "have seen "
+ + messages.size(), messages );
+ return messagesToAck;
+ }
+ if ( messagesToAck.size() < messages.size() ) {
+ logger.error( "Missing messages from queue post operation",
+ messages, messagesToAck );
+ }
+ //ack each message, but only if we didn't error.
+ ack( messagesToAck );
+ return messagesToAck;
+ }
+ catch ( Exception e ) {
+ logger.error( "failed to ack messages to sqs", e );
+ return null;
+ //do not rethrow so we can process all of them
+ }
+ } ).subscribeOn( rxTaskScheduler.getAsyncIOScheduler() );
+ //end flatMap
+ }, indexProcessorFig.getEventConcurrencyFactor() );
//start in the background
http://git-wip-us.apache.org/repos/asf/usergrid/blob/76476f17/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
index 7650c62..9d02717 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
@@ -36,6 +36,8 @@ public interface IndexProcessorFig extends GuicyFig {
String ELASTICSEARCH_WORKER_COUNT = "elasticsearch.worker_count";
+ String EVENT_CONCURRENCY_FACTOR = "event.concurrency.factor";
+
String ELASTICSEARCH_QUEUE_IMPL = "elasticsearch.queue_impl";
String INDEX_QUEUE_READ_TIMEOUT = "elasticsearch.queue_read_timeout";
@@ -70,9 +72,18 @@ public interface IndexProcessorFig extends GuicyFig {
int getIndexQueueVisibilityTimeout();
/**
+ * The number of worker threads used when handing off messages from the SQS thread
+ */
+ @Default( "20" )
+ @Key( EVENT_CONCURRENCY_FACTOR )
+ int getEventConcurrencyFactor();
+
+
+
+ /**
* The number of worker threads used to read index write requests from the queue.
*/
- @Default( "16" )
+ @Default( "8" )
@Key( ELASTICSEARCH_WORKER_COUNT )
int getWorkerCount();