Posted to infrastructure-issues@apache.org by "Chris Riccomini (JIRA)" <ji...@apache.org> on 2013/08/06 05:07:52 UTC

[jira] [Created] (INFRA-6636) BrokerProxy deadlocks if messages aren't polled from all streams

Chris Riccomini created INFRA-6636:
--------------------------------------

             Summary: BrokerProxy deadlocks if messages aren't polled from all streams
                 Key: INFRA-6636
                 URL: https://issues.apache.org/jira/browse/INFRA-6636
             Project: Infrastructure
          Issue Type: Bug
      Security Level: public (Regular issues)
            Reporter: Chris Riccomini


Suppose a KafkaSystemConsumer is created with:

{code}
consumer.register(sp1, 0)
consumer.register(sp2, 0)
consumer.start
// Only sp2 is ever polled; sp1's queue is never drained.
while (true) {
  consumer.poll(sp2, 0)
}
{code}

This code will eventually deadlock (assuming sp1 has messages) if sp1 and sp2 are both on the same broker.

The current implementation of BrokerProxy/KafkaSystemConsumer puts messages into BlockingEnvelopeMap. The BlockingEnvelopeMap has a per-SystemStreamPartition max queueSize (defaults to 1000, I believe). This means that if a SystemStreamPartition is registered with the KafkaSystemConsumer, but messages are not read off of its queue for some reason, the BrokerProxy/KafkaSystemConsumer will eventually block on BlockingEnvelopeMap.add. This prevents the BrokerProxy from fetching any more messages for ANY topic/partition on the broker. Code that is trying to read messages from another SystemStreamPartition on that broker will never receive any new messages.
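To make the blocking behavior concrete, here is a minimal sketch that uses plain java.util.concurrent queues rather than the real Samza classes. The class SharedFetcherStall, the method pollOnlySp2, and the tiny queue size are all hypothetical stand-ins: one thread plays the role of a BrokerProxy feeding two bounded per-partition queues, and the caller only ever polls sp2.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for one BrokerProxy thread feeding two bounded queues. */
final class SharedFetcherStall {

    /**
     * Starts one fetcher thread that alternates between sp1 and sp2, then polls
     * ONLY sp2 for roughly `millis` milliseconds. Returns how many sp2 messages
     * were received before the fetcher stalled on sp1.
     */
    static int pollOnlySp2(int queueSize, long millis) {
        BlockingQueue<Integer> sp1 = new ArrayBlockingQueue<>(queueSize);
        BlockingQueue<Integer> sp2 = new ArrayBlockingQueue<>(queueSize);

        Thread fetcher = new Thread(() -> {
            try {
                for (int i = 0; ; i++) {
                    sp1.put(i); // blocks for good once sp1 is full: nobody drains it
                    sp2.put(i); // never reached again after the fetcher stalls on sp1
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        fetcher.setDaemon(true);
        fetcher.start();

        int received = 0;
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(millis);
        try {
            while (System.nanoTime() < deadline) {
                if (sp2.poll(10, TimeUnit.MILLISECONDS) != null) {
                    received++;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            fetcher.interrupt(); // release the stalled fetcher so the demo can exit
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println("sp2 messages received: " + pollOnlySp2(3, 500));
    }
}
```

In this sketch sp2 receives only as many messages as fit into sp1's queue; after that the shared fetcher is stuck in put on sp1, and sp2 starves even though its own queue is empty.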

This is not currently a problem because Samza only reads messages in two ways:

1) During state store restore.
2) During process loop (feeding messages to StreamTask.process).

The current SamzaContainer implementation uses a new SystemConsumer for each SystemStreamPartition when it restores (#1), and each of these registers ONLY one SystemStreamPartition, so no deadlock is possible here. In the process loop (#2), the current DefaultChooser round robins between streams, which means that every stream with messages available is always polled, so no starvation is currently possible (which means that no deadlock is possible).

Nevertheless, this should be fixed. For one thing, if we changed the DefaultChooser's behavior, this problem would surface.

The simplest solution would be to stop fetching messages in the BrokerProxy for queues that are full. An alternative would be to stop fetching messages for any queue that has messages already in it (regardless of whether it's "full" or not).

One nuance to the stop-fetching-on-queue-full solution is that FetchRequest takes a fetchSize, which is in bytes. This means that a single FetchRequest might return more messages than would fit into the BlockingEnvelopeMap queue. We could drop these excess messages and re-fetch them later.
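The drop-and-re-fetch idea can be sketched as follows. This is only an illustration under assumptions: DropAndRefetch and enqueue are hypothetical names, messages are represented by their offsets, and a bounded ArrayBlockingQueue stands in for the BlockingEnvelopeMap queue. The point is that a non-blocking offer lets the fetcher discard whatever does not fit and remember the offset to resume from.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Hypothetical helper: enqueue what fits, drop the rest, remember where to resume. */
final class DropAndRefetch {

    /**
     * Offers `count` consecutive offsets starting at `firstOffset` without blocking.
     * Returns the offset of the first message that was NOT enqueued, which is the
     * offset the next fetch should start from.
     */
    static long enqueue(BlockingQueue<Long> queue, long firstOffset, int count) {
        long offset = firstOffset;
        for (int i = 0; i < count; i++) {
            if (!queue.offer(offset)) {
                break; // queue is full: drop the remainder of this response
            }
            offset++;
        }
        return offset;
    }

    public static void main(String[] args) {
        BlockingQueue<Long> queue = new ArrayBlockingQueue<>(3);
        long next = enqueue(queue, 10, 5); // response holds 5 messages, queue fits 3
        System.out.println("resume fetching at offset " + next);
    }
}
```

The cost of this approach is that the dropped messages are transferred from the broker twice.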

I think the best solution is just:

1) Stop fetching messages for any queue that's not empty.
2) Make KafkaSystemConsumer override newBlockingQueue with an unbounded LinkedBlockingQueue.

The new memory semantics for the KafkaSystemConsumer would be that the LinkedBlockingQueue would hold up to N elements where N is the max number of elements that can theoretically be returned in a single FetchRequest for a given TopicAndPartition. Thus, if a KafkaSystemConsumer's fetchSize were 1 megabyte, and 1 meg could return a theoretical maximum of 1 million messages (1 byte per message), then the maximum number of messages you'd expect to see in any single unbounded LinkedBlockingQueue would be 1 million. Once this queue was drained to zero, a new fetch would be issued.
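A rough sketch of the proposed behavior, again with plain java.util.concurrent classes instead of the real BrokerProxy and BlockingEnvelopeMap. Everything here is an assumption made for illustration: the class EmptyQueueFetcher, its method names, String partition names in place of SystemStreamPartition, and a fixed number of messages per fetch in place of a byte-based fetchSize.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical fetcher that only fetches for partitions whose queue is empty. */
final class EmptyQueueFetcher {
    private final Map<String, BlockingQueue<Long>> queues = new LinkedHashMap<>();
    private final Map<String, Long> nextOffsets = new LinkedHashMap<>();

    void register(String partition, long offset) {
        // Unbounded queue, so adding fetched messages never blocks the fetcher.
        queues.put(partition, new LinkedBlockingQueue<>());
        nextOffsets.put(partition, offset);
    }

    /**
     * One fetch pass. Partitions with a non-empty queue are skipped, so each queue
     * holds at most one fetch worth of messages. Returns the number of partitions fetched.
     */
    int fetchRound(int messagesPerFetch) {
        int fetched = 0;
        for (Map.Entry<String, BlockingQueue<Long>> entry : queues.entrySet()) {
            BlockingQueue<Long> queue = entry.getValue();
            if (!queue.isEmpty()) {
                continue; // consumer has not drained this partition yet
            }
            long offset = nextOffsets.get(entry.getKey());
            for (int i = 0; i < messagesPerFetch; i++) {
                queue.add(offset + i); // stand-in for a fetched message
            }
            nextOffsets.put(entry.getKey(), offset + messagesPerFetch);
            fetched++;
        }
        return fetched;
    }

    /** Returns the next queued offset for the partition, or null if its queue is empty. */
    Long poll(String partition) {
        return queues.get(partition).poll();
    }

    int queued(String partition) {
        return queues.get(partition).size();
    }
}
```

With this structure, a caller that only polls sp2 keeps receiving new sp2 messages, while sp1's queue stays at one fetch worth of messages and is simply skipped on later passes.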
