Posted to commits@samza.apache.org by "James Lent (JIRA)" <ji...@apache.org> on 2018/03/06 16:36:01 UTC

[jira] [Commented] (SAMZA-1599) AsyncRunLoop Slow When Some Partitions Are Empty

    [ https://issues.apache.org/jira/browse/SAMZA-1599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16388064#comment-16388064 ] 

James Lent commented on SAMZA-1599:
-----------------------------------

 Here is a chart that summarizes some testing I did with a small test job.  The details are:
 * 6 Partitions in Kafka Topic.
 * Only one partition had messages and it had 100.
 * Each run restarted at the beginning of the topic.
 * The time reported is the time between when the first and the last messages were processed.
 * The _process_ method just counts events and logs the number processed (i.e. does basically nothing).
 * There were variations in the times for each scenario tested (I ran it on my dev box).  I would therefore not read anything into the small variations between "new" and "old".
 * Old refers to the 0.14.1-SNAPSHOT code (as it existed yesterday: 1dfc5ce)
 * New refers to that same code with my patch applied.

 
|*pool.size*|*thread.mode*| |*Got Thread Pool Size*|*Got Single Thread Mode*|*Process Thread*|*Run Loop Mode*| |*Old Process Time (ms)*|*New Process Time (ms)*|
|not specified|not specified| |0|false|ThreadJob|asynchronous| |14|14|
|not specified|false| |0|false|ThreadJob|asynchronous| |16|15|
|not specified|true| |0|true|ThreadJob|single thread| |12|12|
|1|not specified| |1|false|Container Thread|asynchronous| |1022|16|
|1|false| |1|false|Container Thread|asynchronous| |1023|16|
|1|true| |1|true|ThreadJob|single thread| |13|12|
|6|not specified| |6|false|Container Thread|asynchronous| |1015|21|
|6|false| |6|false|Container Thread|asynchronous| |1005|24|
|6|true| |6|true|ThreadJob|single thread| |11|12|
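
The slow "Old" rows are consistent with the 10 ms poll described in the issue quoted below: with 100 messages there are 99 hand-offs between the first and the last message, and 99 x 10 ms = 990 ms. The sketch below is only a virtual-time model of that arithmetic. The class and method names are hypothetical, the per-message processing time is assumed to be negligible, and no Samza code is involved.

```java
public class RunLoopStallModel {

    /**
     * Estimates the time between the first and the last message being
     * processed, using a virtual clock (nothing actually sleeps).
     *
     * Assumption: every message after the first is handed to the worker
     * only once the main loop returns from a blocking poll, so each
     * hand-off costs whichever is longer: the poll or the processing.
     */
    static long elapsedMs(int messages, long pollTimeoutMs, long processMs) {
        long clock = 0;
        for (int i = 1; i < messages; i++) {
            clock += Math.max(pollTimeoutMs, processMs);
        }
        return clock;
    }

    public static void main(String[] args) {
        // 100 messages, 10 ms blocking poll, near-zero processing time.
        System.out.println(elapsedMs(100, 10, 0)); // prints 990
        // Same job when the main loop never blocks in a poll.
        System.out.println(elapsedMs(100, 0, 0));  // prints 0
    }
}
```

The model ignores all real overhead, which is why it gives 0 ms where the measured "New" times are in the 12 to 24 ms range.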

> AsyncRunLoop Slow When Some Partitions Are Empty
> ------------------------------------------------
>
>                 Key: SAMZA-1599
>                 URL: https://issues.apache.org/jira/browse/SAMZA-1599
>             Project: Samza
>          Issue Type: Bug
>          Components: container
>    Affects Versions: 0.12.0, 0.14.0
>            Reporter: James Lent
>            Priority: Major
>         Attachments: slow-async-run-loop.patch
>
>
> During performance testing of a relatively simple multi-threaded job we noted that, as the Kafka partitions started to run dry, the individual partition tasks that normally processed several messages per ms slowed down significantly, heading towards a throughput of only one message every 10 ms each. Investigation indicated that the AsyncRunLoop was often blocking for 10 ms while polling the empty partitions for new work, and in the process starving the tasks working on partitions that still had messages.
> We found this in a private branch based on 0.12.0 and I have reproduced the issue in master. I coded up a potential fix which works for us in 0.12.0.  I have re-based the fix on master and tested it there too.
> Here is a detailed description of what I think is going on assuming there is only one partition with messages to process:
>  * AsyncRunLoop (main) thread Loop #1
>  ** Chooser selects a message for that partition and places it in the pendingEnvelopeQueue for the AsyncTaskState object associated with that SSP
>  ** Runs the AsyncTaskWorker associated with that SSP.
>  ** Fetches the first message from the pendingEnvelopeQueue.
>  ** Submits request to the AsyncStreamTaskAdapter to process the first message in a worker thread
>  ** Updates the Chooser with a new message from that SSP (if one is available).
>  * Worker thread
>  ** Starts processing the message
>  * AsyncRunLoop (main) thread Loop #2
>  ** Chooser selects a second message for that partition and places it in the pendingEnvelopeQueue for the AsyncTaskState object associated with that SSP
>  ** Runs the AsyncTaskWorker associated with that SSP:
>  ** The Worker thread is still busy processing the previous message so the AsyncTaskWorker does nothing.
>  ** Message is not "fetched" and therefore the Chooser is NOT updated.
>  * AsyncRunLoop (main) thread Loop #3
>  ** Chooser cannot find a message to process.
>  ** Start a poll with a timeout of 10 ms on all the partitions with no messages (this does not include the one partition with messages).
>  * Worker thread
>  ** Completes processing first message
>  ** It updates AsyncTaskWorker state
>  * Here is where the throughput suffers
>  ** AsyncTaskWorker is ready for more work, but the main thread that hands out the work is stuck "polling"
>  * AsyncRunLoop (main) thread Loop #3 continues
>  ** Wakes up after 10 ms
>  ** Runs the AsyncTaskWorker associated with that SSP.
>  ** The AsyncTaskWorker is now ready for more work.
>  ** Fetches the second message from the pendingEnvelopeQueue.
>  ** Submits request to the AsyncStreamTaskAdapter to process the second message in a (new) worker thread
>  ** Updates the Chooser with a new message from that SSP (if one is available).
> The key changes in my fix are:
>  * In the AsyncRunLoop (multi-threaded) case don't "poll" when the Chooser returns no messages.  Just "drain" any messages that have already arrived.
>  * Instead, "wait" in the main loop when the Chooser returns no messages, using the existing synchronization lock that is already used to wait when all the AsyncTaskWorkers are busy.
>  * There is one boolean added to deal with a race condition which could otherwise trigger "waits" we don't need and therefore impact throughput.  It may cause us to occasionally skip a "wait" that would actually be OK, but I think the performance penalty of that is pretty small (one extra spin of the main loop when there is no work to handle).
> The change I made is pretty small, but the code it is changing is fairly complex.  I would be very interested in:
>  * Getting your insights on the problem I have described. Maybe there is a setting that could work around this behavior.  I could not find one.
>  * Feedback on my proposed solution. Especially potential dangers.
> I have attached a patch file to the issue. I can open a merge request if that would be a better way to get your input.
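
The "wait instead of poll" idea in the quoted description can be sketched roughly as follows. This is not the attached patch and not Samza code: the class, its method names, and the bounded wait are assumptions made for illustration. The boolean plays the role described above, so that a completion signalled just before the main loop starts waiting is not missed.

```java
public class WorkSignal {
    private final Object lock = new Object();

    // Set by a worker when it completes a message. Checking it before
    // waiting lets the main loop skip a wait it no longer needs.
    private boolean workerFinished = false;

    /** Called from a worker thread after it finishes processing a message. */
    public void signalWorkerFinished() {
        synchronized (lock) {
            workerFinished = true;
            lock.notifyAll();
        }
    }

    /**
     * Called from the main loop when the chooser returned no message.
     * Returns true as soon as a worker has finished, or false if
     * maxWaitMs elapsed (or the thread was interrupted) first.
     */
    public boolean awaitWorkerFinished(long maxWaitMs) {
        synchronized (lock) {
            long deadline = System.nanoTime() + maxWaitMs * 1_000_000L;
            try {
                while (!workerFinished) {
                    long remainingMs = (deadline - System.nanoTime()) / 1_000_000L;
                    if (remainingMs <= 0) {
                        return false; // timed out with no worker completion
                    }
                    lock.wait(remainingMs); // releases the lock while waiting
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve interrupt status
                return false;
            }
            workerFinished = false; // consume the signal
            return true;
        }
    }
}
```

Unlike a fixed 10 ms poll on the empty partitions, the main loop in this sketch wakes as soon as a worker calls `signalWorkerFinished()`, so a message already sitting in the pending queue can be handed out without delay.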



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)