Posted to commits@samza.apache.org by "Dengpan Yin (Jira)" <ji...@apache.org> on 2019/09/05 21:12:00 UTC

[jira] [Commented] (SAMZA-2308) AsyncRunLoop and TieredPriorityChooser fail to be used together

    [ https://issues.apache.org/jira/browse/SAMZA-2308?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16923731#comment-16923731 ] 

Dengpan Yin commented on SAMZA-2308:
------------------------------------

 

The RunLoop calls SystemConsumers to choose a message for processing, and SystemConsumers in turn uses the chooser to choose it. When a priority is configured for a stream, a TieredPriorityChooser wraps RoundRobinChoosers, one per priority level. The TieredPriorityChooser traverses its choosers from highest to lowest priority and checks whether a message can be chosen from the underlying RoundRobinChooser. A RoundRobinChooser returns a message from its own buffer if it has one, and null otherwise.

In the problem we observed, the buffer of the high-priority chooser is empty, so a message is returned from a lower-priority chooser. When a chooser chooses a message, it looks only at its own buffer. It does not check the buffer of SystemConsumers, even though SystemConsumers may still hold messages with higher priority. SystemConsumers moves messages from its own buffer to the chooser's buffer only when explicitly told to, through the argument the RunLoop passes when it calls the choose function of SystemConsumers. The AsyncTaskWorker can also call SystemConsumers.tryUpdate to move messages to the chooser's buffer when it starts processing a message. However, it does not call SystemConsumers.tryUpdate when performing window/scheduler/commit/endOfStream/noop operations.
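The choosing path described above can be sketched as a toy model. This is a minimal sketch, not Samza's code. The class names mirror Samza's, but their methods, the `Envelope` tuple, and the priority values 1 and 0 are hypothetical. It illustrates the key point: the tiered chooser scans only the buffers of its per-priority choosers, so a high-priority message still waiting in SystemConsumers' buffer cannot be chosen.

```python
from collections import deque
from typing import Deque, Dict, Optional, Tuple

# Hypothetical model: an envelope is (stream name, offset).
Envelope = Tuple[str, int]


class RoundRobinChooser:
    """Toy per-tier chooser that only ever looks at its own buffer."""

    def __init__(self) -> None:
        self.buffer: Deque[Envelope] = deque()

    def update(self, envelope: Envelope) -> None:
        self.buffer.append(envelope)

    def choose(self) -> Optional[Envelope]:
        # With at most one buffered envelope per stream, FIFO order
        # rotates between streams like a round robin.
        return self.buffer.popleft() if self.buffer else None


class TieredPriorityChooser:
    """Toy tiered chooser: one RoundRobinChooser per priority level."""

    def __init__(self, priorities: Dict[str, int]) -> None:
        self.priorities = priorities
        self.tiers: Dict[int, RoundRobinChooser] = {
            p: RoundRobinChooser() for p in set(priorities.values())
        }

    def update(self, envelope: Envelope) -> None:
        stream, _ = envelope
        self.tiers[self.priorities[stream]].update(envelope)

    def choose(self) -> Optional[Envelope]:
        # Walk tiers from highest to lowest priority. The first tier with
        # a buffered envelope wins. Nothing here can see envelopes that are
        # still waiting in the consumer-side buffer.
        for priority in sorted(self.tiers, reverse=True):
            envelope = self.tiers[priority].choose()
            if envelope is not None:
                return envelope
        return None


chooser = TieredPriorityChooser({"high-priority": 1, "low-priority": 0})
# The high-priority envelope has been fetched but not yet moved into the chooser.
consumer_side_buffer: Deque[Envelope] = deque([("high-priority", 0)])
chooser.update(("low-priority", 0))
print(chooser.choose())  # -> ('low-priority', 0)
```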

Back to the problem we observed: the RunLoop chooses a high-priority message from the chooser and hands it over to an AsyncTaskWorker. In that cycle, the AsyncTaskWorker needs to perform a COMMIT operation (it could equally be window/scheduler/noop). As a result, the message is not consumed yet, and the chooser's buffer is left empty without a new message being pulled in. In the next loop, SystemConsumers chooses a lower-priority message to process. Samza's current implementation does not guarantee exact ordering by priority; it only makes a best effort.

There is a simple way to mitigate the problem: call "consumerMultiplexer.choose(true)" from the RunLoop instead of "consumerMultiplexer.choose(false)". With this change, the attached test cases pass. However, this does not completely fix the problem, and it is not yet clear whether the change introduces new problems.
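The failure sequence and the choose(true) mitigation can be reproduced in a toy model. This is a minimal sketch under stated assumptions, not Samza's implementation. `ToySystemConsumers`, `run`, and the "PROCESS"/"COMMIT" labels are hypothetical. The model makes four simplifications:

* Each stream gets a single chooser slot.
* All messages are already fetched into SystemConsumers' buffer.
* The run loop chooses one envelope per cycle and queues it for the worker.
* The worker calls `try_update` only when it starts processing an envelope, mirroring the AsyncTaskWorker behaviour described earlier. By contrast, `choose(update_chooser=True)` refills the freed slot immediately.

```python
from collections import deque
from typing import Deque, Dict, List, Optional, Tuple

Envelope = Tuple[str, int]  # (stream name, offset)


class ToySystemConsumers:
    """Hypothetical single-partition model of SystemConsumers plus its chooser.

    `fetched` stands in for SystemConsumers' own buffer; `slots` stands in
    for the chooser, which holds at most one envelope per stream and is
    scanned from highest to lowest priority.
    """

    def __init__(self, streams_high_to_low: List[str], backlog: Dict[str, int]) -> None:
        self.order = streams_high_to_low
        self.fetched: Dict[str, Deque[Envelope]] = {
            s: deque((s, i) for i in range(backlog.get(s, 0))) for s in self.order
        }
        self.slots: Dict[str, Optional[Envelope]] = {s: None for s in self.order}
        for s in self.order:  # assume the chooser starts with one envelope per stream
            self.try_update(s)

    def try_update(self, stream: str) -> None:
        # Move the next fetched envelope into the chooser if the stream's slot is free.
        if self.slots[stream] is None and self.fetched[stream]:
            self.slots[stream] = self.fetched[stream].popleft()

    def choose(self, update_chooser: bool) -> Optional[Envelope]:
        for s in self.order:
            envelope = self.slots[s]
            if envelope is not None:
                self.slots[s] = None
                if update_chooser:  # refill right away, as with choose(true)
                    self.try_update(s)
                return envelope
        return None


def run(update_chooser: bool, worker_ops: List[str]) -> List[Envelope]:
    """Return the order in which the run loop chooses envelopes.

    Each cycle the run loop chooses one envelope and queues it for the
    worker. On "PROCESS" the worker starts the oldest queued envelope and
    calls try_update for its stream; on "COMMIT" (standing in for
    window/scheduler/noop as well) it does neither.
    """
    consumers = ToySystemConsumers(["high-priority", "low-priority"],
                                   {"high-priority": 3, "low-priority": 3})
    queued: Deque[Envelope] = deque()
    chosen: List[Envelope] = []
    for op in worker_ops:
        envelope = consumers.choose(update_chooser)
        if envelope is not None:
            chosen.append(envelope)
            queued.append(envelope)
        if op == "PROCESS" and queued:
            consumers.try_update(queued.popleft()[0])
    return chosen


ops = ["COMMIT", "PROCESS", "PROCESS", "PROCESS"]
print(run(False, ops))  # high and low interleave after the COMMIT cycle
print(run(True, ops))   # all three high-priority envelopes come first
```

In the first run, a single COMMIT cycle lets a low-priority envelope in while high-priority messages are still waiting. With `update_chooser=True`, the high-priority backlog is chosen first. Setting every op to "PROCESS" makes the `update_chooser=False` run drain the high-priority stream first as well, which isolates the skipped tryUpdate as the cause in this model. Because the model assumes every message is already fetched, it does not capture the cases where choose(true) may still fall short.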

Another solution could be to modify the TieredPriorityChooser. For a high-priority system stream, instead of choosing a message only from the chooser's buffer, it should do the following:

If there is a message in the chooser's buffer, choose that message.

Else, if there is a message in the SystemConsumers' buffer that can be filled into the chooser, fill the chooser's buffer and return the message.

Else, if the underlying consumer can pull a message for the system stream from the remote, pull the message from the remote.

Otherwise, check the chooser of the next system stream in descending order of priority.
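The four steps above could look roughly like this. This is a minimal sketch of the proposal, not code from Samza. The buffers are modelled as plain per-stream deques. `poll_remote` is a hypothetical, non-blocking stand-in for asking the underlying consumer to fetch from the remote system; the comment does not say how that pull would be done. Filling the chooser's buffer and immediately returning the message is collapsed into handing the message straight over.

```python
from collections import deque
from typing import Callable, Deque, Dict, List, Optional, Tuple

Envelope = Tuple[str, int]  # (stream name, offset)


def choose_with_fill(
    streams_high_to_low: List[str],
    chooser_buffer: Dict[str, Deque[Envelope]],
    consumer_buffer: Dict[str, Deque[Envelope]],
    poll_remote: Callable[[str], List[Envelope]],
) -> Optional[Envelope]:
    """Hypothetical sketch of the proposed TieredPriorityChooser behaviour."""
    for stream in streams_high_to_low:
        # 1. A message is already in the chooser's buffer: choose it.
        if chooser_buffer[stream]:
            return chooser_buffer[stream].popleft()
        # 2. SystemConsumers' buffer has one: fill the chooser and return it.
        if consumer_buffer[stream]:
            return consumer_buffer[stream].popleft()
        # 3. Try pulling from the remote system for this stream.
        fetched = poll_remote(stream)
        if fetched:
            consumer_buffer[stream].extend(fetched)
            return consumer_buffer[stream].popleft()
        # 4. Nothing available: fall through to the next lower-priority stream.
    return None


# Hypothetical state: high-priority has one message in SystemConsumers' buffer
# and one more on the remote; low-priority has one message in the chooser.
chooser_buf = {"high-priority": deque(), "low-priority": deque([("low-priority", 0)])}
consumer_buf = {"high-priority": deque([("high-priority", 0)]), "low-priority": deque()}
remote = {"high-priority": [("high-priority", 1)], "low-priority": []}


def poll(stream: str) -> List[Envelope]:
    # Toy remote that hands out everything it has for the stream once.
    batch, remote[stream] = remote[stream], []
    return batch


order = ["high-priority", "low-priority"]
picked = [choose_with_fill(order, chooser_buf, consumer_buf, poll) for _ in range(4)]
print(picked)  # -> [('high-priority', 0), ('high-priority', 1), ('low-priority', 0), None]
```

Step 3 is attempted for every higher-priority stream before a lower one is considered. A real implementation would therefore need that pull to be cheap or non-blocking; this sketch simply assumes it is.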

 

[~seanmcnealy] Do you have a workaround for this problem? For instance, instead of configuring a high priority for the stream, could you make it a bootstrap stream?

cc: [~soze] to prioritize the work for this ticket.

Thanks

Dengpan

> AsyncRunLoop and TieredPriorityChooser fail to be used together
> ---------------------------------------------------------------
>
>                 Key: SAMZA-2308
>                 URL: https://issues.apache.org/jira/browse/SAMZA-2308
>             Project: Samza
>          Issue Type: Bug
>          Components: container
>    Affects Versions: 0.14.0, 0.13.1, 1.0, 1.1, 1.2
>            Reporter: Sean McNealy
>            Assignee: Dengpan Yin
>            Priority: Major
>         Attachments: PriorityChooserTest.scala
>
>
> In the single-threaded run loop, a message envelope was replaced using the "tryUpdate(ssp)" function each time before calling ".choose()".
> In the AsyncRunLoop, that replacement is delayed until the worker is ready and the envelope begins processing. This allows choosing more messages to schedule to tasks, which keeps threads busy and allows lower-priority messages to be scheduled when partitions are available. These are good things when threads are available. When a message is chosen for a partition that is already processing, it is added to the task's pendingEnvelopeQueue so that the run loop can choose yet more messages. But the TieredPriorityChooser may respond with a message for a SystemStreamPartition whose priority is lower than that of another nonempty SystemStreamPartition, since there is no way to exclude a partition or priority level from the ".choose()" operation.
> In fact, the Chooser object can frequently be exhausted of all messages. What happens is that, for a partition, one message from every nonempty SystemStream ends up in the FIFO/ArrayDeque pendingEnvelopeQueue, which doesn't respect the configured priority settings, so we devolve to a round-robin policy.
> To reproduce, run a job with the following setting:
> task.inputs=kafka.high-priority,kafka.low-priority
> job.container.thread.pool.size=2
> systems.kafka.streams.high-priority.samza.priority=1
> Expected behavior:
> Each partition will fully read the high-priority stream before reading messages from the low-priority stream.
> Observed behavior:
> Each partition reads from both streams as in a round robin policy.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)