Posted to dev@kafka.apache.org by Jeff Widman <je...@netskope.com> on 2017/01/18 02:09:32 UTC

Re: Is this a bug or just unintuitive behavior?

[Moving discussion from users list to dev list]

I agree with Ewen that it's more sensible for mirrormaker to default to
replicating topics from the earliest offset available, rather than just
replicating from the current offset onward.
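
To make the proposal concrete, here is a rough sketch of the idea of applying a different default only when the user has not set the property explicitly. It is not the code from the PR and not MirrorMaker's actual code; the class `MirrorDefaults` and the helper `withMirrorDefaults` are hypothetical names used only for illustration, and it uses plain `java.util.Properties` rather than any Kafka class.

```java
import java.util.Properties;

// Hypothetical helper, not MirrorMaker's actual code: apply a tool-specific
// default only when the user has not set the property explicitly.
class MirrorDefaults {
    static Properties withMirrorDefaults(Properties userProps) {
        Properties effective = new Properties();
        effective.putAll(userProps);
        // The consumer's normal default for auto.offset.reset is "latest";
        // a mirroring tool would rather start from the earliest offset.
        effective.putIfAbsent("auto.offset.reset", "earliest");
        return effective;
    }

    public static void main(String[] args) {
        // Property not set by the user: the mirroring default applies.
        Properties unset = new Properties();
        System.out.println(withMirrorDefaults(unset).getProperty("auto.offset.reset"));

        // Property set explicitly: the user's choice is left alone.
        Properties explicit = new Properties();
        explicit.setProperty("auto.offset.reset", "latest");
        System.out.println(withMirrorDefaults(explicit).getProperty("auto.offset.reset"));
    }
}
```

The point of `putIfAbsent` here is that anyone who already sets `auto.offset.reset` in their consumer config sees no change in behavior.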

I filed a JIRA ticket https://issues.apache.org/jira/browse/KAFKA-4668
As well as a PR: https://github.com/apache/kafka/pull/2394

Does this need a KIP?

The main side effect of this change is that when you start mirroring a new
topic, the mirrormaker replays it from the earliest offset, which can hammer
your network until it catches up or until you realize what's happening and
throttle the mirrormaker client.

Cheers,
Jeff


On Thu, Jan 5, 2017 at 7:55 PM, Ewen Cheslack-Postava <ew...@confluent.io>
wrote:

> The basic issue here is just that the auto.offset.reset defaults to latest,
> right? That's not a very good setting for a mirroring tool and this seems
> like something we might just want to change the default for. It's debatable
> whether it would even need a KIP.
>
> We have other settings in MM where we override them if they aren't set
> explicitly but we don't want the normal defaults. Most are producer
> properties to avoid duplicates (the acks, retries, max.block.ms, and
> max.in.flight.requests.per.connection settings), but there are a couple of
> consumer ones too (auto.commit.enable and consumer.timeout.ms).
>
> This is probably something like a 1-line MM patch if someone wants to
> tackle it -- the question of whether it needs a KIP or not is,
> unfortunately, the more complicated question :(
>
> -Ewen
>
> On Thu, Jan 5, 2017 at 1:10 PM, James Cheng <wu...@gmail.com> wrote:
>
> >
> > > On Jan 5, 2017, at 12:57 PM, Jeff Widman <je...@netskope.com> wrote:
> > >
> > > Thanks James and Hans.
> > >
> > > Will this also happen when we expand the number of partitions in a
> > > topic?
> > >
> > > That also will trigger a rebalance, the consumer won't subscribe to the
> > > partition until the rebalance finishes, etc.
> > >
> > > So it'd seem that any messages published to the new partition in between
> > > the partition creation and the rebalance finishing won't be consumed by
> > > any consumers that have offset=latest
> > >
> >
> > It hadn't occurred to me until you mentioned it, but yes, I think it'd
> > also happen in those cases.
> >
> > In the kafka consumer javadocs, they provide a list of things that would
> > cause a rebalance:
> > http://kafka.apache.org/0101/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#subscribe(java.util.Collection,%20org.apache.kafka.clients.consumer.ConsumerRebalanceListener)
> >
> > "As part of group management, the consumer will keep track of the list of
> > consumers that belong to a particular group and will trigger a rebalance
> > operation if one of the following events trigger -
> >
> > Number of partitions change for any of the subscribed list of topics
> > Topic is created or deleted
> > An existing member of the consumer group dies
> > A new member is added to an existing consumer group via the join API
> > "
> >
> > I'm guessing that this would affect any of those scenarios.
> >
> > -James
> >
> >
> > >
> > >
> > >
> > > On Thu, Jan 5, 2017 at 12:40 AM, James Cheng <wu...@gmail.com> wrote:
> > >
> > >> Jeff,
> > >>
> > >> Your analysis is correct. I would say that it is known but unintuitive
> > >> behavior.
> > >>
> > >> As an example of a problem caused by this behavior, it's possible for
> > >> mirrormaker to miss messages on newly created topics, even though it
> > >> was subscribed to them before the topics were created.
> > >>
> > >> See the following JIRAs:
> > >> https://issues.apache.org/jira/browse/KAFKA-3848
> > >> https://issues.apache.org/jira/browse/KAFKA-3370
> > >>
> > >> -James
> > >>
> > >>> On Jan 4, 2017, at 4:37 PM, hans@confluent.io wrote:
> > >>>
> > >>> This sounds exactly as I would expect things to behave. If you consume
> > >>> from the beginning I would think you would get all the messages but not
> > >>> if you consume from the latest offset. You can separately tune the
> > >>> metadata refresh interval if you want to miss fewer messages but that
> > >>> still won't get you all messages from the beginning if you don't
> > >>> explicitly consume from the beginning.
> > >>>
> > >>> Sent from my iPhone
> > >>>
> > >>>> On Jan 4, 2017, at 6:53 PM, Jeff Widman <je...@netskope.com> wrote:
> > >>>>
> > >>>> I'm seeing consumers miss messages when they subscribe before the
> > >>>> topic is actually created.
> > >>>>
> > >>>> Scenario:
> > >>>> 1) kafka 0.10.1.1 cluster with no topics, but supports topic
> > >>>> auto-creation as soon as a message is published to the topic
> > >>>> 2) consumer subscribes using topic string or a regex pattern.
> > >>>> Currently no topics match. Consumer offset is "latest"
> > >>>> 3) producer publishes to a topic that matches the string or regex
> > >>>> pattern.
> > >>>> 4) broker immediately creates a topic, writes the message, and also
> > >>>> notifies the consumer group that a rebalance needs to happen to assign
> > >>>> the topic_partition to one of the consumers.
> > >>>> 5) rebalance is fairly quick, maybe a second or so
> > >>>> 6) a consumer is assigned to the newly-created topic_partition
> > >>>>
> > >>>> At this point, we've got a consumer steadily polling the recently
> > >>>> created topic_partition. However, the consumer.poll() never returns any
> > >>>> messages published between topic creation and when the consumer was
> > >>>> assigned to the topic_partition. I'm guessing this may be because when
> > >>>> the consumer is assigned to the topic_partition it doesn't find any
> > >>>> committed offset, so it uses the latest offset, which happens to be
> > >>>> after the messages that were published to create the topic.
> > >>>>
> > >>>> This is surprising because the consumer technically was subscribed to
> > >>>> the topic before the messages were produced, so you'd think the
> > >>>> consumer would receive these messages.
> > >>>>
> > >>>> Is this known behavior? A bug in Kafka broker? Or a bug in my client
> > >>>> library?
> > >>
> > >>
> >
> >
>

Re: Is this a bug or just unintuitive behavior?

Posted by James Cheng <wu...@gmail.com>.
There is some discussion going on in https://issues.apache.org/jira/browse/KAFKA-3370 about this, as well.

I've added a link to your JIRA about it.

-James

> On Jan 17, 2017, at 9:35 PM, Jeff Widman <je...@netskope.com> wrote:
> 
> Agree on suggesting in the docs that generally the default offset should be
> reset to "none" after the mirrormaker is going.
> 
> There is an edge case where you want to keep auto.offset.reset at earliest:
> when you've got a mirrormaker consumer subscribed to a regex pattern and have
> auto-topic creation enabled on your cluster.
> 
> If you start producing to a non-existent topic that matches the regex, then
> there will be a period of time where the producer is producing before the
> new topic's partitions have been picked up by the mirrormaker. Those
> messages will never be consumed by a mirrormaker that resets to latest,
> because it starts after those just-produced messages.
> 
> Also agree on increasing the default offsets retention minutes. I actually
> didn't realize the default was so small.
> 
> On Tue, Jan 17, 2017 at 8:16 PM, Grant Henke <gh...@cloudera.com> wrote:
> 
>> I agree that setting the default auto.offset.reset to earliest makes sense
>> (This was actually a default choice Flume made for its Kafka channel to
>> avoid missing the first messages). However I think, at a minimum, we should
>> also document a recommendation to consider changing the value to none after
>> mirror maker has run to commit its initial offsets.
>> 
>> Setting the value to none ensures you don't replicate the entire topic
>> from scratch in the case offsets are lost or purged due to prolonged
>> downtime or other unforeseen circumstances. Having an auto.offset.reset of
>> none also allows you to ensure you don't miss data. Missing data can occur
>> when auto.offset.reset is set to latest and the offset state was lost
>> before mirrormaker was caught up or data was produced while it was down.
>> 
>> I would also suggest considering increasing the default
>> offsets.retention.minutes from 1 day (1440) to 7 days (10080)...or
>> something similar. I have seen a handful of scenarios where an outage lasts
>> longer than a day and the offsets get purged, causing auto.offset.reset to
>> kick in and, in the case of earliest, re-replicate billions of messages.
>> 
>> --
>> Grant Henke
>> Software Engineer | Cloudera
>> grant@cloudera.com | twitter.com/gchenke | linkedin.com/in/granthenke
>> 


Re: Is this a bug or just unintuitive behavior?

Posted by Jeff Widman <je...@netskope.com>.
Agree on suggesting in the docs that generally the default offset should be
reset to "none" after the mirrormaker is going.

There is an edge case where you want to keep auto.offset.reset at earliest:
when you've got a mirrormaker consumer subscribed to a regex pattern and have
auto-topic creation enabled on your cluster.

If you start producing to a non-existent topic that matches the regex, then
there will be a period of time where the producer is producing before the
new topic's partitions have been picked up by the mirrormaker. Those
messages will never be consumed by a mirrormaker that resets to latest,
because it starts after those just-produced messages.
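
The window described above can be shown with a toy model. This is only a sketch: `OffsetResetDemo` and `consume` are hypothetical names, the "log" is just an in-memory list standing in for one partition, and none of it is the Kafka client API. It assumes the consumer has no committed offset when it is assigned the partition.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of a single partition log; this is NOT the Kafka client API.
class OffsetResetDemo {
    // Returns the messages that a consumer with no committed offset would
    // read, given the log end offset at the moment it got its assignment.
    static List<String> consume(List<String> log, int logEndAtAssignment, String resetPolicy) {
        int start;
        switch (resetPolicy) {
            case "earliest":
                start = 0;
                break;
            case "latest":
                start = logEndAtAssignment;
                break;
            default:
                // Models "none": with no committed offset there is nowhere
                // to start, so this toy model fails instead of guessing.
                throw new IllegalStateException("no committed offset, reset policy: " + resetPolicy);
        }
        return new ArrayList<>(log.subList(start, log.size()));
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        // The producer writes two messages; the first one auto-creates the topic.
        log.add("m0");
        log.add("m1");
        // The rebalance finishes only now, so the log end offset is already 2.
        int logEndAtAssignment = log.size();
        // One more message arrives after the consumer has its assignment.
        log.add("m2");

        System.out.println("latest:   " + consume(log, logEndAtAssignment, "latest"));
        System.out.println("earliest: " + consume(log, logEndAtAssignment, "earliest"));
    }
}
```

With latest the consumer only sees `m2`; `m0` and `m1` were written before the assignment and are skipped. With earliest it sees all three.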

Also agree on increasing the default offsets retention minutes. I actually
didn't realize the default was so small.

On Tue, Jan 17, 2017 at 8:16 PM, Grant Henke <gh...@cloudera.com> wrote:

> I agree that setting the default auto.offset.reset to earliest makes sense
> (This was actually a default choice Flume made for its Kafka channel to
> avoid missing the first messages). However I think, at a minimum, we should
> also document a recommendation to consider changing the value to none after
> mirror maker has run to commit its initial offsets.
>
> Setting the value to none ensures you don't replicate the entire topic
> from scratch in the case offsets are lost or purged due to prolonged
> downtime or other unforeseen circumstances. Having an auto.offset.reset of
> none also allows you to ensure you don't miss data. Missing data can occur
> when auto.offset.reset is set to latest and the offset state was lost
> before mirrormaker was caught up or data was produced while it was down.
>
> I would also suggest considering increasing the default
> offsets.retention.minutes from 1 day (1440) to 7 days (10080)...or
> something similar. I have seen a handful of scenarios where an outage lasts
> longer than a day and the offsets get purged, causing auto.offset.reset to
> kick in and, in the case of earliest, re-replicate billions of messages.
>
> --
> Grant Henke
> Software Engineer | Cloudera
> grant@cloudera.com | twitter.com/gchenke | linkedin.com/in/granthenke
>

Re: Is this a bug or just unintuitive behavior?

Posted by Grant Henke <gh...@cloudera.com>.
I agree that setting the default auto.offset.reset to earliest makes sense
(This was actually a default choice Flume made for its Kafka channel to
avoid missing the first messages). However I think, at a minimum, we should
also document a recommendation to consider changing the value to none after
mirror maker has run to commit its initial offsets.

Setting the value to none ensures you don't replicate the entire topic
from scratch in the case offsets are lost or purged due to prolonged
downtime or other unforeseen circumstances. Having an auto.offset.reset of
none also allows you to ensure you don't miss data. Missing data can occur
when auto.offset.reset is set to latest and the offset state was lost
before mirrormaker was caught up or data was produced while it was down.

I would also suggest considering increasing the default
offsets.retention.minutes from 1 day (1440) to 7 days (10080)...or
something similar. I have seen a handful of scenarios where an outage lasts
longer than a day and the offsets get purged, causing auto.offset.reset to
kick in and, in the case of earliest, re-replicate billions of messages.
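
For anyone checking the numbers, here is the arithmetic behind the two retention values, plus the two settings printed as plain `key=value` lines. This is only a sketch: the class `RetentionMinutes` and the helper `daysToMinutes` are made-up names, and the property names and values are the ones mentioned in this thread.

```java
import java.util.concurrent.TimeUnit;

// Illustrative arithmetic only; the property names come from this thread.
class RetentionMinutes {
    static long daysToMinutes(long days) {
        return TimeUnit.DAYS.toMinutes(days);
    }

    public static void main(String[] args) {
        // Broker setting: how long committed offsets are kept.
        System.out.println("offsets.retention.minutes=" + daysToMinutes(7));
        // Consumer setting suggested once mirror maker has committed its
        // initial offsets.
        System.out.println("auto.offset.reset=none");
    }
}
```

One day is 24 * 60 = 1440 minutes, and seven days is 7 * 1440 = 10080 minutes, matching the values above.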


-- 
Grant Henke
Software Engineer | Cloudera
grant@cloudera.com | twitter.com/gchenke | linkedin.com/in/granthenke