Posted to commits@samza.apache.org by "Chris Riccomini (JIRA)" <ji...@apache.org> on 2013/08/18 03:07:47 UTC

[jira] [Created] (SAMZA-21) Change KafkaSystemConsumer and BrokerProxy consumer defaults

Chris Riccomini created SAMZA-21:
------------------------------------

             Summary: Change KafkaSystemConsumer and BrokerProxy consumer defaults
                 Key: SAMZA-21
                 URL: https://issues.apache.org/jira/browse/SAMZA-21
             Project: Samza
          Issue Type: Bug
            Reporter: Chris Riccomini


The Kafka BrokerProxy and KafkaSystemConsumer currently both have a socket timeout with a default of Int.MaxValue:

{code}
  val timeout: Int = Int.MaxValue,
{code}

We don't actually use this default when we use the KafkaSystemFactory, because we use ConsumerConfig's default, which is 30 seconds. Nevertheless, we should change both classes to default to ConsumerConfig.SocketTimeout.
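As a rough sketch of what the new default could look like: the snippet below uses a stand-in object in place of Kafka's ConsumerConfig so that it compiles without Kafka on the classpath. KafkaDefaults and ProxySettings are hypothetical names; only the 30 second value comes from the description above.

```scala
// Stand-in for the constants on Kafka's ConsumerConfig (hypothetical object).
object KafkaDefaults {
  val SocketTimeout: Int = 30 * 1000 // 30 seconds, in milliseconds
}

// Hypothetical holder for the constructor parameter in question.
// Before the change the default would have been Int.MaxValue.
case class ProxySettings(timeout: Int = KafkaDefaults.SocketTimeout)

object TimeoutDefaultExample extends App {
  val settings = ProxySettings()
  println(s"default socket timeout: ${settings.timeout} ms")

  // An explicit value still overrides the default.
  val custom = ProxySettings(timeout = 5000)
  println(s"overridden socket timeout: ${custom.timeout} ms")
}
```

With this shape, code that constructs the class directly gets the same timeout as code that goes through the factory.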

While we're at it, we should also change bufferSize to be ConsumerConfig.SocketBufferSize, and we should change DefaultFetch to use:

{code}
  val maxWait: Int = ConsumerConfig.MaxFetchWaitMs
  val minBytes: Int = ConsumerConfig.MinFetchBytes
{code}

We should also actually use maxWait in DefaultFetch.defaultFetch. Right now, the wait is hard-coded to 1000 ms.
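A minimal sketch of the idea, assuming stand-in types rather than the real Kafka request classes. FetchSpec, FetchDefaults and DefaultFetchSketch are hypothetical names, and the default values are placeholders, not necessarily what ConsumerConfig defines.

```scala
// Stand-in for the fetch request sent to a broker (hypothetical type).
case class FetchSpec(topic: String, partition: Int, offset: Long,
                     fetchSize: Int, maxWait: Int, minBytes: Int)

// Stand-in for Kafka's ConsumerConfig constants; values are placeholders.
object FetchDefaults {
  val MaxFetchWaitMs: Int = 100
  val MinFetchBytes: Int = 1
}

trait DefaultFetchSketch {
  val maxWait: Int = FetchDefaults.MaxFetchWaitMs
  val minBytes: Int = FetchDefaults.MinFetchBytes
  val fetchSize: Int = 256 * 1024

  // Builds the request from the fields above instead of a hard-coded wait.
  def defaultFetch(topic: String, partition: Int, offset: Long): FetchSpec =
    FetchSpec(topic, partition, offset, fetchSize, maxWait, minBytes)
}

object DefaultFetchExample extends App {
  // Overriding maxWait now changes the request that gets built.
  val fetcher = new DefaultFetchSketch { override val maxWait: Int = 250 }
  println(fetcher.defaultFetch("my-topic", 0, 42L))
}
```

The point is only that defaultFetch reads maxWait and minBytes from the trait, so an override or a configured value reaches the request.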

Finally, BrokerProxy has the fetchSize hard-coded:

{code}
      val fetchSize: Int = 256 * 1024
{code}

We should make this parameter configurable (systems.%s.consumer.fetch.message.max.bytes), and pass it from the KafkaSystemFactory to the KafkaSystem to the BrokerProxy, just like timeout and buffer size.
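One way the lookup and hand-off could look, as a sketch only. FetchSizeConfig, ProxyParams and fetchSizeFor are hypothetical names, the config is modeled as a plain Map, and falling back to the currently hard-coded 256 KB is an assumption rather than a decision made in this ticket.

```scala
object FetchSizeConfig {
  // Key template from the description; %s is filled with the system name.
  val FetchSizeKey = "systems.%s.consumer.fetch.message.max.bytes"

  // Assumed fallback: the value currently hard-coded in BrokerProxy.
  val DefaultFetchSize: Int = 256 * 1024

  // Hypothetical stand-in for the values handed down to BrokerProxy.
  case class ProxyParams(timeout: Int, bufferSize: Int, fetchSize: Int)

  // Reads the fetch size for one system, falling back to the default.
  def fetchSizeFor(systemName: String, config: Map[String, String]): Int =
    config.get(FetchSizeKey.format(systemName))
      .map(_.toInt)
      .getOrElse(DefaultFetchSize)
}

object FetchSizeConfigExample extends App {
  import FetchSizeConfig._

  val config = Map("systems.kafka.consumer.fetch.message.max.bytes" -> "1048576")

  // The factory would resolve the value once and pass it down the chain.
  val params = ProxyParams(
    timeout = 30 * 1000,
    bufferSize = 64 * 1024,
    fetchSize = fetchSizeFor("kafka", config))
  println(params)
}
```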

(ConsumerConfig is Kafka's consumer config class, kafka.consumer.ConsumerConfig.)
