Posted to users@kafka.apache.org by Carl Heymann <ch...@gmail.com> on 2015/06/15 21:12:38 UTC

At-least-once guarantees with high-level consumer

Hi

** Disclaimer: I know there's a new consumer API on the way, this mail is
about the currently available API. I also apologise if the below has
already been discussed previously. I did try to check previous discussions
on ConsumerIterator **

It seems to me that the high-level consumer would be able to support
at-least-once messaging, even if one uses auto-commit, by changing
kafka.consumer.ConsumerIterator.next() to call
currentTopicInfo.resetConsumeOffset(..) _before_ super.next(). This way, a
consumer thread for a KafkaStream could just loop:

while (true) {
    MyMessage message = iterator.next().message();
    process(message);
}

Each call to "iterator.next()" then updates the offset to commit to the end
of the message that was just processed. When offsets are committed for the
ConsumerConnector (either automatically or manually), the commit will not
include offsets of messages that haven't been fully processed.

I've tested the following ConsumerIterator.next(), and it seems to work as
I expect:

  override def next(): MessageAndMetadata[K, V] = {
    // New code: reset consumer offset to the end of the previously consumed message:
    if (consumedOffset > -1L && currentTopicInfo != null) {
      currentTopicInfo.resetConsumeOffset(consumedOffset)
      val topic = currentTopicInfo.topic
      trace("Setting %s consumed offset to %d".format(topic, consumedOffset))
    }

    // Old code, excluding reset:
    val item = super.next()
    if (consumedOffset < 0)
      throw new KafkaException("Offset returned by the message set is invalid %d".format(consumedOffset))
    val topic = currentTopicInfo.topic
    consumerTopicStats.getConsumerTopicStats(topic).messageRate.mark()
    consumerTopicStats.getConsumerAllTopicStats().messageRate.mark()
    item
  }
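
The ordering argument can be checked with a toy model that has no Kafka
dependency. The sketch below is only an illustration: ToyIterator,
committableOffset and the three-message log are made-up names, and the
"stock" branch assumes, as this mail describes, that the unmodified iterator
publishes the new offset right after super.next(). It reports which offset a
commit would record if it ran while the message at offset 1 is still being
processed.

```java
import java.util.Arrays;
import java.util.List;

public class DeferredOffsetDemo {

    /** Toy stand-in for one partition's iterator. This is not the Kafka class. */
    static final class ToyIterator {
        private final List<String> log;
        private final boolean deferReset;
        private int position = 0;             // index of the next message to hand out
        private long consumedOffset = -1L;    // offset just past the last message handed out
        private long committableOffset = 0L;  // what a commit would record if it ran now

        ToyIterator(List<String> log, boolean deferReset) {
            this.log = log;
            this.deferReset = deferReset;
        }

        String next() {
            // Proposed ordering: asking for the next message means the previous
            // one is finished, so only now publish its end offset.
            if (deferReset && consumedOffset > -1L) {
                committableOffset = consumedOffset;
            }
            String message = log.get(position);
            position++;
            consumedOffset = position;
            // Assumed stock ordering: publish the offset as soon as the message
            // is handed out, before the caller has processed it.
            if (!deferReset) {
                committableOffset = consumedOffset;
            }
            return message;
        }

        long committableOffset() {
            return committableOffset;
        }
    }

    /** Offset a commit would record while the message at offset 1 is in flight. */
    static long offsetVisibleWhileProcessingA(boolean deferReset) {
        ToyIterator it = new ToyIterator(Arrays.asList("first", "A", "last"), deferReset);
        it.next(); // offset 0, assumed fully processed
        it.next(); // offset 1 ("A"), processing still in progress
        return it.committableOffset();
    }

    public static void main(String[] args) {
        System.out.println("stock ordering commits    " + offsetVisibleWhileProcessingA(false));
        System.out.println("deferred ordering commits " + offsetVisibleWhileProcessingA(true));
    }
}
```

In this model the stock ordering exposes offset 2 while "A" is in flight, so a
crash after a commit would skip "A". The deferred ordering exposes offset 1,
so "A" would be delivered again after a restart.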

I've seen several people asking about managing commit offsets manually with
the high level consumer. I suspect that this approach (the modified
ConsumerIterator) would scale better than having a separate
ConsumerConnector per stream just so that you can commit offsets with
at-least-once semantics. The downside of this approach is more duplicate
deliveries after recovery from hard failure (but this is "at least once",
right, not "exactly once").

I don't propose that the code necessarily be changed like this in trunk; I
just want to know if the approach seems reasonable.

Regards
Carl Heymann

Fwd: At-least-once guarantees with high-level consumer

Posted by Carl Heymann <ch...@gmail.com>.
Hi

I sent this to the user mailing list on Monday, but maybe it's more
dev-oriented, so I'm sending it to the dev list as well.

Regards
Carl Heymann

Re: At-least-once guarantees with high-level consumer

Posted by tao xiao <xi...@gmail.com>.
Carl,

I doubt that the change you proposed gives an at-least-once guarantee.
consumedOffset is the offset following the message that iterator.next()
returns. For example, if the message returned is A with offset 1, then
consumedOffset will be 2, and that is the value set on currentTopicInfo.
While the consumer is processing message A, offset 2 can be committed
regardless of whether currentTopicInfo.resetConsumeOffset(..) is called
before or after super.next(). If the consumer fails at this stage, message A
will not be reprocessed when the consumer starts again, since offset 2 has
already been committed.


On Mon, 22 Jun 2015 at 15:36 Carl Heymann <ch...@gmail.com> wrote:

> OK, thanks. I agree, the current code is better if you get lots of
> rebalancing, and you can do your own thing for stronger guarantees.
>
> For the new consumer, it looks like it should be possible to use multiple
> threads, as long as partition order is preserved in the processing, right?
> So, one can build a custom API similar to the current connector + streams.
> But I guess that's a different discussion.
>
> With the new consumer API, rebalancing is handled during poll(), which is
> called from a client. What if some client stops polling, will this cause
> rebalancing hiccups for all consumers in the cluster? Let me know if this
> has already been discussed.

Re: At-least-once guarantees with high-level consumer

Posted by Carl Heymann <ch...@gmail.com>.
OK, thanks. I agree, the current code is better if you get lots of
rebalancing, and you can do your own thing for stronger guarantees.

For the new consumer, it looks like it should be possible to use multiple
threads, as long as partition order is preserved in the processing, right?
So, one can build a custom API similar to the current connector + streams.
But I guess that's a different discussion.
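
One way to keep partition order while using several threads is to pin each
partition to a single worker. The sketch below is only an illustration of
that idea with the JDK's executors. Rec, processInPartitionOrder and the
"partition modulo workers" rule are hypothetical names and choices, not part
of any Kafka API, and "processing" here just records the offset.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class PartitionOrderedDispatch {

    /** Hypothetical record type: only a partition number and an offset. */
    static final class Rec {
        final int partition;
        final long offset;

        Rec(int partition, long offset) {
            this.partition = partition;
            this.offset = offset;
        }
    }

    /**
     * Hands every record to the single-threaded lane that owns its partition,
     * so records of one partition are processed in arrival order.
     * Returns, per partition, the offsets in the order they were processed.
     */
    static List<List<Long>> processInPartitionOrder(List<Rec> polled, int partitions, int workers) {
        ExecutorService[] lanes = new ExecutorService[workers];
        for (int i = 0; i < workers; i++) {
            lanes[i] = Executors.newSingleThreadExecutor();
        }
        final List<List<Long>> seen = new ArrayList<List<Long>>();
        for (int p = 0; p < partitions; p++) {
            seen.add(Collections.synchronizedList(new ArrayList<Long>()));
        }
        for (final Rec r : polled) {
            // Same partition -> same lane -> FIFO order within the partition.
            lanes[r.partition % workers].execute(() -> seen.get(r.partition).add(r.offset));
        }
        try {
            for (ExecutorService lane : lanes) {
                lane.shutdown();
                lane.awaitTermination(5, TimeUnit.SECONDS);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while draining lanes", e);
        }
        return seen;
    }

    public static void main(String[] args) {
        List<Rec> polled = new ArrayList<Rec>();
        polled.add(new Rec(0, 0L));
        polled.add(new Rec(1, 0L));
        polled.add(new Rec(0, 1L));
        polled.add(new Rec(1, 1L));
        polled.add(new Rec(0, 2L));
        System.out.println(processInPartitionOrder(polled, 2, 2));
    }
}
```

Records from different partitions may interleave freely. Only the order
within a partition is fixed by this scheme.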

With the new consumer API, rebalancing is handled during poll(), which is
called from a client. If some client stops polling, will this cause
rebalancing hiccups for all consumers in the cluster? Let me know if this
has already been discussed.

On Mon, Jun 22, 2015 at 8:50 AM, Jiangjie Qin <jq...@linkedin.com.invalid>
wrote:

> Yes, your approach works. I am not sure if we should take this as the default
> solution, though. Users can have a simple wrapper + customized rebalance
> listener. The tricky part is that the rebalance listener might need
> different implementations. So it looks like the current API provides enough
> simplicity and enough flexibility.
>
> For the new consumer, if there is only one user thread, this might not be
> an issue. If the consumer is shared by multiple threads (which is not
> recommended), a similar principle applies - commit offsets only after
> processing them.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On 6/21/15, 10:50 PM, "Carl Heymann" <ch...@gmail.com> wrote:
>
> >Thanks Jiangjie
> >
> >So you agree that with the modified ConsumerIterator.next() code, the high
> >level consumer becomes at-least-once, even with auto-commit enabled? That
> >is what I really want to know.
> >
> >I'll have a look at the rebalancing code. I think I understand: during
> >rebalancing, with auto-commit enabled, the offsets are committed
> >in ZookeeperConsumerConnector.closeFetchersForQueues(..). Some processing
> >might still be happening at this point. The rebalance listener is called
> >only after this commit. So the current code (without my change) would lead
> >to fewer duplicate messages, because it assumes that these transactions
> >normally complete. This seems prudent, since rebalancing happens much more
> >frequently than java processes being killed unexpectedly. On the other hand
> >it means giving up at-least-once guarantees for message processing, when a
> >java process actually does die unexpectedly.
> >
> >So I see it should be better to create a custom offset tracking&commit
> >component, with some ability to wait a reasonable amount of time for
> >consumer threads on streams to complete their current transaction, on
> >rebalance, before committing from a rebalance listener.
> >
> >Is it OK to block for a second or two
> >in consumerRebalanceListener.beforeReleasingPartitions(..), to wait for
> >processing threads to complete? Will this hold up the whole cluster's
> >rebalancing?
> >
> >The new KafkaConsumer code doesn't appear to do a commit in the same way
> >during rebalance, when autocommit is enabled. So if current users of the
> >high level consumer switch to the new consumer, they might get more
> >duplicates on rebalance, right?
> >
> >Regards
> >Carl
> >
> >
> >On Sun, Jun 21, 2015 at 1:43 AM, Jiangjie Qin <jq...@linkedin.com.invalid>
> >wrote:
> >
> >> Hi Carl,
> >>
> >> Generally, your approach works to guarantee at-least-once consumption -
> >> basically people have to commit an offset only after they have processed
> >> the message.
> >> The only problem is that in the old high level consumer, the consumer
> >> will (and should) commit offsets during a consumer rebalance. To guarantee
> >> at-least-once and avoid unnecessary duplicates on rebalance, ideally we
> >> should wait until all the messages returned by the iterator have been
> >> processed before committing offsets.
> >>
> >> At LinkedIn, we have a wrapper around the open source consumer iterator
> >> where we can implement that logic.
> >>
> >> Jiangjie (Becket) Qin
> >>
> >> On 6/19/15, 12:22 AM, "Carl Heymann" <ch...@gmail.com> wrote:
> >>
> >> >Thanks Bhavesh.
> >> >
> >> >I understand that getting "exactly once" processing of a message requires
> >> >some de-duplication. What I'm saying is that the current high level
> >> >consumer, with automatic offset commits enabled, gives neither "at most
> >> >once" nor "at least once" guarantees: a consumer group might get duplicate
> >> >messages, but might also never fully process some messages (which is a
> >> >bigger problem for me).
> >> >
> >> >With the code change I propose, I think it changes to "at least once",
> >> >i.e. one can then do the deduplication you describe, without worrying about
> >> >"losing" messages. Messages should not get committed without being fully
> >> >processed. I want to know if this code change has any obvious problems.
> >> >
> >> >Regards
> >> >Carl
> >> >
> >> >
> >> >On Thu, Jun 18, 2015 at 11:19 PM, Bhavesh Mistry <mistry.p.bhavesh@gmail.com> wrote:
> >> >
> >> >> Hi Carl,
> >> >>
> >> >> Producer-side retries can cause the same message to be sent to the
> >> >> brokers more than once, so it is stored at different offsets. Also, you
> >> >> may get duplicates when you have processed data but the High Level
> >> >> Consumer offset has not been saved or committed, and your server
> >> >> restarts, etc.
> >> >>
> >> >> To guarantee at-least-once processing across partitions (and across
> >> >> servers), you will need to store a message hash or primary key in a
> >> >> distributed LRU cache (with an eviction policy) like Hazelcast
> >> >> <http://www.hazelcast.com> and de-duplicate across partitions.
> >> >>
> >> >> I hope this helps!
> >> >>
> >> >>
> >> >>
> >> >> Thanks,
> >> >>
> >> >> Bhavesh
> >> >>
> >> >>
> >> >> On Wed, Jun 17, 2015 at 1:49 AM, yewton <ye...@gmail.com> wrote:
> >> >>
> >> >> > So Carl Heymann's ConsumerIterator.next hack approach is not
> >> >>reasonable?
> >> >> >
> >> >> > On 2015-06-17 08:12:50 +0000, Stevo Slavić wrote:
> >> >> >
> >> >> >> With auto-commit one can only have an at-most-once delivery guarantee:
> >> >> >> after the commit but before the message is delivered for processing, or
> >> >> >> even after it is delivered but before it is processed, things can fail,
> >> >> >> causing the event not to be processed, which is basically the same
> >> >> >> outcome as if it was not delivered.
>
>
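
The de-duplication suggestion quoted above (remember a message hash or
primary key in an LRU cache and skip repeats) can be sketched in a single
process with java.util.LinkedHashMap. This is only a local, in-memory
illustration: RecentKeyDeduplicator and firstSighting are made-up names, and
a distributed cache such as the Hazelcast setup mentioned in the quote would
be needed to de-duplicate across servers.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class RecentKeyDeduplicator {
    private final Map<String, Boolean> recent;

    public RecentKeyDeduplicator(final int capacity) {
        // accessOrder = true makes iteration order least-recently-used first,
        // and removeEldestEntry evicts that entry once capacity is exceeded.
        this.recent = new LinkedHashMap<String, Boolean>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                return size() > capacity;
            }
        };
    }

    /** Returns true if the key is not currently cached, i.e. the message should be processed. */
    public synchronized boolean firstSighting(String key) {
        return recent.put(key, Boolean.TRUE) == null;
    }

    public static void main(String[] args) {
        RecentKeyDeduplicator dedup = new RecentKeyDeduplicator(2);
        String[] keys = {"order-1", "order-1", "order-2"};
        for (String key : keys) {
            System.out.println(key + (dedup.firstSighting(key) ? " -> process" : " -> skip duplicate"));
        }
    }
}
```

Because the cache is bounded, a duplicate that arrives after its key has been
evicted is processed again, so the capacity has to cover the longest replay
window one expects.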

Re: At-least-once guarantees with high-level consumer

Posted by Jiangjie Qin <jq...@linkedin.com.INVALID>.
Yes, your approach works. I am not sure if we should take this as the default
solution, though. Users can have a simple wrapper + customized rebalance
listener. The tricky part is that the rebalance listener might need
different implementations. So it looks like the current API provides enough
simplicity and enough flexibility.

For the new consumer, if there is only one user thread, this might not be
an issue. If the consumer is shared by multiple threads (which is not
recommended), a similar principle applies - commit offsets only after
processing them.
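
When several threads finish messages out of order, "commit only after
processing" means committing no further than the lowest offset that is still
unprocessed. The sketch below is a hypothetical bookkeeping helper, not a
Kafka class. ProcessedOffsetTracker and its method names are made up, and it
assumes offsets within a partition are contiguous from a known starting
offset.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeSet;

public class ProcessedOffsetTracker {

    private static final class PartitionState {
        long nextToCommit;                                  // lowest offset not yet processed
        final TreeSet<Long> finishedAhead = new TreeSet<Long>(); // processed, but behind a gap

        PartitionState(long firstOffset) {
            this.nextToCommit = firstOffset;
        }
    }

    private final Map<Integer, PartitionState> partitions = new HashMap<Integer, PartitionState>();

    /** Registers a partition and the first offset that will be handed out for it. */
    public synchronized void startPartition(int partition, long firstOffset) {
        partitions.put(partition, new PartitionState(firstOffset));
    }

    /** Called by a worker once it has fully processed the message at this offset. */
    public synchronized void markProcessed(int partition, long offset) {
        PartitionState state = stateOf(partition);
        state.finishedAhead.add(offset);
        // Advance over every offset that is now contiguous with the commit point.
        while (state.finishedAhead.remove(Long.valueOf(state.nextToCommit))) {
            state.nextToCommit++;
        }
    }

    /** Offset that is safe to commit: everything below it has been processed. */
    public synchronized long committableOffset(int partition) {
        return stateOf(partition).nextToCommit;
    }

    private PartitionState stateOf(int partition) {
        PartitionState state = partitions.get(partition);
        if (state == null) {
            throw new IllegalArgumentException("unknown partition " + partition);
        }
        return state;
    }

    public static void main(String[] args) {
        ProcessedOffsetTracker tracker = new ProcessedOffsetTracker();
        tracker.startPartition(0, 10L);
        tracker.markProcessed(0, 11L); // finished early, offset 10 still in flight
        System.out.println("after 11: " + tracker.committableOffset(0));
        tracker.markProcessed(0, 10L); // gap closed
        System.out.println("after 10: " + tracker.committableOffset(0));
    }
}
```

A wrapper or rebalance listener could read committableOffset(..) just before
committing, so that a commit never covers a message that is still in flight.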

Thanks,

Jiangjie (Becket) Qin

On 6/21/15, 10:50 PM, "Carl Heymann" <ch...@gmail.com> wrote:

>Thanks Jiangjie
>
>So you agree that with the modified ConsumerIterator.next() code, the high
>level consumer becomes at-least-once, even with auto-commit enabled? That
>is what I really want to know.
>
>I'll have a look at the rebalancing code. I think I understand: during
>rebalancing, with auto-commit enabled, the offsets are committed
>in ZookeeperConsumerConnector.closeFetchersForQueues(..). Some processing
>might still be happening at this point. The rebalance listener is called
>only after this commit. So the current code (without my change) would lead
>to fewer duplicate messages, because it assumes that these transactions
>normally complete. This seems prudent, since rebalancing happens much more
>frequently than java processes being killed unexpectedly. On the other hand
>it means giving up at-least-once guarantees for message processing, when a
>java process actually does die unexpectedly.
>
>So I see it should be better to create a custom offset tracking&commit
>component, with some ability to wait a reasonable amount of time for
>consumer threads on streams to complete their current transaction, on
>rebalance, before committing from a rebalance listener.
>
>Is it OK to block for a second or two
>in consumerRebalanceListener.beforeReleasingPartitions(..), to wait for
>processing threads to complete? Will this hold up the whole cluster's
>rebalancing?
>
>The new KafkaConsumer code doesn't appear to do a commit in the same way
>during rebalance, when autocommit is enabled. So if current users of the
>high level consumer switch to the new consumer, they might get more
>duplicates on rebalance, right?
>
>Regards
>Carl
>
>
>On Sun, Jun 21, 2015 at 1:43 AM, Jiangjie Qin <jq...@linkedin.com.invalid>
>wrote:
>
>> Hi Carl,
>>
>> Generally, your approach works to guarantee at-least-once consumption -
>> basically people have to commit an offset only after they have processed
>> the message.
>> The only problem is that in the old high level consumer, the consumer
>> will (and should) commit offsets during a consumer rebalance. To guarantee
>> at-least-once and avoid unnecessary duplicates on rebalance, ideally we
>> should wait until all the messages returned by the iterator have been
>> processed before committing offsets.
>>
>> At LinkedIn, we have a wrapper around the open source consumer iterator
>> where we can implement that logic.
>>
>> Jiangjie (Becket) Qin
>>
>> On 6/19/15, 12:22 AM, "Carl Heymann" <ch...@gmail.com> wrote:
>>
>> >Thanks Bhavesh.
>> >
>> >I understand that to get "exactly once" processing of a message
>>requires
>> >some de-duplication. What I'm saying, is that the current high level
>> >consumer, with automatic offset commits enabled, gives neither "at most
>> >once" nor "at least once" guarantees: A consumer group might get
>>duplicate
>> >messages, but might also never fully process some messages (which is a
>> >bigger problem for me).
>> >
>> >With the code change I propose, I think it changes to "at least once",
>> >i.e.
>> >one can then do the deduplication you describe, without worrying about
>> >"losing" messages. Messages should not get committed without being
>>fully
>> >processed. I want to know if this code change has any obvious problems.
>> >
>> >Regards
>> >Carl
>> >
>> >
>> >On Thu, Jun 18, 2015 at 11:19 PM, Bhavesh Mistry
>> ><mistry.p.bhavesh@gmail.com
>> >> wrote:
>> >
>> >> HI Carl,
>> >>
>> >> Produce side retry can produce duplicated message being sent to
>>brokers
>> >> with different offset with same message. Also, you may get duplicated
>> >>when
>> >> the High Level Consumer offset is not being saved or commit but you
>>have
>> >> processed data and your server restart etc...
>> >>
>> >>
>> >>
>> >> To guaranteed at-least one processing across partitions (and across
>> >> servers), you will need to store message hash or primary key into
>> >> distributed LRU cache (with eviction policy )  like Hazelcast
>> >> <http://www.hazelcast.com> and do dedupping across partitions.
>> >>
>> >>
>> >>
>> >> I hope this help !
>> >>
>> >>
>> >>
>> >> Thanks,
>> >>
>> >> Bhavesh
>> >>
>> >>
>> >> On Wed, Jun 17, 2015 at 1:49 AM, yewton <ye...@gmail.com> wrote:
>> >>
>> >> > So Carl Heymann's ConsumerIterator.next hack approach is not
>> >>reasonable?
>> >> >
>> >> > 2015-06-17 08:12:50 +0000 上のメッセージ Stevo Slavić:
>> >> >
>> >> >  --047d7bfcf30ed09b460518b241db
>> >> >>
>> >> >> Content-Type: text/plain; charset=UTF-8
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >> With auto-commit one can only have at-most-once delivery
>>guarantee -
>> >> after
>> >> >>
>> >> >> commit but before message is delivered for processing, or even
>>after
>> >>it
>> >> is
>> >> >>
>> >> >> delivered but before it is processed, things can fail, causing
>>event
>> >>not
>> >> >> to
>> >> >>
>> >> >> be processed, which is basically same outcome as if it was not
>> >> delivered.
>> >> >>
>> >> >>
>> >> >>
>> >> >> On Mon, Jun 15, 2015 at 9:12 PM, Carl Heymann
>><ch...@gmail.com>
>> >> >> wrote:
>> >> >>
>> >> >>
>> >> >>
>> >> >> > Hi
>> >> >>
>> >> >> >
>> >> >>
>> >> >> > ** Disclaimer: I know there's a new consumer API on the way,
>>this
>> >>mail
>> >> >> is
>> >> >>
>> >> >> > about the currently available API. I also apologise if the below
>> >>has
>> >> >>
>> >> >> > already been discussed previously. I did try to check previous
>> >> >> discussions
>> >> >>
>> >> >> > on ConsumerIterator **
>> >> >>
>> >> >> >
>> >> >>
>> >> >> > It seems to me that the high-level consumer would be able to
>> >>support
>> >> >>
>> >> >> > at-least-once messaging, even if one uses auto-commit, by
>>changing
>> >> >>
>> >> >> > kafka.consumer.ConsumerIterator.next() to call
>> >> >>
>> >> >> > currentTopicInfo.resetConsumeOffset(..) _before_ super.next().
>>This
>> >> >> way, a
>> >> >>
>> >> >> > consumer thread for a KafkaStream could just loop:
>> >> >>
>> >> >> >
>> >> >>
>> >> >> > while (true) {
>> >> >>
>> >> >> >     MyMessage message = iterator.next().message();
>> >> >>
>> >> >> >     process(message);
>> >> >>
>> >> >> > }
>> >> >>
>> >> >> >
>> >> >>
>> >> >> > Each call to "iterator.next()" then updates the offset to
>>commit to
>> >> the
>> >> >> end
>> >> >>
>> >> >> > of the message that was just processed. When offsets are
>>committed
>> >>for
>> >> >> the
>> >> >>
>> >> >> > ConsumerConnector (either automatically or manually), the commit
>> >>will
>> >> >> not
>> >> >>
>> >> >> > include offsets of messages that haven't been fully processed.
>> >> >>
>> >> >> >
>> >> >>
>> >> >> > I've tested the following ConsumerIterator.next(), and it seems
>>to
>> >> work
>> >> >> as
>> >> >>
>> >> >> > I expect:
>> >> >>
>> >> >> >
>> >> >>
>> >> >> >   override def next(): MessageAndMetadata[K, V] = {
>> >> >>
>> >> >> >     // New code: reset consumer offset to the end of the
>>previously
>> >> >>
>> >> >> > consumed message:
>> >> >>
>> >> >> >     if (consumedOffset > -1L && currentTopicInfo != null) {
>> >> >>
>> >> >> >         currentTopicInfo.resetConsumeOffset(consumedOffset)
>> >> >>
>> >> >> >         val topic = currentTopicInfo.topic
>> >> >>
>> >> >> >         trace("Setting %s consumed offset to %d".format(topic,
>> >> >>
>> >> >> > consumedOffset))
>> >> >>
>> >> >> >     }
>> >> >>
>> >> >> >
>> >> >>
>> >> >> >     // Old code, excluding reset:
>> >> >>
>> >> >> >     val item = super.next()
>> >> >>
>> >> >> >     if(consumedOffset < 0)
>> >> >>
>> >> >> >       throw new KafkaException("Offset returned by the message
>>set
>> >>is
>> >> >>
>> >> >> > invalid %d".format(consumedOffset))
>> >> >>
>> >> >> >     val topic = currentTopicInfo.topic
>> >> >>
>> >> >> >
>> >>consumerTopicStats.getConsumerTopicStats(topic).messageRate.mark()
>> >> >>
>> >> >> >
>> >>consumerTopicStats.getConsumerAllTopicStats().messageRate.mark()
>> >> >>
>> >> >> >     item
>> >> >>
>> >> >> >   }
>> >> >>
>> >> >> >
>> >> >>
>> >> >> > I've seen several people asking about managing commit offsets
>> >>manually
>> >> >> with
>> >> >>
>> >> >> > the high level consumer. I suspect that this approach (the
>>modified
>> >> >>
>> >> >> > ConsumerIterator) would scale better than having a separate
>> >> >>
>> >> >> > ConsumerConnecter per stream just so that you can commit offsets
>> >>with
>> >> >>
>> >> >> > at-least-once semantics. The downside of this approach is more
>> >> duplicate
>> >> >>
>> >> >> > deliveries after recovery from hard failure (but this is "at
>>least
>> >> >> once",
>> >> >>
>> >> >> > right, not "exactly once").
>> >> >>
>> >> >> >
>> >> >>
>> >> >> > I don't propose that the code necessarily be changed like this
>>in
>> >> >> trunk, I
>> >> >>
>> >> >> > just want to know if the approach seems reasonable.
>> >> >>
>> >> >> >
>> >> >>
>> >> >> > Regards
>> >> >>
>> >> >> > Carl Heymann
>> >> >>
>> >> >> >
>> >> >>
>> >> >>
>> >> >>
>> >> >> --047d7bfcf30ed09b460518b241db--
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >
>> >> >
>> >> >
>> >>
>>
>>


Re: At-least-once guarantees with high-level consumer

Posted by Carl Heymann <ch...@gmail.com>.
Thanks Jiangjie

So you agree that with the modified ConsumerIterator.next() code, the high
level consumer becomes at-least-once, even with auto-commit enabled? That
is what I really want to know.

I'll have a look at the rebalancing code. I think I understand: during
rebalancing, with auto-commit enabled, the offsets are committed in
ZookeeperConsumerConnector.closeFetchersForQueues(..). Some processing
might still be happening at this point. The rebalance listener is called
only after this commit. So the current code (without my change) would lead
to fewer duplicate messages, because it assumes that these transactions
normally complete. This seems prudent, since rebalancing happens much more
frequently than Java processes being killed unexpectedly. On the other
hand, it means giving up at-least-once guarantees for message processing
when a Java process actually does die unexpectedly.

So I think it would be better to create a custom offset tracking and
commit component, with some ability to wait a reasonable amount of time
for consumer threads on streams to complete their current transaction on
rebalance, before committing from a rebalance listener.
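
As a rough illustration of the waiting part of such a component, here is a
minimal Java sketch. It assumes that each processing thread brackets its
work with begin() and end(). InFlightTracker, begin, end and awaitIdle are
hypothetical names for illustration, not Kafka APIs, and the actual offset
commit is left out.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical helper, not part of Kafka: counts messages that have been
// handed to processing threads but are not yet fully processed.
class InFlightTracker {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition idle = lock.newCondition();
    private int inFlight = 0;

    // Call just before a thread starts processing a message.
    void begin() {
        lock.lock();
        try {
            inFlight++;
        } finally {
            lock.unlock();
        }
    }

    // Call once the message has been fully processed.
    void end() {
        lock.lock();
        try {
            if (inFlight > 0) {
                inFlight--;
            }
            if (inFlight == 0) {
                idle.signalAll();
            }
        } finally {
            lock.unlock();
        }
    }

    // Waits at most timeoutMs for all in-flight messages to complete.
    // Returns true if nothing is in flight, false on timeout or interrupt.
    boolean awaitIdle(long timeoutMs) {
        long nanos = TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        lock.lock();
        try {
            while (inFlight > 0) {
                if (nanos <= 0L) {
                    return false;
                }
                try {
                    nanos = idle.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
            return true;
        } finally {
            lock.unlock();
        }
    }
}
```

A rebalance listener could call awaitIdle with a bound of a second or two
and commit only the offsets of fully processed messages afterwards. If the
wait times out, the messages still in flight may simply be redelivered
after the rebalance.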

Is it OK to block for a second or two
in consumerRebalanceListener.beforeReleasingPartitions(..), to wait for
processing threads to complete? Will this hold up the whole cluster's
rebalancing?

The new KafkaConsumer code doesn't appear to do a commit in the same way
during rebalance, when autocommit is enabled. So if current users of the
high level consumer switch to the new consumer, they might get more
duplicates on rebalance, right?

Regards
Carl


On Sun, Jun 21, 2015 at 1:43 AM, Jiangjie Qin <jq...@linkedin.com.invalid>
wrote:

> Hi Carl,
>
> Generally, your approach works to guarantee at-least-once consumption:
> basically, people have to commit an offset only after they have processed
> the message.
> The only problem is that in the old high-level consumer, the consumer will
> (and should) commit offsets during a consumer rebalance. To guarantee
> at-least-once and avoid unnecessary duplicates on rebalance, ideally we
> should wait until all the messages returned by the iterator have been
> processed before committing offsets.
>
> At LinkedIn, we have a wrapper around the open source consumer iterator
> where we can implement that logic.
>
> Jiangjie (Becket) Qin
>

Re: At-least-once guarantees with high-level consumer

Posted by Jiangjie Qin <jq...@linkedin.com.INVALID>.
Hi Carl,

Generally, your approach works to guarantee at-least-once consumption:
basically, people have to commit an offset only after they have processed
the message.
The only problem is that in the old high-level consumer, the consumer will
(and should) commit offsets during a consumer rebalance. To guarantee
at-least-once and avoid unnecessary duplicates on rebalance, ideally we
should wait until all the messages returned by the iterator have been
processed before committing offsets.

At LinkedIn, we have a wrapper around the open source consumer iterator
where we can implement that logic.
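
The LinkedIn wrapper is not shown in this thread. Purely as a generic
illustration, a wrapper of this kind might track which messages are known
to be fully processed, treating a request for the next message as the
signal that the previous one is done. In the sketch below,
ProcessedOffsetIterator, markLastProcessed and committableOffset are
hypothetical names, and offsets are simulated as positions in the wrapped
sequence rather than real Kafka offsets.

```java
import java.util.Iterator;

// Hypothetical wrapper, neither Kafka nor LinkedIn code.
class ProcessedOffsetIterator<T> implements Iterator<T> {
    private final Iterator<T> delegate;
    private long returned = 0;     // messages handed out so far
    private long committable = 0;  // messages known to be fully processed

    ProcessedOffsetIterator(Iterator<T> delegate) {
        this.delegate = delegate;
    }

    @Override
    public boolean hasNext() {
        return delegate.hasNext();
    }

    @Override
    public T next() {
        // Asking for another message implies the previous one is done.
        committable = returned;
        T item = delegate.next();
        returned++;
        return item;
    }

    // Explicit acknowledgement of the last message, e.g. before shutdown
    // or before committing during a rebalance.
    void markLastProcessed() {
        committable = returned;
    }

    // Position that is safe to commit: everything before it is processed.
    long committableOffset() {
        return committable;
    }
}
```

A commit that only ever stores committableOffset() cannot skip a message
that is still being processed, at the cost of possibly redelivering the
last message after a restart.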

Jiangjie (Becket) Qin

On 6/19/15, 12:22 AM, "Carl Heymann" <ch...@gmail.com> wrote:

>Thanks Bhavesh.
>
>I understand that getting "exactly once" processing of a message requires
>some de-duplication. What I'm saying is that the current high-level
>consumer, with automatic offset commits enabled, gives neither "at most
>once" nor "at least once" guarantees: a consumer group might get duplicate
>messages, but might also never fully process some messages (which is a
>bigger problem for me).
>
>With the code change I propose, I think it changes to "at least once",
>i.e. one can then do the deduplication you describe, without worrying
>about "losing" messages. Messages should not get committed without being
>fully processed. I want to know if this code change has any obvious
>problems.
>
>Regards
>Carl
>
>


Re: At-least-once guarantees with high-level consumer

Posted by Carl Heymann <ch...@gmail.com>.
Thanks Bhavesh.

I understand that getting "exactly once" processing of a message requires
some de-duplication. What I'm saying is that the current high-level
consumer, with automatic offset commits enabled, gives neither "at most
once" nor "at least once" guarantees: a consumer group might get duplicate
messages, but might also never fully process some messages (which is a
bigger problem for me).

With the code change I propose, I think it changes to "at least once",
i.e. one can then do the deduplication you describe, without worrying
about "losing" messages. Messages should not get committed without being
fully processed. I want to know if this code change has any obvious
problems.
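
To make the difference concrete, here is a small self-contained Java
simulation. It does not use Kafka at all: DeliverySemanticsDemo and its run
method are made up for illustration, offsets are plain integers, and it
assumes the worst case in which an auto-commit fires just before the
process dies.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation, no Kafka involved.
class DeliverySemanticsDemo {

    // Processes offsets 0..total-1, dies while handling crashAt, then
    // restarts from the last committed position. Returns every offset
    // that was fully processed, in order.
    static List<Integer> run(int total, int crashAt, boolean advanceOnDelivery) {
        List<Integer> processed = new ArrayList<>();
        int committed = 0; // next offset to read after a restart

        // First run: ends with a crash in the middle of message crashAt.
        for (int offset = 0; offset < total; offset++) {
            if (advanceOnDelivery) {
                committed = offset + 1; // position moves when the message is handed out
            }
            if (offset == crashAt) {
                break; // auto-commit has just persisted `committed`; process dies
            }
            processed.add(offset);
            if (!advanceOnDelivery) {
                committed = offset + 1; // position moves only after processing
            }
        }

        // Second run: resume from the committed position, no crash this time.
        for (int offset = committed; offset < total; offset++) {
            processed.add(offset);
        }
        return processed;
    }

    public static void main(String[] args) {
        System.out.println(run(5, 2, true));  // prints [0, 1, 3, 4]: offset 2 is lost
        System.out.println(run(5, 2, false)); // prints [0, 1, 2, 3, 4]
    }
}
```

With advanceOnDelivery set to true, which mirrors the position moving as
soon as the iterator returns a message, offset 2 is never processed. With
it set to false, which mirrors the proposed change, offset 2 is picked up
again after the restart. Duplicates would appear in the second variant if
the commit lagged behind processing, which this sketch does not model.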

Regards
Carl


On Thu, Jun 18, 2015 at 11:19 PM, Bhavesh Mistry
<mistry.p.bhavesh@gmail.com> wrote:

> Hi Carl,
>
> Producer-side retries can cause the same message to be sent to the brokers
> more than once, stored at different offsets. You may also get duplicates
> when the high-level consumer's offset has not been saved or committed, but
> you have already processed the data and your server restarts, etc.
>
> To guarantee at-least-once processing across partitions (and across
> servers), you will need to store a message hash or primary key in a
> distributed LRU cache (with an eviction policy) such as Hazelcast
> <http://www.hazelcast.com> and do de-duplication across partitions.
>
> I hope this helps!
>
> Thanks,
>
> Bhavesh
>
>

Re: At-least-once guarantees with high-level consumer

Posted by Bhavesh Mistry <mi...@gmail.com>.
Hi Carl,

Producer-side retries can cause the same message to be sent to the brokers
more than once, stored at different offsets. You may also get duplicates
when the high-level consumer's offset has not been saved or committed, but
you have already processed the data and your server restarts, etc.

To guarantee at-least-once processing across partitions (and across
servers), you will need to store a message hash or primary key in a
distributed LRU cache (with an eviction policy) such as Hazelcast
<http://www.hazelcast.com> and do de-duplication across partitions.
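
A single-JVM sketch of the de-duplication idea follows. It uses
java.util.LinkedHashMap as a local stand-in for a distributed cache such as
Hazelcast, so it only de-duplicates within one process. LruDeduplicator and
firstSeen are names made up for this example.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical local de-duplicator: remembers the most recently seen
// message keys (a hash or primary key) up to a fixed capacity.
class LruDeduplicator<K> {
    private final Map<K, Boolean> seen;

    LruDeduplicator(final int capacity) {
        // accessOrder = true keeps entries ordered from least to most
        // recently used, so the eldest entry is the LRU one.
        this.seen = new LinkedHashMap<K, Boolean>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, Boolean> eldest) {
                return size() > capacity; // evict once over capacity
            }
        };
    }

    // Returns true the first time a key is seen, and false for a
    // duplicate whose key is still in the cache.
    synchronized boolean firstSeen(K key) {
        return seen.put(key, Boolean.TRUE) == null;
    }
}
```

A consumer would call firstSeen(key) before processing and skip the message
when it returns false. A key that has already been evicted is treated as
new, so the capacity has to cover the window in which duplicates can
realistically arrive.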



I hope this helps!

Thanks,

Bhavesh


On Wed, Jun 17, 2015 at 1:49 AM, yewton <ye...@gmail.com> wrote:

> So Carl Heymann's ConsumerIterator.next hack approach is not reasonable?
>

Re: At-least-once guarantees with high-level consumer

Posted by yewton <ye...@gmail.com>.
So Carl Heymann's ConsumerIterator.next hack approach is not reasonable?

Message from Stevo Slavić, 2015-06-17 08:12:50 +0000:

> With auto-commit one can only have an at-most-once delivery guarantee. Things
> can fail after the commit but before the message is delivered for processing,
> or even after it is delivered but before it is processed. Either way the event
> is not processed, which is basically the same outcome as if it had not been
> delivered.




Re: At-least-once guarantees with high-level consumer

Posted by Stevo Slavić <ss...@gmail.com>.
With auto-commit one can only have an at-most-once delivery guarantee. Things
can fail after the commit but before the message is delivered for processing,
or even after it is delivered but before it is processed. Either way the event
is not processed, which is basically the same outcome as if it had not been
delivered.
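
To make the failure window concrete, here is a toy Java simulation. It is not
Kafka code, and the class and method names are made up for illustration. It
models a log of four messages and a consumer that crashes while handling the
third one, then restarts from the last committed offset. Committing before
processing drops that message; committing after processing does not.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CommitOrderDemo {

    /**
     * Runs one consumer session starting at startOffset and returns the
     * offset that was committed when the session ended. The session "crashes"
     * when it reaches crashAt (use -1 for no crash).
     */
    static int run(List<String> log, int startOffset, int crashAt,
                   boolean commitBeforeProcess, List<String> processed) {
        int committed = startOffset;
        for (int offset = startOffset; offset < log.size(); offset++) {
            if (commitBeforeProcess) {
                committed = offset + 1;      // offset recorded before the work is done
            }
            if (offset == crashAt) {
                return committed;            // crash: this message is never processed
            }
            processed.add(log.get(offset));  // the actual processing step
            if (!commitBeforeProcess) {
                committed = offset + 1;      // offset recorded only after the work is done
            }
        }
        return committed;
    }

    /** Crashes once on the third message, restarts, and returns everything processed. */
    static List<String> simulate(boolean commitBeforeProcess) {
        List<String> log = Arrays.asList("m0", "m1", "m2", "m3");
        List<String> processed = new ArrayList<>();
        int committed = run(log, 0, 2, commitBeforeProcess, processed);
        run(log, committed, -1, commitBeforeProcess, processed);
        return processed;
    }

    public static void main(String[] args) {
        System.out.println("commit before processing: " + simulate(true));
        System.out.println("commit after processing:  " + simulate(false));
    }
}
```

In the commit-after case, a crash that falls between processing and commit
would cause a redelivery instead, which is the duplicate that at-least-once
allows.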

On Mon, Jun 15, 2015 at 9:12 PM, Carl Heymann <ch...@gmail.com> wrote:

> Hi
>
> ** Disclaimer: I know there's a new consumer API on the way, this mail is
> about the currently available API. I also apologise if the below has
> already been discussed previously. I did try to check previous discussions
> on ConsumerIterator **
>
> It seems to me that the high-level consumer would be able to support
> at-least-once messaging, even if one uses auto-commit, by changing
> kafka.consumer.ConsumerIterator.next() to call
> currentTopicInfo.resetConsumeOffset(..) _before_ super.next(). This way, a
> consumer thread for a KafkaStream could just loop:
>
> while (true) {
>     MyMessage message = iterator.next().message();
>     process(message);
> }
>
> Each call to "iterator.next()" then updates the offset to commit to the end
> of the message that was just processed. When offsets are committed for the
> ConsumerConnector (either automatically or manually), the commit will not
> include offsets of messages that haven't been fully processed.
>
> I've tested the following ConsumerIterator.next(), and it seems to work as
> I expect:
>
>   override def next(): MessageAndMetadata[K, V] = {
>     // New code: reset consumer offset to the end of the previously consumed message:
>     if (consumedOffset > -1L && currentTopicInfo != null) {
>         currentTopicInfo.resetConsumeOffset(consumedOffset)
>         val topic = currentTopicInfo.topic
>         trace("Setting %s consumed offset to %d".format(topic, consumedOffset))
>     }
>
>     // Old code, excluding reset:
>     val item = super.next()
>     if(consumedOffset < 0)
>       throw new KafkaException("Offset returned by the message set is invalid %d".format(consumedOffset))
>     val topic = currentTopicInfo.topic
>     consumerTopicStats.getConsumerTopicStats(topic).messageRate.mark()
>     consumerTopicStats.getConsumerAllTopicStats().messageRate.mark()
>     item
>   }
>
> I've seen several people asking about managing commit offsets manually with
> the high level consumer. I suspect that this approach (the modified
> ConsumerIterator) would scale better than having a separate
> ConsumerConnector per stream just so that you can commit offsets with
> at-least-once semantics. The downside of this approach is more duplicate
> deliveries after recovery from hard failure (but this is "at least once",
> right, not "exactly once").
>
> I don't propose that the code necessarily be changed like this in trunk, I
> just want to know if the approach seems reasonable.
>
> Regards
> Carl Heymann
>
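
The deferred offset update proposed in the quoted message can be sketched
outside Kafka as a plain Java iterator. This is only an illustration under
assumptions: DeferredOffsetIterator and committableOffset() are hypothetical
names, the log is an in-memory list, and nothing here is the actual
ConsumerIterator code.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

public class DeferredOffsetIterator<T> implements Iterator<T> {
    private final List<T> log;
    private int fetchOffset;     // position of the next message to hand out
    private int consumedOffset;  // what a commit would record right now

    public DeferredOffsetIterator(List<T> log, int startOffset) {
        this.log = log;
        this.fetchOffset = startOffset;
        this.consumedOffset = startOffset;
    }

    @Override
    public boolean hasNext() {
        return fetchOffset < log.size();
    }

    @Override
    public T next() {
        // Asking for the next message implies the previous one is done,
        // so only now does the committable offset move past it.
        consumedOffset = fetchOffset;
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        return log.get(fetchOffset++);
    }

    /** Offset a commit (automatic or manual) would store at this moment. */
    public int committableOffset() {
        return consumedOffset;
    }

    public static void main(String[] args) {
        DeferredOffsetIterator<String> it =
            new DeferredOffsetIterator<>(Arrays.asList("m0", "m1", "m2"), 0);
        while (it.hasNext()) {
            String message = it.next();
            System.out.println("processing " + message
                + ", committable offset = " + it.committableOffset());
        }
    }
}
```

While m1 is being processed the committable offset is still 1, so a commit at
that moment followed by a crash would lead to m1 being read again after
restart rather than skipped.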