Posted to dev@kafka.apache.org by Apurva Mehta <ap...@confluent.io> on 2017/09/07 20:51:43 UTC

[VOTE] KIP-192 : Provide cleaner semantics when idempotence is enabled

Hi,

I'd like to start a vote for KIP-192:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-192+%3A+Provide+cleaner+semantics+when+idempotence+is+enabled

Thanks,
Apurva

Re: [VOTE] KIP-192 : Provide cleaner semantics when idempotence is enabled

Posted by Apurva Mehta <ap...@confluent.io>.
Thanks for the votes!

Jason: I updated the KIP so that the messageFormatVersion field is int8.

Guozhang: The type of the config field is a purely internal concept. The
public API of the KafkaProducer takes a Map<String, Object> type or a
Properties type (which is a map of String to String). So this change in the
type doesn't change anything from an external point of view.
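
As an illustration of the point above, here is a minimal sketch (plain Java,
no Kafka dependency) of why a Boolean supplied through a Map<String, Object>
and a String supplied through Properties can be handled the same way. The
class IdempotenceConfigSketch and its normalize helper are hypothetical names
made up for this example, not Kafka APIs. The true -> required and
false -> off mapping is the one quoted from the KIP's compatibility plan in
this thread; passing any other value through unchanged is an assumption.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;

final class IdempotenceConfigSketch {

    // Hypothetical helper (not part of Kafka): maps a legacy value of
    // enable.idempotence onto the modes named in the compatibility plan.
    static String normalize(Object raw) {
        if (raw == null) {
            throw new IllegalArgumentException("enable.idempotence is not set");
        }
        // A Map<String, Object> may carry a Boolean, Properties carries a String;
        // both reduce to the same text once converted.
        String value = raw.toString().trim().toLowerCase(Locale.ROOT);
        switch (value) {
            case "true":
                return "required";
            case "false":
                return "off";
            default:
                // Assumption: anything else is already one of the new string values.
                return value;
        }
    }

    public static void main(String[] args) {
        Map<String, Object> asMap = new HashMap<>();
        asMap.put("enable.idempotence", true);              // Boolean value

        Properties asProps = new Properties();
        asProps.setProperty("enable.idempotence", "false"); // String value

        System.out.println(normalize(asMap.get("enable.idempotence")));
        System.out.println(normalize(asProps.getProperty("enable.idempotence")));
    }
}
```

Whether the released client parses the setting this way is not something this
thread establishes; the sketch only shows that the external types stay the
same when the internal config type changes.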

Thanks,
Apurva


Re: [VOTE] KIP-192 : Provide cleaner semantics when idempotence is enabled

Posted by Guozhang Wang <wa...@gmail.com>.
+1.

A quick clarification question regarding the compatibility plan, which says:
"The legacy values for `enable.idempotence` will be interpreted as follows by
the new producer: true will mean required, false will mean off."

Right now "enable.idempotence" is defined as Type.BOOLEAN, while we are
likely to change it to Type.STRING. Is that considered compatible?


Guozhang



Re: [VOTE] KIP-192 : Provide cleaner semantics when idempotence is enabled

Posted by Jason Gustafson <ja...@confluent.io>.
+1. Thanks for the KIP. One nit: we could use int8 to represent the message
format version. That is how it is represented in the messages themselves.

-Jason


Re: [VOTE] KIP-192 : Provide cleaner semantics when idempotence is enabled

Posted by Apurva Mehta <ap...@confluent.io>.
The KIP has passed with three binding +1 votes (Guozhang, Ismael, Jason)
and no -1 or +0 votes.

Thanks to everyone for the feedback.

Apurva


Re: [VOTE] KIP-192 : Provide cleaner semantics when idempotence is enabled

Posted by Apurva Mehta <ap...@confluent.io>.
Hi Becket,

You are right: the calculations are per partition produced to by each
idempotent producer. I think this makes the problem more acute
once we end up enabling the idempotent producer by default.
However, even the most optimized version will still result in an overhead
of at least 46 bytes per (producer, broker, partition) triplet.
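
To put the 46-byte figure in context, the following is a rough sketch of how
the per-entry size might scale with the number of cached batches. The two
inputs (142 bytes with 5 cached batches, at most 96 bytes saved with 1) come
from elsewhere in this thread. The class name ProducerEntrySizeSketch and the
assumption that the cost grows linearly per cached batch are mine, so the
value for 2 batches is only an estimate.

```java
final class ProducerEntrySizeSketch {

    // Figures quoted in this thread.
    static final int BYTES_WITH_FIVE_BATCHES = 142;
    static final int MAX_SAVING_WITH_ONE_BATCH = 96;

    // 142 - 96 = 46 bytes for an entry that caches a single batch.
    static int bytesWithOneBatch() {
        return BYTES_WITH_FIVE_BATCHES - MAX_SAVING_WITH_ONE_BATCH;
    }

    // Assumption: the 96 bytes are spread evenly over the 4 extra batches.
    static int bytesPerExtraBatch() {
        return MAX_SAVING_WITH_ONE_BATCH / 4;
    }

    // Estimated entry size for a given number of cached batches (linear model).
    static int estimatedBytes(int cachedBatches) {
        if (cachedBatches < 1) {
            throw new IllegalArgumentException("need at least one cached batch");
        }
        return bytesWithOneBatch() + (cachedBatches - 1) * bytesPerExtraBatch();
    }

    public static void main(String[] args) {
        for (int batches : new int[] {1, 2, 5}) {
            System.out.println(batches + " cached batch(es): "
                    + estimatedBytes(batches) + " bytes per entry");
        }
    }
}
```

Under that linear assumption, an entry that retains 2 batches would fall
roughly halfway between the 46-byte minimum and the current 142 bytes.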

I will call this out in KIP-185. I think we would want to optimize the
memory utilization of each (producer, broker, partition) triplet before
turning idempotence on by default. Another option could be to default
max.in.flight to 2 and retain the metadata of just 2 batches. This would
significantly reduce the memory overhead in the default distribution, and in
the most common cases would not result in produce responses without the
record metadata. That may be a simple way to address the issue.

Thanks,
Apurva


Re: [VOTE] KIP-192 : Provide cleaner semantics when idempotence is enabled

Posted by Becket Qin <be...@gmail.com>.
Hi Apurva,

Thanks for the explanation.

I think the STO (sequence/timestamp/offset) entries will be per
producer/partition, right? Am I missing something? You are right that the
proposal does not strengthen the semantics. The goal is more about trying to
bound the memory consumption to some reasonable number and use the memory
more efficiently.

But I agree it is not a blocker for this KIP as the memory pressure may not
be that big. With 5K partitions per broker, 50 producers per partition on
average, we are going to consume about 35 MB of memory with 5 entries (142
bytes) in cache. So it is probably still OK and it is more urgent to fix
the upgrade path.
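
The estimate above can be reproduced with a short calculation. This is only a
sketch of the arithmetic: the class name BrokerCacheMemorySketch is made up
for illustration, the inputs (5K partitions, 50 producers per partition, 142
bytes per entry) are the ones stated in this message, and the 46-byte variant
uses the single-batch figure mentioned elsewhere in the thread.

```java
import java.util.Locale;

final class BrokerCacheMemorySketch {

    // Total cache size in bytes: one entry per (producer, partition) pair on the broker.
    static long totalBytes(long partitionsPerBroker, long producersPerPartition, long bytesPerEntry) {
        long entries = Math.multiplyExact(partitionsPerBroker, producersPerPartition);
        return Math.multiplyExact(entries, bytesPerEntry);
    }

    // Converts bytes to decimal megabytes (1 MB = 1,000,000 bytes).
    static double toMegabytes(long bytes) {
        return bytes / 1_000_000.0;
    }

    public static void main(String[] args) {
        long fiveBatches = totalBytes(5_000, 50, 142); // current 5-entry cache
        long oneBatch = totalBytes(5_000, 50, 46);     // hypothetical 1-entry cache
        System.out.println(String.format(Locale.ROOT, "5 cached batches: %.1f MB", toMegabytes(fiveBatches)));
        System.out.println(String.format(Locale.ROOT, "1 cached batch:   %.1f MB", toMegabytes(oneBatch)));
    }
}
```

With these inputs the 5-entry cache comes to 35,500,000 bytes, which matches
the "about 35 MB" figure, and the single-batch variant to 11,500,000 bytes.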

Thanks,

Jiangjie (Becket) Qin





On Mon, Sep 11, 2017 at 4:13 PM, Apurva Mehta <ap...@confluent.io> wrote:

> Hi Becket,
>
> Regarding the current implementation: we opted for a simpler server side
> implementation where we _don't_ snapshot the metadata of the last 5 batches
> to disk. So if a broker fails, comes back online, and is the leader again,
> it will only have the last batch in memory. With max.in.flight = 5, it is
> thus possible to receive duplicates of 4 prior batches and yet not have the
> metadata.
>
> Since we can return DuplicateSequence to the client, and since this is
> considered a successful response on the client, this is a good solution. It
> also means that we no longer have a hard limit of max.in.flight == 5: if
> you use a larger value, you are more likely to receive responses without
> the offset/timestamp metadata.
>
> Your suggestion for sending the LastAckedSequence per partition would help
> reduce memory on the broker, but it won't bound it: we need at least one
> cached batch per producer to do sequence number validation, so it will
> always grow in proportion to the number of active producers. Nor does it
> uniquely solve the problem of removing the cap on max.in.flight: producers
> are still not guaranteed that the metadata for all their inflight batches
> will always be cached and returned.
>
> So it doesn't strengthen any semantics, but does optimize memory usage on
> the broker. But what's the usage with the proposed changes? With 5 cached
> batches, each producerIdEntry will be 142 bytes, so 7000 active producers
> who use idempotence will take about 1 MB per broker. With 1 cached batch,
> we save at most 96 bytes per producer, so we could have at most 21000
> active producers using idempotence per MB of broker memory.
>
> The savings are significant, but I am not convinced optimizing this is
> worth it right now since we are not making the idempotent producer the
> default in this release. Further, there are a bunch of other items which
> make exactly-once semantics more usable which we are working on in this
> release. Given that there are only 8 days left till feature freeze, I would
> rather tackle the usability issues (KIP-192) than make these memory
> optimizations on the broker. The payoff from the latter seems much smaller,
> and it is always possible to do in the future.
>
> What does the rest of the community think?
>
> Thanks,
> Apurva
>
> On Mon, Sep 11, 2017 at 2:39 PM, Becket Qin <be...@gmail.com> wrote:
>
> > Hi Apurva,
> >
> > Sorry for being late on this thread. I am trying to understand the
> > implementation of the case where we throw DuplicateSequenceException. My
> > understanding is the following:
> > 1. On the broker side, we will cache the 5 most recent
> > sequence/timestamp/offset (STO) entries for each producer ID.
> > 2. When a duplicate occurs and the producer has max.in.flight.requests
> > set to <=5, we can return the timestamp/offset from the cached STO.
> > 3. When a duplicate occurs and the producer has max.in.flight.requests
> > greater than 5, we may need to return DuplicateSequenceException just to
> > indicate the sequence is a duplicate, but the ProduceResponse will not
> > have timestamp/offset because it may be outside the 5 entries in the
> > cache.
> >
> > Is my understanding correct? If it is the current implementation, I have
> > a few concerns:
> > 1. One potential issue for this is that if there are a lot of producers
> > producing to the same cluster (e.g. a Map-Reduce job) we may still spend
> > a lot of memory on caching the most recent STO.
> > 2. In most cases, we are essentially caching sequence/timestamp/offset
> > entries that may never be used again.
> >
> > Since we are making protocol changes, I am wondering if we can improve
> > the above two cases by doing the following:
> > 1. Add a per partition LastAckedSequence field in the ProduceRequest.
> > 2. The broker will remove the cached STO entries whose sequence is less
> > than or equal to LastAckedSequence for each Partition/PID.
> > 3. Have a global STO cache to cap the number of total cached STO entries
> > to some number, say 1 million. This total number is shared by all the
> > producers. If this number is reached, we will remove the entries from the
> > producer who has the most cached STO entries.
> > 4. If there is a sequence smaller than the last sequence and there is no
> > entry in the STO cache, we return DuplicateSequenceException without
> > offset/timestamp.
> >
> > With the above changes, we can have the following benefits:
> > 1. avoid caching the sequence/timestamp/offset unnecessarily because all
> > the cached entries are entries that haven't been confirmed by the
> > producer.
> > 2. no magic number of 5 max.in.flight.requests.per.connection
> > 3. bounded memory footprint on the cached sequence/timestamp/offset
> > entries.
> >
> > Hope it's not too late to have the changes if that makes sense.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> >
> > On Mon, Sep 11, 2017 at 11:21 AM, Apurva Mehta <ap...@confluent.io>
> > wrote:
> >
> > > Thanks for the votes everyone.
> > >
> > > One of the proposals here was to raise a 'DuplicateSequenceException'
> > > to the user if the broker detected that one of the internal retries
> > > resulted in the duplicate, and the metadata for the original batch was
> > > no longer cached.
> > >
> > > However, when implementing this change, I realized that this is quite
> > > unintuitive from the user's point of view. In reality, the 'duplicate'
> > > is only due to internal retries -- something that the user has no
> > > visibility into. And secondly, this is not an error: the batch has been
> > > persisted, only the cached metadata has been lost.
> > >
> > > I think the better approach is to return a 'success' but make it
> > > clear that there is no record metadata. If the user tries to access the
> > > `RecordMetadata.offset` or `RecordMetadata.timestamp` methods of the
> > > returned metadata, we can raise a 'NoMetadataAvailableException' or
> > > something like that.
> > >
> > > This way users who don't access the 'offset' and 'timestamp' fields
> > > would not notice a change. For the users who do, the offset and
> > > timestamp will not silently be invalid: they will be notified through
> > > an exception.
> > >
> > > This seems like the cleanest way forward and I would like to make this
> > > small change to the KIP.
> > >
> > > Does anybody have any objections?
> > >
> > > Thanks,
> > > Apurva
> > >
> > >
> > >
> > > On Thu, Sep 7, 2017 at 9:44 PM, Apurva Mehta <ap...@confluent.io>
> > wrote:
> > >
> > > > Thanks for the comments Ismael.
> > > >
> > > > I have gone ahead and incorporated all your suggestions in the KIP
> > > > document. You convinced me on adding max.message.bytes :)
> > > >
> > > > Apurva
> > > >
> > > > On Thu, Sep 7, 2017 at 6:12 PM, Ismael Juma <is...@juma.me.uk>
> wrote:
> > > >
> > > >> Thanks for the KIP. +1 (binding) from me. A few minor comments:
> > > >>
> > > >> 1. We should add a note to the backwards compatibility section
> > > explaining
> > > >> the impact of throwing DuplicateSequenceException (a new exception)
> > from
> > > >> `send`. As I understand it, it's not an issue, but good to include
> it
> > in
> > > >> the KIP.
> > > >>
> > > >> 2. For clarity, it's good to highlight in some way the new fields in
> > the
> > > >> protocol definition itself
> > > >>
> > > >> 3. I understand that you decided not to add max.message.bytes
> because
> > > it's
> > > >> unrelated to this KIP. I'll try to persuade you that we should, but
> > it's
> > > >> not a blocker if you don't agree. The reasons are: 1. The
> > implementation
> > > >> effort to add it is minimal since it's a topic config like message
> > > format
> > > >> version, 2. It's clearly beneficial for the producer to have that
> > > >> information, 3. It's compact (just a number), 4. It's nice to avoid
> > > >> another
> > > >> protocol bump for a small change like that.
> > > >>
> > > >> Thanks,
> > > >> Ismael
> > > >>
> > > >> On Thu, Sep 7, 2017 at 9:51 PM, Apurva Mehta <ap...@confluent.io>
> > > wrote:
> > > >>
> > > >> > Hi,
> > > >> >
> > > >> > I'd like to start a vote for KIP-192:
> > > >> >
> > > >> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > >> > 192+%3A+Provide+cleaner+semantics+when+idempotence+is+enabled
> > > >> >
> > > >> > Thanks,
> > > >> > Apurva
> > > >> >
> > > >>
> > > >
> > > >
> > >
> >
>

Re: [VOTE] KIP-192 : Provide cleaner semantics when idempotence is enabled

Posted by Apurva Mehta <ap...@confluent.io>.
Hi Becket,

Regarding the current implementation: we opted for a simpler server-side
implementation where we _don't_ snapshot the metadata of the last 5 batches
to disk. So if a broker fails, comes back online, and is the leader again,
it will only have the last batch in memory. With max.in.flight = 5, it is
thus possible to receive duplicates of 4 prior batches and yet not have the
metadata.

Since we can return DuplicateSequence to the client, and since this is
considered a successful response on the client, this is a good solution. It
also means that we no longer have a hard limit of max.in.flight == 5: if
you use a larger value, you are more likely to receive responses without
the offset/timestamp metadata.

Your suggestion for sending the LastAckedSequence per partition would help
reduce memory on the broker, but it won't bound it: we need at least one
cached batch per producer to do sequence number validation, so it will
always grow in proportion to the number of active producers. Nor does it
uniquely solve the problem of removing the cap on max.in.flight: producers
are still not guaranteed that the metadata for all their inflight batches
will always be cached and returned.

So it doesn't strengthen any semantics, but does optimize memory usage on
the broker. But what's the usage with the proposed changes? With 5 cached
batches, each producerIdEntry will be 142 bytes, so roughly 7000 active
producers who use idempotence will take 1MB per broker. With 1 cached batch,
we save at most 96 bytes per producer (leaving 46 bytes per entry), so we
could have roughly 21000 active producers using idempotence per MB of
broker memory.
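
The arithmetic behind those two figures can be checked with a short sketch. The 142-byte and 96-byte values are the ones quoted above; the class and method names are made up for illustration, and 1MB is assumed to mean 10^6 bytes (with 2^20 bytes the counts come out slightly higher).

```java
// Illustrative arithmetic only; the class and method names are hypothetical.
final class ProducerStateMemoryEstimate {
    // Figures quoted in the message above.
    static final int BYTES_PER_ENTRY_FIVE_BATCHES = 142;
    static final int MAX_SAVING_WITH_ONE_BATCH = 96;

    // Assumption: 1MB is taken as 10^6 bytes.
    static final long BYTES_PER_MEGABYTE = 1_000_000L;

    /** Number of whole entries of the given size that fit in one megabyte. */
    static long producersPerMegabyte(int bytesPerEntry) {
        return BYTES_PER_MEGABYTE / bytesPerEntry;
    }

    public static void main(String[] args) {
        // 142 - 96 = 46 bytes per entry when only one batch is cached.
        int oneBatchEntry = BYTES_PER_ENTRY_FIVE_BATCHES - MAX_SAVING_WITH_ONE_BATCH;
        System.out.println("5 cached batches: "
                + producersPerMegabyte(BYTES_PER_ENTRY_FIVE_BATCHES) + " producers per MB");
        System.out.println("1 cached batch:   "
                + producersPerMegabyte(oneBatchEntry) + " producers per MB");
    }
}
```

So the best case is about a threefold reduction in per-producer state, and the memory used still grows with the number of active producers.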

The savings are significant, but I am not convinced optimizing this is
worth it right now since we are not making the idempotent producer the
default in this release. Further, there are a bunch of other items which
make exactly-once semantics more usable which we are working on in this
release. Given that there are only 8 days left till feature freeze, I would
rather tackle the usability issues (KIP-192) than make these memory
optimizations on the broker. The payoff from the latter seems much smaller,
and it is always possible to do in the future.

What does the rest of the community think?

Thanks,
Apurva

On Mon, Sep 11, 2017 at 2:39 PM, Becket Qin <be...@gmail.com> wrote:

> Hi Apurva,
>
> Sorry for being late on this thread. I am trying to understand the
> implementation of the case in which we will throw
> DuplicateSequenceException. My understanding is the following:
> 1. On the broker side, we will cache the 5 most recent
> sequence/timestamp/offset (STO) entries for each producer ID.
> 2. When a duplicate occurs and the producer has max.in.flight.requests set
> to <=5, we can return the timestamp/offset from the cached STO.
> 3. When a duplicate occurs and the producer has max.in.flight.requests
> greater than 5, we may need to return DuplicateSequenceException just to
> indicate the sequence is a duplicate, but the ProduceResponse will not have
> timestamp/offset because it may be out of the 5 entries in the cache.
>
> Is my understanding correct? If it is the current implementation, I have a
> few concerns:
> 1. One potential issue for this is that if there are a lot of producers
> producing to the same cluster (e.g. a Map-Reduce job) we may still spend a
> lot of memory on caching the most recent STO.
> 2. In most cases, we are essentially caching the sequence/timestamp/offset
> entries that may never be used again.
>
> Since we are making protocol changes, I am wondering if we can improve the
> above two cases by doing the following:
> 1. Add a per partition LastAckedSequence field in the ProduceRequest.
> 2. The broker will remove the cached STO entries whose sequence is less
> than or equal to LastAckedSequence for each Partition/PID.
> 3. Have a global STO cache to cap the number of total cached STO entries to
> some number, say 1 million. This total number is shared by all the
> producers. If this number is reached, we will remove the entries from the
> producer who has the most cached STO entries.
> 4. If there is a sequence smaller than the last sequence and there is no
> entry in the STO cache, we return DuplicateSequenceException without
> offset/timestamp.
>
> With the above changes, we can have the following benefits:
> 1. avoid caching the sequence/timestamp/offset unnecessarily because all
> the cached entries are the entries that haven't been confirmed by the
> producer.
> 2. no magic number of 5 max.in.flight.requests.per.connection
> 3. bounded memory footprint on the cached sequence/timestamp/offset
> entries.
>
> Hope it's not too late to have the changes if that makes sense.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>

Re: [VOTE] KIP-192 : Provide cleaner semantics when idempotence is enabled

Posted by Becket Qin <be...@gmail.com>.
Hi Apurva,

Sorry for being late on this thread. I am trying to understand the
implementation of the case in which we will throw
DuplicateSequenceException. My understanding is the following:
1. On the broker side, we will cache the 5 most recent
sequence/timestamp/offset (STO) entries for each producer ID.
2. When a duplicate occurs and the producer has max.in.flight.requests set
to <=5, we can return the timestamp/offset from the cached STO.
3. When a duplicate occurs and the producer has max.in.flight.requests
greater than 5, we may need to return DuplicateSequenceException just to
indicate the sequence is a duplicate, but the ProduceResponse will not have
timestamp/offset because it may be out of the 5 entries in the cache.
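
As a rough illustration of the behaviour described in points 1 to 3, here is a minimal sketch of a per-producer cache that keeps the most recent STO entries. It is not the broker's actual code: the class, field, and method names are hypothetical, and it assumes one entry per batch with a single sequence number.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Optional;

// Hypothetical sketch, not Kafka broker code.
final class RecentBatchCache {
    /** Sequence/timestamp/offset (STO) kept for one appended batch. */
    static final class Sto {
        final int sequence;
        final long timestamp;
        final long offset;

        Sto(int sequence, long timestamp, long offset) {
            this.sequence = sequence;
            this.timestamp = timestamp;
            this.offset = offset;
        }
    }

    private final int capacity;
    private final Deque<Sto> recent = new ArrayDeque<>();

    RecentBatchCache(int capacity) {
        this.capacity = capacity;
    }

    /** Records a newly appended batch, evicting the oldest entry when full. */
    void append(int sequence, long timestamp, long offset) {
        if (recent.size() == capacity) {
            recent.removeFirst();
        }
        recent.addLast(new Sto(sequence, timestamp, offset));
    }

    /** Returns the cached STO for a duplicate, or empty if it was evicted. */
    Optional<Sto> lookup(int sequence) {
        for (Sto sto : recent) {
            if (sto.sequence == sequence) {
                return Optional.of(sto);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        RecentBatchCache cache = new RecentBatchCache(5);
        for (int seq = 0; seq < 7; seq++) {
            cache.append(seq, 1000L + seq, 100L + seq);
        }
        // Sequence 6 is still cached, so a duplicate can be answered in full.
        System.out.println("seq 6 cached: " + cache.lookup(6).isPresent());
        // Sequence 1 fell out of the 5-entry window: duplicate without metadata.
        System.out.println("seq 1 cached: " + cache.lookup(1).isPresent());
    }
}
```

With more than 5 requests in flight, a retried batch can fall into the second case, which is where the response without timestamp/offset comes from.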

Is my understanding correct? If it is the current implementation, I have a
few concerns:
1. One potential issue for this is that if there are a lot of producers
producing to the same cluster (e.g. a Map-Reduce job) we may still spend a
lot of memory on caching the most recent STO.
2. In most cases, we are essentially caching the sequence/timestamp/offset
entries that may never be used again.

Since we are making protocol changes, I am wondering if we can improve the
above two cases by doing the following:
1. Add a per partition LastAckedSequence field in the ProduceRequest.
2. The broker will remove the cached STO entries whose sequence is less
than or equal to LastAckedSequence for each Partition/PID.
3. Have a global STO cache to cap the number of total cached STO entries to
some number, say 1 million. This total number is shared by all the
producers. If this number is reached, we will remove the entries from the
producer who has the most cached STO entries.
4. If there is a sequence smaller than the last sequence and there is no
entry in the STO cache, we return DuplicateSequenceException without
offset/timestamp.
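
The four steps above could be sketched roughly as follows. This is only an illustration under several assumptions: a single partition (so state is keyed by producer ID alone), one sequence number per batch, eviction of the oldest entry first from whichever producer holds the most entries, and hypothetical names throughout. None of it is existing Kafka code.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical sketch of the proposal; not Kafka broker code.
final class GlobalStoCache {
    enum Outcome { NEW_BATCH, DUPLICATE_WITH_METADATA, DUPLICATE_WITHOUT_METADATA }

    private final int maxTotalEntries; // step 3: cap shared by all producers
    private int totalEntries = 0;
    // producer ID -> (sequence -> offset), ordered by sequence
    private final Map<Long, TreeMap<Integer, Long>> cache = new HashMap<>();
    // producer ID -> highest sequence appended so far
    private final Map<Long, Integer> lastSequence = new HashMap<>();

    GlobalStoCache(int maxTotalEntries) {
        this.maxTotalEntries = maxTotalEntries;
    }

    int totalEntries() {
        return totalEntries;
    }

    /** lastAckedSequence is the step 1 field; -1 means nothing acked yet. */
    Outcome onProduce(long producerId, int sequence, int lastAckedSequence, long offset) {
        TreeMap<Integer, Long> cached =
                cache.computeIfAbsent(producerId, id -> new TreeMap<>());

        // Step 2: drop entries the producer has already seen acknowledged.
        NavigableMap<Integer, Long> acked = cached.headMap(lastAckedSequence, true);
        totalEntries -= acked.size();
        acked.clear();

        // Step 4: an old sequence with no cached entry is a duplicate
        // that can only be reported without offset/timestamp.
        Integer last = lastSequence.get(producerId);
        if (last != null && sequence <= last) {
            return cached.containsKey(sequence)
                    ? Outcome.DUPLICATE_WITH_METADATA
                    : Outcome.DUPLICATE_WITHOUT_METADATA;
        }

        cached.put(sequence, offset);
        lastSequence.put(producerId, sequence);
        totalEntries++;

        // Step 3: enforce the global cap.
        while (totalEntries > maxTotalEntries && evictOldestFromLargest()) {
            // keep evicting until we are back under the cap
        }
        return Outcome.NEW_BATCH;
    }

    /** Removes the oldest entry of the producer holding the most entries. */
    private boolean evictOldestFromLargest() {
        TreeMap<Integer, Long> largest = null;
        for (TreeMap<Integer, Long> entries : cache.values()) {
            if (largest == null || entries.size() > largest.size()) {
                largest = entries;
            }
        }
        if (largest == null || largest.isEmpty()) {
            return false;
        }
        largest.pollFirstEntry();
        totalEntries--;
        return true;
    }

    public static void main(String[] args) {
        GlobalStoCache sto = new GlobalStoCache(3);
        for (int seq = 0; seq < 3; seq++) {
            sto.onProduce(1L, seq, -1, 100L + seq);
        }
        System.out.println(sto.onProduce(1L, 1, -1, 101L)); // still cached
        sto.onProduce(2L, 0, -1, 200L);                     // evicts producer 1, seq 0
        System.out.println(sto.onProduce(1L, 0, -1, 100L)); // no longer cached
    }
}
```

The linear scan for the largest producer is only for brevity; a real implementation would need something cheaper at a cap of a million entries.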

With the above changes, we can have the following benefits:
1. avoid caching the sequence/timestamp/offset unnecessarily because all
the cached entries are the entries that haven't been confirmed by the
producer.
2. no magic number of 5 max.in.flight.requests.per.connection
3. bounded memory footprint on the cached sequence/timestamp/offset entries.

Hope it's not too late to have the changes if that makes sense.

Thanks,

Jiangjie (Becket) Qin


On Mon, Sep 11, 2017 at 11:21 AM, Apurva Mehta <ap...@confluent.io> wrote:

> Thanks for the votes everyone.
>
> One of the proposals here was to raise a 'DuplicateSequenceException' to
> the user if the broker detected that one of the internal retries resulted
> in a duplicate, and the metadata for the original batch was no longer
> cached.
>
> However, when implementing this change, I realized that this is quite
> unintuitive from the user's point of view. In reality, the 'duplicate' is
> only due to internal retries -- something that the user has no visibility
> into. And secondly, this is not an error: the batch has been persisted,
> only the cached metadata has been lost.
>
> I think the better approach is to return a 'success' but make it clear
> that there is no record metadata. If the user tries to access
> `RecordMetadata.offset` or `RecordMetadata.timestamp` methods of the
> returned metadata, we can raise a 'NoMetadataAvailableException' or
> something like that.
>
> This way users who don't access the 'offset' and 'timestamp' fields would
> not notice a change. For the users who do, the offset and timestamp will
> not silently be invalid: they will be notified through an exception.
>
> This seems like the cleanest way forward and I would like to make this
> small change to the KIP.
>
> Does anybody have any objections?
>
> Thanks,
> Apurva
>

Re: [VOTE] KIP-192 : Provide cleaner semantics when idempotence is enabled

Posted by Apurva Mehta <ap...@confluent.io>.
Thanks for the votes everyone.

One of the proposals here was to raise a 'DuplicateSequenceException' to
the user if the broker detected that one of the internal retries resulted
in a duplicate, and the metadata for the original batch was no longer
cached.

However, when implementing this change, I realized that this is quite
unintuitive from the user's point of view. In reality, the 'duplicate' is
only due to internal retries -- something that the user has no visibility
into. And secondly, this is not an error: the batch has been persisted,
only the cached metadata has been lost.

I think the better approach is to return a 'success' but make it clear
that there is no record metadata. If the user tries to access
`RecordMetadata.offset` or `RecordMetadata.timestamp` methods of the
returned metadata, we can raise a 'NoMetadataAvailableException' or
something like that.

This way users who don't access the 'offset' and 'timestamp' fields would
not notice a change. For the users who do, the offset and timestamp will
not silently be invalid: they will be notified through an exception.
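
To make the idea concrete, here is a minimal sketch of what such a metadata object might look like. It is not the real `RecordMetadata` class: `SketchRecordMetadata`, its factory methods, and `hasOffsetAndTimestamp` are hypothetical names, and the exception name is just the placeholder floated above.

```java
// Hypothetical sketch of the proposed behaviour; not the Kafka client API.
final class SketchRecordMetadata {
    /** Placeholder exception name taken from the proposal above. */
    static final class NoMetadataAvailableException extends RuntimeException {
        NoMetadataAvailableException(String message) {
            super(message);
        }
    }

    private final boolean available;
    private final long offset;
    private final long timestamp;

    private SketchRecordMetadata(boolean available, long offset, long timestamp) {
        this.available = available;
        this.offset = offset;
        this.timestamp = timestamp;
    }

    /** Normal case: the broker returned offset and timestamp. */
    static SketchRecordMetadata of(long offset, long timestamp) {
        return new SketchRecordMetadata(true, offset, timestamp);
    }

    /** Duplicate detected, but the broker no longer has the batch metadata. */
    static SketchRecordMetadata unavailable() {
        return new SketchRecordMetadata(false, -1L, -1L);
    }

    boolean hasOffsetAndTimestamp() {
        return available;
    }

    long offset() {
        requireAvailable("offset");
        return offset;
    }

    long timestamp() {
        requireAvailable("timestamp");
        return timestamp;
    }

    private void requireAvailable(String field) {
        if (!available) {
            throw new NoMetadataAvailableException(
                    "The record was persisted, but its " + field + " is not available");
        }
    }

    public static void main(String[] args) {
        SketchRecordMetadata ok = SketchRecordMetadata.of(42L, 1505156463000L);
        System.out.println("offset = " + ok.offset());

        SketchRecordMetadata lost = SketchRecordMetadata.unavailable();
        try {
            lost.offset();
        } catch (NoMetadataAvailableException e) {
            System.out.println("send succeeded, but: " + e.getMessage());
        }
    }
}
```

A caller that never reads the offset or timestamp sees an ordinary successful send; only the accessors fail, and they fail loudly rather than returning a made-up value.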

This seems like the cleanest way forward and I would like to make this
small change to the KIP.

Does anybody have any objections?

Thanks,
Apurva



On Thu, Sep 7, 2017 at 9:44 PM, Apurva Mehta <ap...@confluent.io> wrote:

> Thanks for the comments Ismael.
>
> I have gone ahead and incorporated all your suggestions in the KIP
> document. You convinced me on adding max.message.bytes :)
>
> Apurva
>

Re: [VOTE] KIP-192 : Provide cleaner semantics when idempotence is enabled

Posted by Apurva Mehta <ap...@confluent.io>.
Thanks for the comments Ismael.

I have gone ahead and incorporated all your suggestions in the KIP
document. You convinced me on adding max.message.bytes :)

Apurva

On Thu, Sep 7, 2017 at 6:12 PM, Ismael Juma <is...@juma.me.uk> wrote:

> Thanks for the KIP. +1 (binding) from me. A few minor comments:
>
> 1. We should add a note to the backwards compatibility section explaining
> the impact of throwing DuplicateSequenceException (a new exception) from
> `send`. As I understand it, it's not an issue, but good to include it in
> the KIP.
>
> 2. For clarity, it's good to highlight in some way the new fields in the
> protocol definition itself
>
> 3. I understand that you decided not to add max.message.bytes because it's
> unrelated to this KIP. I'll try to persuade you that we should, but it's
> not a blocker if you don't agree. The reasons are: 1. The implementation
> effort to add it is minimal since it's a topic config like message format
> version, 2. It's clearly beneficial for the producer to have that
> information, 3. It's compact (just a number), 4. It's nice to avoid another
> protocol bump for a small change like that.
>
> Thanks,
> Ismael
>

Re: [VOTE] KIP-192 : Provide cleaner semantics when idempotence is enabled

Posted by Ismael Juma <is...@juma.me.uk>.
Thanks for the KIP. +1 (binding) from me. A few minor comments:

1. We should add a note to the backwards compatibility section explaining
the impact of throwing DuplicateSequenceException (a new exception) from
`send`. As I understand it, it's not an issue, but good to include it in
the KIP.

2. For clarity, it's good to highlight in some way the new fields in the
protocol definition itself

3. I understand that you decided not to add max.message.bytes because it's
unrelated to this KIP. I'll try to persuade you that we should, but it's
not a blocker if you don't agree. The reasons are: 1. The implementation
effort to add it is minimal since it's a topic config like message format
version, 2. It's clearly beneficial for the producer to have that
information, 3. It's compact (just a number), 4. It's nice to avoid another
protocol bump for a small change like that.

Thanks,
Ismael

On Thu, Sep 7, 2017 at 9:51 PM, Apurva Mehta <ap...@confluent.io> wrote:

> Hi,
>
> I'd like to start a vote for KIP-192:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-192+%3A+Provide+cleaner+semantics+when+idempotence+is+enabled
>
> Thanks,
> Apurva
>