Posted to users@kafka.apache.org by craig w <co...@gmail.com> on 2016/02/11 17:18:48 UTC

messages not consumed before topic partition assignment

I started a Java consumer that subscribed to a topic "foo", which did not
previously exist (Kafka is configured to create topics with 2 partitions by
default). When subscribing, I provided a ConsumerRebalanceListener that logs
the information passed to its onPartitionsAssigned and onPartitionsRevoked
methods.

Shortly after calling subscribe(), the onPartitions* methods are invoked with
empty lists. I figure this is probably expected, because no data has been
produced to the topic yet.

I start up the kafka-console-producer and submit 3 messages. I watch the
consumer, which calls poll() every 5 seconds. No messages are received.
Again, this makes sense, since it hasn't been assigned any partitions yet.

A few minutes later, I see in the logs that the consumer is assigned foo-0
and foo-1. It keeps polling, but still receives no messages.

I then submit 2 more messages using the kafka-console-producer, and the
consumer gets those.

The properties I pass to the KafkaConsumer are:

bootstrap.servers = broker1:9092,broker2:9092,broker3:9092
group.id = foo
enable.auto.commit = false
session.timeout.ms = 30000
key.deserializer = ....StringDeserializer
value.deserializer = ....StringDeserializer
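For reference, here is a minimal sketch of the settings above as a java.util.Properties object, the form a KafkaConsumer is usually constructed from. The class and method names (ConsumerConfigSketch, consumerProps) are hypothetical. The deserializer class names are elided in the post, so they are left out. The sketch optionally adds auto.offset.reset, which the post does not set. In the Java consumer its default is "latest", so a group with no committed offsets starts at the end of each partition once it is assigned. That may explain why the 3 messages produced before assignment were never returned while the 2 produced afterwards were. Setting it to "earliest" is an assumption here, not part of the original configuration.

```java
import java.util.Properties;

public class ConsumerConfigSketch {
    // Builds the consumer settings listed in the post. The deserializer class
    // names are elided there ("....StringDeserializer"), so they are omitted
    // here and would need to be added before creating a KafkaConsumer.
    static Properties consumerProps(boolean readFromEarliest) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker1:9092,broker2:9092,broker3:9092");
        props.setProperty("group.id", "foo");
        props.setProperty("enable.auto.commit", "false");
        props.setProperty("session.timeout.ms", "30000");
        if (readFromEarliest) {
            // Assumption, not in the original config: a group with no committed
            // offsets starts at the beginning of each partition instead of the
            // default "latest" (the end of the log at assignment time).
            props.setProperty("auto.offset.reset", "earliest");
        }
        return props;
    }

    public static void main(String[] args) {
        // Print every key/value pair; iteration order is not guaranteed.
        consumerProps(true).forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```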


Maybe I'm just missing something. Thoughts?

Re: messages not consumed before topic partition assignment

Posted by craig w <co...@gmail.com>.
This seems to be related to "metadata.max.age.ms"
(http://kafka.apache.org/documentation.html#newconsumerconfigs), which is described as:

"The period of time in milliseconds after which we force a refresh of
metadata even if we haven't seen any partition leadership changes to
proactively discover any new brokers or partitions."

By default it's set to 5 minutes (300000 ms). Would setting it to, say, 10
seconds be problematic for the brokers if I had 100 clients in the cluster?
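A rough back-of-the-envelope sketch of the extra load, under a simplifying assumption: each client forces exactly one periodic metadata refresh every metadata.max.age.ms. Refreshes triggered by errors or leadership changes are not counted, and the cost of each request is not modeled. The class name MetadataRefreshLoad is hypothetical. With the 5-minute default, a client may also wait up to roughly that long to notice a new topic's partitions, which is consistent with the "few minutes" observed above.

```java
import java.util.Locale;

public class MetadataRefreshLoad {
    // Periodic metadata requests per second across the cluster, assuming each
    // client issues one forced refresh per maxAgeMs interval (simplified model).
    static double requestsPerSecond(int clients, long maxAgeMs) {
        return clients * 1000.0 / maxAgeMs;
    }

    public static void main(String[] args) {
        int clients = 100;
        long defaultMaxAgeMs = 300_000L; // 5 minutes, the default
        long proposedMaxAgeMs = 10_000L; // 10 seconds, as proposed above

        System.out.printf(Locale.ROOT, "default:  %.2f req/s%n",
                requestsPerSecond(clients, defaultMaxAgeMs));
        System.out.printf(Locale.ROOT, "proposed: %.2f req/s%n",
                requestsPerSecond(clients, proposedMaxAgeMs));
    }
}
```

Under this model, the change goes from about 0.33 to 10 metadata requests per second spread over the brokers. Whether that matters depends on cluster size and request cost, which this sketch does not measure.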

-- 

https://github.com/mindscratch
https://www.google.com/+CraigWickesser
https://twitter.com/mind_scratch
https://twitter.com/craig_links