Posted to commits@samza.apache.org by "Jay Kreps (JIRA)" <ji...@apache.org> on 2013/08/28 01:47:53 UTC

[jira] [Commented] (SAMZA-3) BrokerProxy deadlocks if messages aren't polled from all streams

    [ https://issues.apache.org/jira/browse/SAMZA-3?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13751911#comment-13751911 ] 

Jay Kreps commented on SAMZA-3:
-------------------------------

+1, I agree with your reasoning, and this fix looks good. One slightly odd side-effect is that latency on topics whose queues fill up could now become higher. Say you have two topics, BIG and SMALL, and you fetch from both with a 30-second max wait. BIG will likely fill its queue immediately and stop fetching; the next fetch, for SMALL alone, may then block for the full 30 seconds. Meanwhile, BIG's queue will drain and it will want more data, but no new fetch can be issued until the outstanding fetch response comes back. Basically, any wait time in the fetch is potentially stall time (even if it is just a second, it may be relevant). You can tune around this by shortening the fetch wait; I don't see a better solution, though.
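For illustration, a minimal sketch of that tuning using Kafka 0.8's FetchRequestBuilder (the client id, offset, and values here are made up for the example, not Samza's actual settings):

{code}
import kafka.api.FetchRequestBuilder

object FetchTuning {
  val smallOffset = 0L // illustrative starting offset for SMALL

  val fetchRequest = new FetchRequestBuilder()
    .clientId("broker-proxy-example") // illustrative client id
    .maxWait(500)                     // block at most 500 ms on the broker, not 30 s
    .minBytes(1)                      // return as soon as any bytes are available
    .addFetch("SMALL", 0, smallOffset, 1024 * 1024)
    .build()
}
{code}

Lowering maxWait bounds the stall, at the cost of more fetch round-trips while SMALL is idle.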
                
> BrokerProxy deadlocks if messages aren't polled from all streams
> ----------------------------------------------------------------
>
>                 Key: SAMZA-3
>                 URL: https://issues.apache.org/jira/browse/SAMZA-3
>             Project: Samza
>          Issue Type: Bug
>          Components: kafka
>    Affects Versions: 0.6.0
>            Reporter: Chris Riccomini
>            Assignee: Chris Riccomini
>             Fix For: 0.7.0
>
>         Attachments: SAMZA-3.0.patch
>
>
> Suppose a KafkaSystemConsumer is created with: 
> {code}
> consumer.register(sp1, 0)  // register two partitions that live on the same broker
> consumer.register(sp2, 0)
> consumer.start
> while (true) {
>   consumer.poll(sp2, 0)    // only sp2 is ever polled; sp1's queue is never drained
> }
> {code}
> This code will eventually deadlock (assuming sp1 has messages) if sp1 and sp2 are both on the same broker. 
> The current implementation of BrokerProxy/KafkaSystemConsumer puts messages into a BlockingEnvelopeMap. The BlockingEnvelopeMap has a per-SystemStreamPartition max queue size (defaults to 1000, I believe). This means that if a SystemStreamPartition is registered with the KafkaSystemConsumer, but messages are not read off of that SystemStreamPartition's queue for some reason, the BrokerProxy/KafkaSystemConsumer will eventually block on BlockingEnvelopeMap.add. This prevents the BrokerProxy from fetching any more messages from ANY topic/partition on the broker, so code trying to read messages from another SystemStreamPartition will never receive any new messages. 
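> The blocking behavior is easy to reproduce in isolation (a standalone sketch with stand-in types, not Samza's actual classes):
> {code}
> import java.util.concurrent.{ArrayBlockingQueue, BlockingQueue}
>
> object BoundedQueueStall extends App {
>   // Stand-in for the per-SystemStreamPartition queue in BlockingEnvelopeMap.
>   val sp1Queue: BlockingQueue[String] = new ArrayBlockingQueue[String](1000)
>
>   // Fill sp1's queue; nothing is polling sp1, so it never drains.
>   (1 to 1000).foreach(i => sp1Queue.put(s"message-$i"))
>
>   // The fetcher's next add blocks forever here, so it can never return to
>   // the broker to fetch for sp2 -- this is the deadlock described above.
>   sp1Queue.put("message-1001")
> }
> {code}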
> This is not currently a problem because Samza reads messages in two ways: 
> 1) During state store restore. 
> 2) During process loop (feeding messages to StreamTask.process). 
> The current SamzaContainer implementation uses a new SystemConsumer for each SystemStreamPartition when it restores (#1), which registers ONLY one SystemStreamPartition, so no deadlock is possible there. The current DefaultChooser round-robins between streams, which means you always poll all streams that have messages available, so no starvation (and therefore no deadlock) is currently possible; a sketch of this round-robin polling follows below. 
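> In the same illustrative API as the pseudocode above, round-robin polling looks like this:
> {code}
> // Polling every registered partition in turn keeps each queue draining,
> // so no BlockingEnvelopeMap.add ever blocks.
> val partitions = Seq(sp1, sp2)
> var i = 0
> while (true) {
>   consumer.poll(partitions(i % partitions.size), 0)
>   i += 1
> }
> {code}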
> Nevertheless, this should be fixed. For one thing, if we change the DefaultChooser's behavior, this problem would surface. 
> The simplest solution would be to stop fetching messages in the BrokerProxy for queues that are full. An alternative would be to stop fetching messages for any queue that has messages already in it (regardless of whether it's "full" or not). 
> One nuance to the stop-fetching-on-queue-full solution is that FetchRequest takes a fetchSize, which is in bytes. This means that we might get more messages back in one FetchRequest than would fit into the BlockingEnvelopeMap queue. We could drop these excess messages and re-fetch them later, as sketched below. 
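> A sketch of that drop-and-re-fetch idea (hypothetical names and types, not Samza's code): enqueue only what fits without blocking, and remember the offset to resume fetching from.
> {code}
> import java.util.concurrent.ArrayBlockingQueue
>
> // Hypothetical envelope carrying the offset it was read from.
> case class Envelope(offset: Long, payload: Array[Byte])
>
> object DropAndRefetch {
>   val queue = new ArrayBlockingQueue[Envelope](1000)
>
>   /** Enqueue what fits; return the offset to resume fetching from. */
>   def add(fetched: Seq[Envelope], endOffset: Long): Long = {
>     for (env <- fetched) {
>       if (!queue.offer(env)) // non-blocking; false when the queue is full
>         return env.offset    // drop the rest; re-fetch from here later
>     }
>     endOffset                // everything fit; continue from the end
>   }
> }
> {code}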
> I think the best solution is just: 
> 1) Stop fetching messages for any queue that's not empty. 
> 2) Make KafkaSystemConsumer override newBlockingQueue with an unbounded LinkedBlockingQueue. 
> The new memory semantics for the KafkaSystemConsumer would be that the LinkedBlockingQueue would hold up to N elements, where N is the maximum number of elements that can theoretically be returned in a single FetchRequest for a given TopicAndPartition. Thus, if a KafkaSystemConsumer's fetchSize were 1 megabyte, and 1 MB could contain a theoretical maximum of 1 million messages (1 byte per message), then the maximum number of messages you'd expect to see in any single unbounded LinkedBlockingQueue would be 1 million. Once this queue drained to zero, a new fetch would be issued.
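> A sketch of the shape of this change (minimal stand-ins for the Samza types discussed above, not the real classes):
> {code}
> import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue}
>
> // Stand-ins for Samza's actual types.
> case class IncomingMessageEnvelope(offset: Long, message: AnyRef)
>
> abstract class BlockingEnvelopeMap {
>   // Factory used when a SystemStreamPartition is registered.
>   protected def newBlockingQueue: BlockingQueue[IncomingMessageEnvelope]
> }
>
> // Proposed change: hand back an unbounded queue so add never blocks;
> // flow control moves to the fetch side, which skips any partition whose
> // queue is not yet empty (rule 1 above).
> class KafkaSystemConsumer extends BlockingEnvelopeMap {
>   override protected def newBlockingQueue: BlockingQueue[IncomingMessageEnvelope] =
>     new LinkedBlockingQueue[IncomingMessageEnvelope]()
> }
> {code}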
