Posted to dev@kafka.apache.org by Dong Lin <li...@gmail.com> on 2018/09/25 21:01:59 UTC

Re: [DISCUSS] KIP-232: Detect outdated metadata by adding ControllerMetadataEpoch field

For the record, this KIP is closed, as its design has been merged into
KIP-320. See
https://cwiki.apache.org/confluence/display/KAFKA/KIP-320%3A+Allow+fetchers+to+detect+and+handle+log+truncation
.

On Wed, Jan 31, 2018 at 12:16 AM Dong Lin <li...@gmail.com> wrote:

> Hey Jun, Jason,
>
> Thanks for all the comments. Could you see if you can give a +1 for the
> KIP? I am open to making further improvements to the KIP.
>
> Thanks,
> Dong
>
> On Tue, Jan 23, 2018 at 3:44 PM, Dong Lin <li...@gmail.com> wrote:
>
>> Hey Jun, Jason,
>>
>> Thanks much for all the review! I will open the voting thread.
>>
>> Regards,
>> Dong
>>
>> On Tue, Jan 23, 2018 at 3:37 PM, Jun Rao <ju...@confluent.io> wrote:
>>
>>> Hi, Dong,
>>>
>>> The current KIP looks good to me.
>>>
>>> Thanks,
>>>
>>> Jun
>>>
>>> On Tue, Jan 23, 2018 at 12:29 PM, Dong Lin <li...@gmail.com> wrote:
>>>
>>> > Hey Jun,
>>> >
>>> > Do you think the current KIP looks OK? I am wondering if we can open
>>> > the voting thread.
>>> >
>>> > Thanks!
>>> > Dong
>>> >
>>> > On Fri, Jan 19, 2018 at 3:08 PM, Dong Lin <li...@gmail.com> wrote:
>>> >
>>> > > Hey Jun,
>>> > >
>>> > > I think we can probably have a static method in a Util class to
>>> > > decode the byte[]. Both the KafkaConsumer implementation and the user
>>> > > application will be able to decode the byte array and log its content
>>> > > for debugging purposes. So it seems that we can still print the
>>> > > information we want; it is just not explicitly exposed in the
>>> > > consumer interface. Would this address the problem here?
>>> > >
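
A minimal sketch of what such a static decoding helper could look like. The
class name OffsetEpochUtil and the assumed wire layout (a version prefix
followed by partition_epoch and leader_epoch) are illustrative, not from the
KIP:

    import java.nio.ByteBuffer;

    // Hypothetical helper: decodes the opaque byte[] produced by
    // OffsetEpoch.encode() into a printable form for debugging.
    public final class OffsetEpochUtil {
        public static String decode(byte[] bytes) {
            ByteBuffer buf = ByteBuffer.wrap(bytes);
            short version = buf.getShort();     // assumed 2-byte version prefix
            int partitionEpoch = buf.getInt();  // assumed 4-byte partition_epoch
            int leaderEpoch = buf.getInt();     // assumed 4-byte leader_epoch
            return "OffsetEpoch(version=" + version
                + ", partitionEpoch=" + partitionEpoch
                + ", leaderEpoch=" + leaderEpoch + ")";
        }
    }
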
>>> > > Yeah we can include OffsetEpoch in AdminClient. This can be added in
>>> > > KIP-222? Is there something you would like me to add in this KIP?
>>> > >
>>> > > Thanks!
>>> > > Dong
>>> > >
>>> > > On Fri, Jan 19, 2018 at 3:00 PM, Jun Rao <ju...@confluent.io> wrote:
>>> > >
>>> > >> Hi, Dong,
>>> > >>
>>> > >> The issue with using just byte[] for OffsetEpoch is that it won't be
>>> > >> printable, which makes debugging harder.
>>> > >>
>>> > >> Also, KIP-222 proposes a listGroupOffset() method in AdminClient. If
>>> > that
>>> > >> gets adopted before this KIP, we probably want to include
>>> OffsetEpoch in
>>> > >> the AdminClient too.
>>> > >>
>>> > >> Thanks,
>>> > >>
>>> > >> Jun
>>> > >>
>>> > >>
>>> > >> On Thu, Jan 18, 2018 at 6:30 PM, Dong Lin <li...@gmail.com>
>>> wrote:
>>> > >>
>>> > >> > Hey Jun,
>>> > >> >
>>> > >> > I agree. I have updated the KIP to remove the class OffsetEpoch
>>> > >> > and replace OffsetEpoch with byte[] in the APIs that use it. Can
>>> > >> > you see if it looks good?
>>> > >> >
>>> > >> > Thanks!
>>> > >> > Dong
>>> > >> >
>>> > >> > On Thu, Jan 18, 2018 at 6:07 PM, Jun Rao <ju...@confluent.io>
>>> wrote:
>>> > >> >
>>> > >> > > Hi, Dong,
>>> > >> > >
>>> > >> > > Thanks for the updated KIP. It looks good to me now. The only
>>> > >> > > thing is OffsetEpoch. If we expose the individual fields in the
>>> > >> > > class, we probably don't need the encode/decode methods. If we
>>> > >> > > want to hide the details of OffsetEpoch, we probably don't want
>>> > >> > > to expose the individual fields.
>>> > >> > >
>>> > >> > > Jun
>>> > >> > >
>>> > >> > > On Wed, Jan 17, 2018 at 10:10 AM, Dong Lin
>>> > >> > > <lindong28@gmail.com> wrote:
>>> > >> > >
>>> > >> > > > Thinking about point 61 more, I realize that the async
>>> > >> > > > ZooKeeper read may make it less of an issue for the controller
>>> > >> > > > to read more ZooKeeper nodes. Writing the partition_epoch in the
>>> > >> > > > per-partition znode makes it simpler to handle a broker failure
>>> > >> > > > between the ZooKeeper writes for a topic creation. I have
>>> > >> > > > updated the KIP to use the suggested approach.
>>> > >> > > >
>>> > >> > > >
>>> > >> > > > On Wed, Jan 17, 2018 at 9:57 AM, Dong Lin
>>> > >> > > > <lindong28@gmail.com> wrote:
>>> > >> > > >
>>> > >> > > > > Hey Jun,
>>> > >> > > > >
>>> > >> > > > > Thanks much for the comments. Please see my comments inline.
>>> > >> > > > >
>>> > >> > > > > On Tue, Jan 16, 2018 at 4:38 PM, Jun Rao <ju...@confluent.io>
>>> > >> wrote:
>>> > >> > > > >
>>> > >> > > > >> Hi, Dong,
>>> > >> > > > >>
>>> > >> > > > >> Thanks for the updated KIP. Looks good to me overall. Just
>>> a
>>> > few
>>> > >> > minor
>>> > >> > > > >> comments.
>>> > >> > > > >>
>>> > >> > > > >> 60. OffsetAndMetadata positionAndOffsetEpoch(TopicPartition
>>> > >> > > > >> partition): It seems that there is no need to return
>>> > >> > > > >> metadata. We probably want to return something like
>>> > >> > > > >> OffsetAndEpoch.
>>> > >> > > > >>
>>> > >> > > > >
>>> > >> > > > > Previously I thought we might want to re-use the existing
>>> > >> > > > > class to keep our consumer interface simpler. I have updated
>>> > >> > > > > the KIP to add a class OffsetAndOffsetEpoch. I didn't use
>>> > >> > > > > OffsetAndEpoch because users may confuse that name with
>>> > >> > > > > OffsetEpoch. Does this sound OK?
>>> > >> > > > >
>>> > >> > > > >
>>> > >> > > > >>
>>> > >> > > > >> 61. Should we store partition_epoch in
>>> > >> > > > >> /brokers/topics/[topic]/partitions/[partitionId] in ZK?
>>> > >> > > > >>
>>> > >> > > > >
>>> > >> > > > > I have considered this. I think the advantage of adding the
>>> > >> > > > > partition->partition_epoch map in the existing znode
>>> > >> > > > > /brokers/topics/[topic]/partitions is that the controller only
>>> > >> > > > > needs to read one znode per topic to get its partition_epoch
>>> > >> > > > > information. Otherwise the controller may need to read one
>>> > >> > > > > extra znode per partition to get the same information.
>>> > >> > > > >
>>> > >> > > > > When we delete a partition or expand the partitions of a
>>> > >> > > > > topic, someone needs to modify the partition->partition_epoch
>>> > >> > > > > map in the znode /brokers/topics/[topic]/partitions. This may
>>> > >> > > > > seem a bit more complicated than simply adding or deleting the
>>> > >> > > > > znode /brokers/topics/[topic]/partitions/[partitionId]. But
>>> > >> > > > > the complexity is probably similar to the existing operation
>>> > >> > > > > of modifying the partition->replica_list mapping in the znode
>>> > >> > > > > /brokers/topics/[topic]. So I am not sure it is better to
>>> > >> > > > > store the partition_epoch in
>>> > >> > > > > /brokers/topics/[topic]/partitions/[partitionId]. What do you
>>> > >> > > > > think?
>>> > >> > > > >
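
For concreteness, a sketch of the two layouts being compared. The JSON field
names below are illustrative, not from the KIP; only the existing replica
assignment format is actual Kafka layout:

    Existing per-topic znode /brokers/topics/[topic] (replica assignment):
        {"version": 1, "partitions": {"0": [1, 2], "1": [2, 3]}}

    Map-based approach (one read per topic), in
    /brokers/topics/[topic]/partitions:
        {"version": 1, "partition_epochs": {"0": 5, "1": 5}}

    Per-partition approach (one read per partition):
        /brokers/topics/[topic]/partitions/0 -> {"partition_epoch": 5}
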
>>> > >> > > > >
>>> > >> > > > >>
>>> > >> > > > >> 62. For checking outdated metadata in the client, we
>>> > >> > > > >> probably want to add when max_partition_epoch will be used.
>>> > >> > > > >>
>>> > >> > > > >
>>> > >> > > > > The max_partition_epoch is used in the Proposed Changes ->
>>> > >> > > > > Client's metadata refresh section to determine whether the
>>> > >> > > > > metadata is outdated, and that formula is referenced and
>>> > >> > > > > re-used in other sections. Does this formula look OK?
>>> > >> > > > >
>>> > >> > > > >
>>> > >> > > > >>
>>> > >> > > > >> 63. "The leader_epoch should be the largest leader_epoch of
>>> > >> > > > >> messages whose offset < the commit offset. If no message has
>>> > >> > > > >> been consumed since consumer initialization, the leader_epoch
>>> > >> > > > >> from seek(...) or OffsetFetchResponse should be used. The
>>> > >> > > > >> partition_epoch should be read from the last FetchResponse
>>> > >> > > > >> corresponding to the given partition and commit offset.":
>>> > >> > > > >> leader_epoch and partition_epoch are associated with an
>>> > >> > > > >> offset. So, if no message is consumed, there is no offset and
>>> > >> > > > >> therefore there is no need to read leader_epoch and
>>> > >> > > > >> partition_epoch. Also, the leader_epoch associated with the
>>> > >> > > > >> offset should just come from the messages returned in the
>>> > >> > > > >> fetch response.
>>> > >> > > > >>
>>> > >> > > > >
>>> > >> > > > > I am thinking that, if the user calls seek(...) and
>>> > >> > > > > commitSync(...) without consuming any messages, we should
>>> > >> > > > > re-use the leader_epoch and partition_epoch provided by
>>> > >> > > > > seek(...) in the OffsetCommitRequest. And if messages have
>>> > >> > > > > been successfully consumed, then the leader_epoch will come
>>> > >> > > > > from the messages returned in the fetch response. The
>>> > >> > > > > condition "messages whose offset < the commit offset" is
>>> > >> > > > > needed to take care of log compacted topics, which may have
>>> > >> > > > > offset gaps due to log cleaning.
>>> > >> > > > >
>>> > >> > > > > Did I miss something here? Or should I rephrase the paragraph
>>> > >> > > > > to make it less confusing?
>>> > >> > > > >
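
A sketch of the bookkeeping this rule implies; the class and field names are
hypothetical, not from the KIP:

    // Tracks the epochs a consumer would attach to an OffsetCommitRequest.
    final class CommitEpochTracker {
        private long lastConsumedOffset = -1L;    // offset of last consumed message
        private int lastConsumedLeaderEpoch = -1; // largest leader_epoch consumed
        private int seekLeaderEpoch = -1;         // from seek(...) or OffsetFetchResponse

        // Called for each message returned in a FetchResponse.
        void onConsumed(long offset, int leaderEpoch) {
            lastConsumedOffset = offset;
            lastConsumedLeaderEpoch = Math.max(lastConsumedLeaderEpoch, leaderEpoch);
        }

        // The leader_epoch to commit: the largest leader_epoch of consumed
        // messages, or the seek(...) epoch if nothing has been consumed
        // since initialization.
        int leaderEpochForCommit() {
            return lastConsumedOffset < 0 ? seekLeaderEpoch : lastConsumedLeaderEpoch;
        }
    }
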
>>> > >> > > > >
>>> > >> > > > >> 64. Could you include the public methods in the OffsetEpoch
>>> > >> > > > >> class?
>>> > >> > > > >>
>>> > >> > > > >
>>> > >> > > > > I mistakenly deleted the definition of the OffsetEpoch class
>>> > >> > > > > from the KIP. I just added it back with the public methods.
>>> > >> > > > > Could you take another look?
>>> > >> > > > >
>>> > >> > > > >
>>> > >> > > > >>
>>> > >> > > > >> Jun
>>> > >> > > > >>
>>> > >> > > > >>
>>> > >> > > > >> On Thu, Jan 11, 2018 at 5:43 PM, Dong Lin
>>> > >> > > > >> <lindong28@gmail.com> wrote:
>>> > >> > > > >>
>>> > >> > > > >> > Hey Jun,
>>> > >> > > > >> >
>>> > >> > > > >> > Thanks much. I agree that we cannot rely on committed
>>> > >> > > > >> > offsets always being deleted when we delete a topic. So it
>>> > >> > > > >> > is necessary to use a per-partition epoch that does not
>>> > >> > > > >> > change unless the partition is deleted. I also agree that
>>> > >> > > > >> > it is very nice to be able to uniquely identify a message
>>> > >> > > > >> > with (offset, leader_epoch, partition_epoch) in the face of
>>> > >> > > > >> > potential topic deletion and unclean leader election.
>>> > >> > > > >> >
>>> > >> > > > >> > I agree with all your comments, and I have updated the KIP
>>> > >> > > > >> > based on our latest discussion. In addition, I added
>>> > >> > > > >> > InvalidPartitionEpochException, which will be thrown by
>>> > >> > > > >> > consumer.poll() if the partition_epoch associated with the
>>> > >> > > > >> > partition (which can be given to the consumer using
>>> > >> > > > >> > seek(...)) is different from the partition_epoch in the
>>> > >> > > > >> > FetchResponse.
>>> > >> > > > >> >
>>> > >> > > > >> > Can you take another look at the latest KIP?
>>> > >> > > > >> >
>>> > >> > > > >> > Thanks!
>>> > >> > > > >> > Dong
>>> > >> > > > >> >
>>> > >> > > > >> >
>>> > >> > > > >> >
>>> > >> > > > >> > On Wed, Jan 10, 2018 at 2:24 PM, Jun Rao
>>> > >> > > > >> > <jun@confluent.io> wrote:
>>> > >> > > > >> >
>>> > >> > > > >> > > Hi, Dong,
>>> > >> > > > >> > >
>>> > >> > > > >> > > My replies are the following.
>>> > >> > > > >> > >
>>> > >> > > > >> > > 60. What you described could also work. The drawback is
>>> > >> > > > >> > > that we will be unnecessarily changing the partition
>>> > >> > > > >> > > epoch when a partition hasn't really changed. I was
>>> > >> > > > >> > > imagining that the partition epoch will be stored in
>>> > >> > > > >> > > /brokers/topics/[topic]/partitions/[partitionId], instead
>>> > >> > > > >> > > of at the topic level. So, not sure if the ZK size limit
>>> > >> > > > >> > > is an issue.
>>> > >> > > > >> > >
>>> > >> > > > >> > > 61, 62 and 65. To me, the offset + offset_epoch is a
>>> > >> > > > >> > > unique identifier for a message. So, if a message hasn't
>>> > >> > > > >> > > changed, the offset and the associated offset_epoch
>>> > >> > > > >> > > ideally should remain the same (it will be kind of weird
>>> > >> > > > >> > > if two consumer apps save the offset on the same message,
>>> > >> > > > >> > > but the offset_epochs are different). partition_epoch +
>>> > >> > > > >> > > leader_epoch give us that; global_epoch + leader_epoch
>>> > >> > > > >> > > don't. If we use this approach, we can more reliably
>>> > >> > > > >> > > solve not only the problem that you have identified, but
>>> > >> > > > >> > > also other problems when there is data loss or topic
>>> > >> > > > >> > > re-creation. For example, in the future, if we include
>>> > >> > > > >> > > the partition_epoch and leader_epoch in the fetch
>>> > >> > > > >> > > request, the server can do a more reliable check of
>>> > >> > > > >> > > whether that offset is valid or not. I am not sure that
>>> > >> > > > >> > > we can rely upon all external offsets being removed on
>>> > >> > > > >> > > topic deletion. For example, a topic may be deleted by an
>>> > >> > > > >> > > admin who may not know all the applications.
>>> > >> > > > >> > >
>>> > >> > > > >> > > If we agree on the above, the second question is then
>>> > >> > > > >> > > how to reliably propagate the partition_epoch and the
>>> > >> > > > >> > > leader_epoch to the consumer when there are leader or
>>> > >> > > > >> > > partition changes. The leader_epoch comes from the
>>> > >> > > > >> > > message, which is reliable. So, I was suggesting that
>>> > >> > > > >> > > when we store an offset, we can just store the
>>> > >> > > > >> > > leader_epoch from the message set containing that offset.
>>> > >> > > > >> > > Similarly, I was thinking that if the partition_epoch is
>>> > >> > > > >> > > in the fetch response, we can propagate the
>>> > >> > > > >> > > partition_epoch reliably when there is a partition_epoch
>>> > >> > > > >> > > change.
>>> > >> > > > >> > >
>>> > >> > > > >> > > 63. My point is that once a leader is producing a
>>> > >> > > > >> > > message in the new partition_epoch, ideally, we should
>>> > >> > > > >> > > associate the new offsets with the new partition_epoch.
>>> > >> > > > >> > > Otherwise, the offset_epoch won't be the correct unique
>>> > >> > > > >> > > identifier (useful for solving the other problems
>>> > >> > > > >> > > mentioned above). I was originally thinking that the
>>> > >> > > > >> > > leader will include the partition_epoch from the metadata
>>> > >> > > > >> > > cache in the fetch response. It's just that right now,
>>> > >> > > > >> > > the metadata cache is updated on UpdateMetadataRequest,
>>> > >> > > > >> > > which typically happens after the LeaderAndIsrRequest.
>>> > >> > > > >> > > Another approach is for the leader to cache the
>>> > >> > > > >> > > partition_epoch in the Partition object and return that
>>> > >> > > > >> > > (instead of the one in the metadata cache) in the fetch
>>> > >> > > > >> > > response.
>>> > >> > > > >> > >
>>> > >> > > > >> > > 65. It seems to me that the global_epoch and the
>>> > >> > > > >> > > partition_epoch have different purposes. A
>>> > >> > > > >> > > partition_epoch has the benefit that it (1) can be used
>>> > >> > > > >> > > to form a unique identifier for a message and (2) can be
>>> > >> > > > >> > > used to solve other corner-case problems in the future. I
>>> > >> > > > >> > > am not sure having just a global_epoch can achieve these.
>>> > >> > > > >> > > The global_epoch is useful to determine which version of
>>> > >> > > > >> > > the metadata is newer, especially with topic deletion.
>>> > >> > > > >> > >
>>> > >> > > > >> > > Thanks,
>>> > >> > > > >> > >
>>> > >> > > > >> > > Jun
>>> > >> > > > >> > >
>>> > >> > > > >> > > On Tue, Jan 9, 2018 at 11:34 PM, Dong Lin
>>> > >> > > > >> > > <lindong28@gmail.com> wrote:
>>> > >> > > > >> > >
>>> > >> > > > >> > > > Regarding the use of the global epoch in 65), it is
>>> > >> > > > >> > > > very similar to the proposal of the metadata_epoch we
>>> > >> > > > >> > > > discussed earlier. The main difference is that this
>>> > >> > > > >> > > > epoch is incremented when we create/expand/delete a
>>> > >> > > > >> > > > topic and does not change when the controller re-sends
>>> > >> > > > >> > > > metadata.
>>> > >> > > > >> > > >
>>> > >> > > > >> > > > I looked at our previous discussion. It seems that we
>>> > >> > > > >> > > > prefer partition_epoch over the metadata_epoch because
>>> > >> > > > >> > > > 1) we prefer not to have an ever-growing metadata_epoch
>>> > >> > > > >> > > > and 2) we can reset offsets better when a topic is
>>> > >> > > > >> > > > re-created. The use of a global topic_epoch avoids the
>>> > >> > > > >> > > > drawback of a quickly ever-growing metadata_epoch.
>>> > >> > > > >> > > > Though the global epoch does not allow us to recognize
>>> > >> > > > >> > > > an invalid offset committed before the topic
>>> > >> > > > >> > > > re-creation, we can probably just delete the offset
>>> > >> > > > >> > > > when we delete a topic. Thus I am not very sure whether
>>> > >> > > > >> > > > it is still worthwhile to have a per-partition
>>> > >> > > > >> > > > partition_epoch if the metadata already has the global
>>> > >> > > > >> > > > epoch.
>>> > >> > > > >> > > >
>>> > >> > > > >> > > >
>>> > >> > > > >> > > > On Tue, Jan 9, 2018 at 6:58 PM, Dong Lin
>>> > >> > > > >> > > > <lindong28@gmail.com> wrote:
>>> > >> > > > >> > > >
>>> > >> > > > >> > > > > Hey Jun,
>>> > >> > > > >> > > > >
>>> > >> > > > >> > > > > Thanks so much. These comments are very useful.
>>> > >> > > > >> > > > > Please see my comments below.
>>> > >> > > > >> > > > >
>>> > >> > > > >> > > > > On Mon, Jan 8, 2018 at 5:52 PM, Jun Rao
>>> > >> > > > >> > > > > <jun@confluent.io> wrote:
>>> > >> > > > >> > > > >
>>> > >> > > > >> > > > >> Hi, Dong,
>>> > >> > > > >> > > > >>
>>> > >> > > > >> > > > >> Thanks for the updated KIP. A few more comments.
>>> > >> > > > >> > > > >>
>>> > >> > > > >> > > > >> 60. Perhaps having a partition epoch is more
>>> > >> > > > >> > > > >> flexible since in the future, we may support
>>> > >> > > > >> > > > >> deleting a partition as well.
>>> > >> > > > >> > > > >>
>>> > >> > > > >> > > > >
>>> > >> > > > >> > > > > Yeah I have considered this. I think we can probably
>>> > >> > > > >> > > > > still support deleting a partition by using the
>>> > >> > > > >> > > > > topic_epoch -- when a partition of a topic is deleted
>>> > >> > > > >> > > > > or created, the epoch of all partitions of this topic
>>> > >> > > > >> > > > > will be incremented by 1. Therefore, if that
>>> > >> > > > >> > > > > partition is re-created later, the epoch of that
>>> > >> > > > >> > > > > partition will still be larger than its epoch before
>>> > >> > > > >> > > > > the deletion, which still allows the client to order
>>> > >> > > > >> > > > > the metadata for the purpose of this KIP. Does this
>>> > >> > > > >> > > > > sound reasonable?
>>> > >> > > > >> > > > >
>>> > >> > > > >> > > > > The advantage of using topic_epoch instead of
>>> > >> > > > >> > > > > partition_epoch is that the size of the
>>> > >> > > > >> > > > > /brokers/topics/[topic] znode and the
>>> > >> > > > >> > > > > request/response size can be smaller. We have a limit
>>> > >> > > > >> > > > > on the maximum size of a znode (typically 1MB). Using
>>> > >> > > > >> > > > > partition epochs would effectively reduce the number
>>> > >> > > > >> > > > > of partitions that can be described by the
>>> > >> > > > >> > > > > /brokers/topics/[topic] znode.
>>> > >> > > > >> > > > >
>>> > >> > > > >> > > > > One use-case of partition_epoch is for the client to
>>> > >> > > > >> > > > > detect that the committed offset, either from the
>>> > >> > > > >> > > > > Kafka offset topic or from an external store, is
>>> > >> > > > >> > > > > invalid after partition deletion and re-creation.
>>> > >> > > > >> > > > > However, it seems that we can also address this
>>> > >> > > > >> > > > > use-case with other approaches. For example, when
>>> > >> > > > >> > > > > AdminClient deletes partitions, it can also delete
>>> > >> > > > >> > > > > the committed offsets for those partitions from the
>>> > >> > > > >> > > > > offset topic. If the user stores offsets externally,
>>> > >> > > > >> > > > > it might make sense for the user to similarly remove
>>> > >> > > > >> > > > > offsets of related partitions after these partitions
>>> > >> > > > >> > > > > are deleted. So I am not sure that we should use
>>> > >> > > > >> > > > > partition_epoch in this KIP.
>>> > >> > > > >> > > > >
>>> > >> > > > >> > > > >
>>> > >> > > > >> > > > >>
>>> > >> > > > >> > > > >> 61. It seems that the leader epoch returned in the
>>> > >> > > > >> > > > >> position() call should be the leader epoch returned
>>> > >> > > > >> > > > >> in the fetch response, not the one in the metadata
>>> > >> > > > >> > > > >> cache of the client.
>>> > >> > > > >> > > > >
>>> > >> > > > >> > > > >
>>> > >> > > > >> > > > > I think this is a good idea. Just to double check,
>>> > >> > > > >> > > > > this change does not affect the correctness or
>>> > >> > > > >> > > > > performance of this KIP, but it can be useful if we
>>> > >> > > > >> > > > > want to use the leader_epoch to better handle the
>>> > >> > > > >> > > > > offset reset in case of unclean leader election,
>>> > >> > > > >> > > > > which is listed in the future work. Is this
>>> > >> > > > >> > > > > understanding correct?
>>> > >> > > > >> > > > >
>>> > >> > > > >> > > > > I have updated the KIP to specify that the
>>> > >> > > > >> > > > > leader_epoch returned by position() should be the
>>> > >> > > > >> > > > > largest leader_epoch of the already-consumed messages
>>> > >> > > > >> > > > > whose offset < position. If no message has been
>>> > >> > > > >> > > > > consumed since consumer initialization, the
>>> > >> > > > >> > > > > leader_epoch from seek() or the OffsetFetchResponse
>>> > >> > > > >> > > > > should be used. The leader_epoch included in the
>>> > >> > > > >> > > > > OffsetCommitRequest will also be determined in a
>>> > >> > > > >> > > > > similar manner.
>>> > >> > > > >> > > > >
>>> > >> > > > >> > > > >
>>> > >> > > > >> > > > >>
>>> > >> > > > >> > > > >> 62. I am wondering if we should return the
>>> > >> > > > >> > > > >> partition epoch in the fetch response as well. In
>>> > >> > > > >> > > > >> the current proposal, if a topic is recreated and
>>> > >> > > > >> > > > >> the new leader is on the same broker as the old one,
>>> > >> > > > >> > > > >> there is nothing to force the metadata refresh in
>>> > >> > > > >> > > > >> the client. So, the client may still associate the
>>> > >> > > > >> > > > >> offset with the old partition epoch.
>>> > >> > > > >> > > > >>
>>> > >> > > > >> > > > >
>>> > >> > > > >> > > > > Could you help me understand the problem if a client
>>> > >> > > > >> > > > > associates an old partition_epoch (or the topic_epoch
>>> > >> > > > >> > > > > as of the current KIP) with the offset? The main
>>> > >> > > > >> > > > > purpose of the topic_epoch is to be able to drop the
>>> > >> > > > >> > > > > leader_epoch to 0 after a partition is deleted and
>>> > >> > > > >> > > > > re-created. I guess you may be thinking about using
>>> > >> > > > >> > > > > the partition_epoch to detect that the committed
>>> > >> > > > >> > > > > offset is invalid? In that case, I am wondering if
>>> > >> > > > >> > > > > the alternative approach described in 60) would be
>>> > >> > > > >> > > > > reasonable.
>>> > >> > > > >> > > > >
>>> > >> > > > >> > > > >
>>> > >> > > > >> > > > >>
>>> > >> > > > >> > > > >> 63. There is some subtle coordination between the
>>> > >> > > > >> > > > >> LeaderAndIsrRequest and the UpdateMetadataRequest.
>>> > >> > > > >> > > > >> Currently, when a leader changes, the controller
>>> > >> > > > >> > > > >> first sends the LeaderAndIsrRequest to the assigned
>>> > >> > > > >> > > > >> replicas and the UpdateMetadataRequest to every
>>> > >> > > > >> > > > >> broker. So, there could be a small window when the
>>> > >> > > > >> > > > >> leader has already received the new partition epoch
>>> > >> > > > >> > > > >> in the LeaderAndIsrRequest, but the metadata cache
>>> > >> > > > >> > > > >> in the broker hasn't been updated with the latest
>>> > >> > > > >> > > > >> partition epoch. Not sure what's the best way to
>>> > >> > > > >> > > > >> address this issue. Perhaps we can update the
>>> > >> > > > >> > > > >> metadata cache on the broker with both the
>>> > >> > > > >> > > > >> LeaderAndIsrRequest and the UpdateMetadataRequest.
>>> > >> > > > >> > > > >> The challenge is that the two have slightly
>>> > >> > > > >> > > > >> different data. For example, only the latter has all
>>> > >> > > > >> > > > >> the endpoints.
>>> > >> > > > >> > > > >>
>>> > >> > > > >> > > > >
>>> > >> > > > >> > > > > I am not sure whether this is a problem. Could you
>>> > >> > > > >> > > > > explain a bit more what specific problem this small
>>> > >> > > > >> > > > > window can cause?
>>> > >> > > > >> > > > >
>>> > >> > > > >> > > > > Since the client can fetch metadata from any broker
>>> > >> > > > >> > > > > in the cluster, and given that different brokers
>>> > >> > > > >> > > > > receive requests (e.g. LeaderAndIsrRequest and
>>> > >> > > > >> > > > > UpdateMetadataRequest) in arbitrary order, the
>>> > >> > > > >> > > > > metadata received by the client can be in arbitrary
>>> > >> > > > >> > > > > order (either newer or older) compared to the
>>> > >> > > > >> > > > > broker's leadership state, even if a given broker
>>> > >> > > > >> > > > > receives the LeaderAndIsrRequest and
>>> > >> > > > >> > > > > UpdateMetadataRequest simultaneously. So I am not
>>> > >> > > > >> > > > > sure it is useful to update the broker's cache with
>>> > >> > > > >> > > > > the LeaderAndIsrRequest.
>>> > >> > > > >> > > > >
>>> > >> > > > >> > > > >
>>> > >> > > > >> > > > >> 64. The enforcement of the leader epoch in offset
>>> > >> > > > >> > > > >> commit: We allow a consumer to set an arbitrary
>>> > >> > > > >> > > > >> offset, so it's possible for offsets or the leader
>>> > >> > > > >> > > > >> epoch to go backwards. I am not sure if we could
>>> > >> > > > >> > > > >> always enforce that the leader epoch only goes up on
>>> > >> > > > >> > > > >> the broker.
>>> > >> > > > >> > > > >
>>> > >> > > > >> > > > > Sure. I have removed this check from the KIP.
>>> > >> > > > >> > > > >
>>> > >> > > > >> > > > > BTW, we can probably still ensure that the
>>> > >> > > > >> > > > > leader_epoch always increases if the leader_epoch
>>> > >> > > > >> > > > > used with the offset commit is the max(leader_epoch
>>> > >> > > > >> > > > > of the message with offset = the committed offset -
>>> > >> > > > >> > > > > 1, the largest known leader_epoch from the metadata).
>>> > >> > > > >> > > > > But I don't have a good use-case for this alternative
>>> > >> > > > >> > > > > definition, so I chose to keep the KIP simple by
>>> > >> > > > >> > > > > requiring the leader_epoch to always increase.
>>> > >> > > > >> > > > >
>>> > >> > > > >> > > > >
>>> > >> > > > >> > > > >> 65. Good point on handling a missing partition
>>> > >> > > > >> > > > >> epoch due to topic deletion. Another potential way
>>> > >> > > > >> > > > >> to address this is to additionally propagate the
>>> > >> > > > >> > > > >> global partition epoch to the brokers and the
>>> > >> > > > >> > > > >> clients. This way, when a partition epoch is
>>> > >> > > > >> > > > >> missing, we can use the global partition epoch to
>>> > >> > > > >> > > > >> reason about which metadata is more recent.
>>> > >> > > > >> > > > >
>>> > >> > > > >> > > > > This is a great idea. The global epoch can be used
>>> > >> > > > >> > > > > to order the metadata and help us recognize the more
>>> > >> > > > >> > > > > recent metadata if a topic (or partition) is deleted
>>> > >> > > > >> > > > > and re-created.
>>> > >> > > > >> > > > >
>>> > >> > > > >> > > > > Actually, it seems we only need to propagate the
>>> > >> > > > >> > > > > global epoch to brokers and clients without
>>> > >> > > > >> > > > > propagating this epoch on a per-topic or
>>> > >> > > > >> > > > > per-partition basis. Doing so would simplify the
>>> > >> > > > >> > > > > interface changes made in this KIP. Does this
>>> > >> > > > >> > > > > approach sound reasonable?
>>> > >> > > > >> > > > >
>>> > >> > > > >> > > > >
>>> > >> > > > >> > > > >> 66. A client may also get an offset by time using
>>> > >> > > > >> > > > >> the offsetForTimes() api. So, we probably want to
>>> > >> > > > >> > > > >> include offsetInternalMetadata in OffsetAndTimestamp
>>> > >> > > > >> > > > >> as well.
>>> > >> > > > >> > > > >>
>>> > >> > > > >> > > > >
>>> > >> > > > >> > > > > You are right. This probably requires us to change
>>> > >> > > > >> > > > > the ListOffsetRequest as well. I will update the KIP
>>> > >> > > > >> > > > > after we agree on the solution for 65).
>>> > >> > > > >> > > > >
>>> > >> > > > >> > > > >
>>> > >> > > > >> > > > >>
>>> > >> > > > >> > > > >> 67. InternalMetadata can be a bit confusing with
>>> > >> > > > >> > > > >> the metadata field already there. Perhaps we can
>>> > >> > > > >> > > > >> just call it OffsetEpoch. It might be useful to make
>>> > >> > > > >> > > > >> OffsetEpoch printable at least for debugging
>>> > >> > > > >> > > > >> purposes. Once you do that, we are already exposing
>>> > >> > > > >> > > > >> the internal fields. So, not sure if it's worth
>>> > >> > > > >> > > > >> hiding them. If we do want to hide them, perhaps we
>>> > >> > > > >> > > > >> can have something like the following. The binary
>>> > >> > > > >> > > > >> encoding is probably more efficient than JSON for
>>> > >> > > > >> > > > >> external storage.
>>> > >> > > > >> > > > >>
>>> > >> > > > >> > > > >> OffsetEpoch {
>>> > >> > > > >> > > > >>     static OffsetEpoch decode(byte[]);
>>> > >> > > > >> > > > >>     public byte[] encode();
>>> > >> > > > >> > > > >>     public String toString();
>>> > >> > > > >> > > > >> }
>>> > >> > > > >> > > > >>
>>> > >> > > > >> > > > >
>>> > >> > > > >> > > > > Thanks much. I like this solution. I have updated
>>> > >> > > > >> > > > > the KIP accordingly.
>>> > >> > > > >> > > > >
>>> > >> > > > >> > > > >
>>> > >> > > > >> > > > >
>>> > >> > > > >> > > > >>
>>> > >> > > > >> > > > >> Jun
>>> > >> > > > >> > > > >>
>>> > >> > > > >> > > > >> On Mon, Jan 8, 2018 at 4:22 PM, Dong Lin
>>> > >> > > > >> > > > >> <lindong28@gmail.com> wrote:
>>> > >> > > > >> > > > >>
>>> > >> > > > >> > > > >> > Hey Jason,
>>> > >> > > > >> > > > >> >
>>> > >> > > > >> > > > >> > Certainly. This sounds good. I have updated the
>>> > >> > > > >> > > > >> > KIP to clarify that the global epoch will be
>>> > >> > > > >> > > > >> > incremented by 1 each time a topic is deleted.
>>> > >> > > > >> > > > >> >
>>> > >> > > > >> > > > >> > Thanks,
>>> > >> > > > >> > > > >> > Dong
>>> > >> > > > >> > > > >> >
>>> > >> > > > >> > > > >> > On Mon, Jan 8, 2018 at 4:09 PM, Jason Gustafson
>>> > >> > > > >> > > > >> > <jason@confluent.io> wrote:
>>> > >> > > > >> > > > >> >
>>> > >> > > > >> > > > >> > > Hi Dong,
>>> > >> > > > >> > > > >> > >
>>> > >> > > > >> > > > >> > >
>>> > >> > > > >> > > > >> > > > I think your approach will allow the user to
>>> > >> > > > >> > > > >> > > > distinguish between the metadata before and
>>> > >> > > > >> > > > >> > > > after the topic deletion. I also agree that
>>> > >> > > > >> > > > >> > > > this can potentially be useful to the user. I
>>> > >> > > > >> > > > >> > > > am just not very sure whether we already have
>>> > >> > > > >> > > > >> > > > a good use-case to make the additional
>>> > >> > > > >> > > > >> > > > complexity worthwhile. It seems that this
>>> > >> > > > >> > > > >> > > > feature is kind of independent of the main
>>> > >> > > > >> > > > >> > > > problem of this KIP. Could we add this as
>>> > >> > > > >> > > > >> > > > future work?
>>> > >> > > > >> > > > >> > >
>>> > >> > > > >> > > > >> > >
>>> > >> > > > >> > > > >> > > Do you think it's fair if we bump the topic
>>> > >> > > > >> > > > >> > > epoch on deletion and leave propagation of the
>>> > >> > > > >> > > > >> > > epoch for deleted topics for future work? I
>>> > >> > > > >> > > > >> > > don't think this adds much complexity, and it
>>> > >> > > > >> > > > >> > > makes the behavior consistent: every topic
>>> > >> > > > >> > > > >> > > mutation results in an epoch bump.
>>> > >> > > > >> > > > >> > >
>>> > >> > > > >> > > > >> > > Thanks,
>>> > >> > > > >> > > > >> > > Jason
>>> > >> > > > >> > > > >> > >
>>> > >> > > > >> > > > >> > > On Mon, Jan 8, 2018 at 3:14 PM, Dong Lin
>>> > >> > > > >> > > > >> > > <lindong28@gmail.com> wrote:
>>> > >> > > > >> > > > >> > >
>>> > >> > > > >> > > > >> > > > Hey Ismael,
>>> > >> > > > >> > > > >> > > >
>>> > >> > > > >> > > > >> > > > I guess we actually need the user to see this
>>> > >> > > > >> > > > >> > > > field so that the user can store this value in
>>> > >> > > > >> > > > >> > > > the external store together with the offset.
>>> > >> > > > >> > > > >> > > > We just prefer the value to be opaque to
>>> > >> > > > >> > > > >> > > > discourage most users from interpreting it.
>>> > >> > > > >> > > > >> > > > One more advantage of using such an opaque
>>> > >> > > > >> > > > >> > > > field is being able to evolve the information
>>> > >> > > > >> > > > >> > > > (or schema) of this value without changing the
>>> > >> > > > >> > > > >> > > > consumer API in the future.
>>> > >> > > > >> > > > >> > > >
>>> > >> > > > >> > > > >> > > > I also think it is probably OK for the user
>>> > >> > > > >> > > > >> > > > to be able to interpret this value,
>>> > >> > > > >> > > > >> > > > particularly for advanced users.
>>> > >> > > > >> > > > >> > > >
>>> > >> > > > >> > > > >> > > > Thanks,
>>> > >> > > > >> > > > >> > > > Dong
>>> > >> > > > >> > > > >> > > >
>>> > >> > > > >> > > > >> > > > On Mon, Jan 8, 2018 at 2:34 PM, Ismael Juma
>>> > >> > > > >> > > > >> > > > <ismael@juma.me.uk> wrote:
>>> > >> > > > >> > > > >> > > >
>>> > >> > > > >> > > > >> > > > > On Fri, Jan 5, 2018 at 7:15 PM, Jason
>>> > >> > > > >> > > > >> > > > > Gustafson <jason@confluent.io> wrote:
>>> > >> > > > >> > > > >> > > > > >
>>> > >> > > > >> > > > >> > > > > > class OffsetAndMetadata {
>>> > >> > > > >> > > > >> > > > > >   long offset;
>>> > >> > > > >> > > > >> > > > > >   byte[] offsetMetadata;
>>> > >> > > > >> > > > >> > > > > >   String metadata;
>>> > >> > > > >> > > > >> > > > > > }
>>> > >> > > > >> > > > >> > > > >
>>> > >> > > > >> > > > >> > > > >
>>> > >> > > > >> > > > >> > > > > > Admittedly, the naming is a bit annoying,
>>> > >> > > > >> > > > >> > > > > > but we can probably come up with something
>>> > >> > > > >> > > > >> > > > > > better. Internally the byte array would
>>> > >> > > > >> > > > >> > > > > > have a version. If in the future we have
>>> > >> > > > >> > > > >> > > > > > anything else we need to add, we can
>>> > >> > > > >> > > > >> > > > > > update the version and we wouldn't need
>>> > >> > > > >> > > > >> > > > > > any new APIs.
>>> > >> > > > >> > > > >> > > > > >
>>> > >> > > > >> > > > >> > > > >
>>> > >> > > > >> > > > >> > > > > We can also add fields to a class in a
>>> > >> > > > >> > > > >> > > > > compatible way. So, it seems to me that the
>>> > >> > > > >> > > > >> > > > > main advantage of the byte array is that
>>> > >> > > > >> > > > >> > > > > it's opaque to the user. Is that correct? If
>>> > >> > > > >> > > > >> > > > > so, we could also add any opaque metadata in
>>> > >> > > > >> > > > >> > > > > a subclass so that users don't even see it
>>> > >> > > > >> > > > >> > > > > (unless they cast it, but then they're on
>>> > >> > > > >> > > > >> > > > > their own).
>>> > >> > > > >> > > > >> > > > >
>>> > >> > > > >> > > > >> > > > > Ismael
>>> > >> > > > >> > > > >> > > > >
>>> > >> > > > >> > > > > > The corresponding seek() and position() APIs might
>>> > >> > > > >> > > > > > look something like this:
>>> > >> > > > >> > > > > >
>>> > >> > > > >> > > > > > void seek(TopicPartition partition, long offset,
>>> > >> > > > >> > > > > >     byte[] offsetMetadata);
>>> > >> > > > >> > > > > > byte[] positionMetadata(TopicPartition partition);
>>> > >> > > > >> > > > >> > > > > >
>>> > >> > > > >> > > > >> > > > > > What do you think?
>>> > >> > > > >> > > > >> > > > > >
>>> > >> > > > >> > > > >> > > > > > Thanks,
>>> > >> > > > >> > > > >> > > > > > Jason
>>> > >> > > > >> > > > >> > > > > >
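
A sketch of how an application storing offsets externally might use the two
methods proposed above (proposal only, not a released consumer API);
externalStore and StoredEntry are hypothetical stand-ins for the user's own
storage:

    // Save: position plus opaque offset metadata, per partition.
    for (TopicPartition tp : consumer.assignment()) {
        long offset = consumer.position(tp);
        byte[] offsetMetadata = consumer.positionMetadata(tp); // proposed API
        externalStore.put(tp, offset, offsetMetadata);
    }

    // Restore on restart: seek with the saved metadata.
    for (TopicPartition tp : consumer.assignment()) {
        StoredEntry e = externalStore.get(tp);
        consumer.seek(tp, e.offset(), e.offsetMetadata());     // proposed API
    }
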
>>> > >> > > > >> > > > > > On Thu, Jan 4, 2018 at 7:04 PM, Dong Lin
>>> > >> > > > >> > > > > > <lindong28@gmail.com> wrote:
>>> > >> > > > >> > > > >> > > > > >
>>> > >> > > > >> > > > >> > > > > > > Hey Jun, Jason,
>>> > >> > > > >> > > > >> > > > > > >
>>> > >> > > > >> > > > > > > Thanks much for all the feedback. I have updated
>>> > >> > > > >> > > > > > > the KIP based on the latest discussion. Can you
>>> > >> > > > >> > > > > > > help check whether it looks good?
>>> > >> > > > >> > > > >> > > > > > >
>>> > >> > > > >> > > > >> > > > > > > Thanks,
>>> > >> > > > >> > > > >> > > > > > > Dong
>>> > >> > > > >> > > > >> > > > > > >
>>> > >> > > > >> > > > > > > On Thu, Jan 4, 2018 at 5:36 PM, Dong Lin
>>> > >> > > > >> > > > > > > <lindong28@gmail.com> wrote:
>>> > >> > > > >> > > > >> > > > > > >
>>> > >> > > > >> > > > >> > > > > > > > Hey Jun,
>>> > >> > > > >> > > > >> > > > > > > >
>>> > >> > > > >> > > > > > > > Hmm... thinking about this more, I am not sure
>>> > >> > > > >> > > > > > > > that the proposed API is sufficient. For users
>>> > >> > > > >> > > > > > > > that store offsets externally, we probably need
>>> > >> > > > >> > > > > > > > an extra API to return the leader_epoch and
>>> > >> > > > >> > > > > > > > partition_epoch for all partitions that
>>> > >> > > > >> > > > > > > > consumers are consuming. I suppose these users
>>> > >> > > > >> > > > > > > > currently use position() to get the offset.
>>> > >> > > > >> > > > > > > > Thus we probably need a new method
>>> > >> > > > >> > > > > > > > positionWithEpoch(...) to return <offset,
>>> > >> > > > >> > > > > > > > partition_epoch, leader_epoch>. Does this sound
>>> > >> > > > >> > > > > > > > reasonable?
>>> > >> > > > >> > > > >> > > > > > > >
>>> > >> > > > >> > > > >> > > > > > > > Thanks,
>>> > >> > > > >> > > > >> > > > > > > > Dong
>>> > >> > > > >> > > > >> > > > > > > >
>>> > >> > > > >> > > > >> > > > > > > >
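
A sketch of the positionWithEpoch(...) idea above. The shape is hypothetical;
the OffsetAndOffsetEpoch name follows the class Dong later adds to the KIP:

    // Hypothetical return type pairing a position with its epochs.
    public final class OffsetAndOffsetEpoch {
        private final long offset;
        private final int partitionEpoch;
        private final int leaderEpoch;

        public OffsetAndOffsetEpoch(long offset, int partitionEpoch, int leaderEpoch) {
            this.offset = offset;
            this.partitionEpoch = partitionEpoch;
            this.leaderEpoch = leaderEpoch;
        }

        public long offset() { return offset; }
        public int partitionEpoch() { return partitionEpoch; }
        public int leaderEpoch() { return leaderEpoch; }
    }

    // Hypothetical consumer method for users who store offsets externally.
    public OffsetAndOffsetEpoch positionWithEpoch(TopicPartition partition);
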
>>> > >> > > > >> > > > > > > > On Thu, Jan 4, 2018 at 5:26 PM, Jun Rao
>>> > >> > > > >> > > > > > > > <jun@confluent.io> wrote:
>>> > >> > > > >> > > > >> > > > > > > >
>>> > >> > > > >> > > > >> > > > > > > >> Hi, Dong,
>>> > >> > > > >> > > > >> > > > > > > >>
>>> > >> > > > >> > > > > > > >> Yes, that's what I am thinking. OffsetEpoch
>>> > >> > > > >> > > > > > > >> will be composed of (partition_epoch,
>>> > >> > > > >> > > > > > > >> leader_epoch).
>>> > >> > > > >> > > > >> > > > > > > >>
>>> > >> > > > >> > > > >> > > > > > > >> Thanks,
>>> > >> > > > >> > > > >> > > > > > > >>
>>> > >> > > > >> > > > >> > > > > > > >> Jun
>>> > >> > > > >> > > > >> > > > > > > >>
>>> > >> > > > >> > > > >> > > > > > > >>
>>> > >> > > > >> > > > > > > >> On Thu, Jan 4, 2018 at 4:22 PM, Dong Lin
>>> > >> > > > >> > > > > > > >> <lindong28@gmail.com> wrote:
>>> > >> > > > >> > > > >> > > > > > > >>
>>> > >> > > > >> > > > >> > > > > > > >> > Hey Jun,
>>> > >> > > > >> > > > >> > > > > > > >> >
>>> > >> > > > >> > > > > > > >> > Thanks much. I like the new API that you
>>> > >> > > > >> > > > > > > >> > proposed. I am not sure what exactly you
>>> > >> > > > >> > > > > > > >> > mean by offset_epoch. I suppose that we can
>>> > >> > > > >> > > > > > > >> > use the pair of (partition_epoch,
>>> > >> > > > >> > > > > > > >> > leader_epoch) as the offset_epoch, right?
>>> > >> > > > >> > > > >> > > > > > > >> >
>>> > >> > > > >> > > > >> > > > > > > >> > Thanks,
>>> > >> > > > >> > > > >> > > > > > > >> > Dong
>>> > >> > > > >> > > > >> > > > > > > >> >
>>> > >> > > > >> > > > > > > >> > On Thu, Jan 4, 2018 at 4:02 PM, Jun Rao
>>> > >> > > > >> > > > > > > >> > <jun@confluent.io> wrote:
>>> > >> > > > >> > > > >> > > > > > > >> >
>>> > >> > > > >> > > > >> > > > > > > >> > > Hi, Dong,
>>> > >> > > > >> > > > >> > > > > > > >> > >
>>> > >> > > > >> > > > > > > >> > > Got it. The api that you proposed works.
>>> > >> > > > >> > > > > > > >> > > The question is whether that's the api
>>> > >> > > > >> > > > > > > >> > > that we want to have in the long term. My
>>> > >> > > > >> > > > > > > >> > > concern is that while the api change is
>>> > >> > > > >> > > > > > > >> > > simple, the new api seems harder to
>>> > >> > > > >> > > > > > > >> > > explain and use. For example, a consumer
>>> > >> > > > >> > > > > > > >> > > storing offsets externally now needs to
>>> > >> > > > >> > > > > > > >> > > call waitForMetadataUpdate() after calling
>>> > >> > > > >> > > > > > > >> > > seek().
>>> > >> > > > >> > > > >> > > > > > > >> > >
>>> > >> > > > >> > > > > > > >> > > An alternative approach is to make the
>>> > >> > > > >> > > > > > > >> > > following compatible api changes in
>>> > >> > > > >> > > > > > > >> > > Consumer (a sketch follows below):
>>> > >> > > > >> > > > > > > >> > > * Add an additional OffsetEpoch field in
>>> > >> > > > >> > > > > > > >> > > OffsetAndMetadata. (no need to change the
>>> > >> > > > >> > > > > > > >> > > commitSync() api)
>>> > >> > > > >> > > > > > > >> > > * Add a new api seek(TopicPartition
>>> > >> > > > >> > > > > > > >> > > partition, long offset, OffsetEpoch
>>> > >> > > > >> > > > > > > >> > > offsetEpoch). We can potentially deprecate
>>> > >> > > > >> > > > > > > >> > > the old api seek(TopicPartition partition,
>>> > >> > > > >> > > > > > > >> > > long offset) in the future.
>>> > >> > > > >> > > > >> > > > > > > >> > >
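
A sketch of the two compatible changes just listed. The shapes come from this
discussion, not from a released Kafka client; OffsetEpoch is the opaque class
discussed elsewhere in this thread:

    // OffsetAndMetadata gains an OffsetEpoch field; commitSync() is unchanged.
    public class OffsetAndMetadata {
        private final long offset;
        private final String metadata;
        private final OffsetEpoch offsetEpoch; // new field

        public OffsetAndMetadata(long offset, String metadata, OffsetEpoch offsetEpoch) {
            this.offset = offset;
            this.metadata = metadata;
            this.offsetEpoch = offsetEpoch;
        }

        public OffsetEpoch offsetEpoch() { return offsetEpoch; }
    }

    // New seek overload; the old seek(partition, offset) could later be
    // deprecated.
    void seek(TopicPartition partition, long offset, OffsetEpoch offsetEpoch);
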
>>> > >> > > > >> > > > > > > >> > > The alternative approach has a similar
>>> > >> > > > >> > > > > > > >> > > amount of api changes as yours but has the
>>> > >> > > > >> > > > > > > >> > > following benefits.
>>> > >> > > > >> > > > > > > >> > > 1. The api works in a similar way to how
>>> > >> > > > >> > > > > > > >> > > offset management works now and is
>>> > >> > > > >> > > > > > > >> > > probably what we want in the long term.
>>> > >> > > > >> > > > > > > >> > > 2. It can reset offsets better when there
>>> > >> > > > >> > > > > > > >> > > is data loss due to unclean leader
>>> > >> > > > >> > > > > > > >> > > election or correlated replica failure.
>>> > >> > > > >> > > > > > > >> > > 3. It can reset offsets better when a
>>> > >> > > > >> > > > > > > >> > > topic is recreated.
>>> > >> > > > >> > > > >> > > > > > > >> > >
>>> > >> > > > >> > > > >> > > > > > > >> > > Thanks,
>>> > >> > > > >> > > > >> > > > > > > >> > >
>>> > >> > > > >> > > > >> > > > > > > >> > > Jun
>>> > >> > > > >> > > > >> > > > > > > >> > >
>>> > >> > > > >> > > > >> > > > > > > >> > >
>>> > >> > > > >> > > > > > > >> > > On Thu, Jan 4, 2018 at 2:05 PM, Dong Lin
>>> > >> > > > >> > > > > > > >> > > <lindong28@gmail.com> wrote:
>>> > >> > > > >> > > > >> > > > > > > >> > >
>>> > >> > > > >> > > > >> > > > > > > >> > > > Hey Jun,
>>> > >> > > > >> > > > >> > > > > > > >> > > >
>>> > >> > > > >> > > > > > > >> > > > Yeah I agree that ideally we don't want
>>> > >> > > > >> > > > > > > >> > > > an ever-growing global metadata version.
>>> > >> > > > >> > > > > > > >> > > > I just think it may be more desirable to
>>> > >> > > > >> > > > > > > >> > > > keep the consumer API simple.
>>> > >> > > > >> > > > >> > > > > > > >> > > >
>>> > >> > > > >> > > > > > > >> > > > In my current proposal, the metadata
>>> > >> > > > >> > > > > > > >> > > > version returned in the fetch response
>>> > >> > > > >> > > > > > > >> > > > will be stored together with the offset.
>>> > >> > > > >> > > > > > > >> > > > More specifically, the metadata_epoch in
>>> > >> > > > >> > > > > > > >> > > > the new offset topic schema will be the
>>> > >> > > > >> > > > > > > >> > > > largest metadata_epoch from all the
>>> > >> > > > >> > > > > > > >> > > > MetadataResponse and FetchResponse
>>> > >> > > > >> > > > > > > >> > > > messages ever received by this consumer.
>>> > >> > > > >> > > > >> > > > > > > >> > > >
>>> > >> > > > >> > > > >> > > > > > > >> > > > We probably don't have to
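
A sketch of the bookkeeping described above; the names are illustrative only:

    // Tracks the largest metadata_epoch seen in any MetadataResponse or
    // FetchResponse, to be stored alongside committed offsets.
    final class MetadataEpochTracker {
        private int maxMetadataEpoch = -1;

        // Called for every MetadataResponse and FetchResponse received.
        void onResponse(int metadataEpoch) {
            maxMetadataEpoch = Math.max(maxMetadataEpoch, metadataEpoch);
        }

        // The metadata_epoch to write into the offset topic on commit.
        int metadataEpochForCommit() {
            return maxMetadataEpoch;
        }
    }
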
>>> > >> > > > >> > > > > > > >> > > > We probably don't have to change the
>>> > >> > > > >> > > > > > > >> > > > consumer API for
>>> > >> > > > >> > > > > > > >> > > > commitSync(Map<TopicPartition,
>>> > >> > > > >> > > > > > > >> > > > OffsetAndMetadata>). If the user calls
>>> > >> > > > >> > > > > > > >> > > > commitSync(...) to commit offset 10 for
>>> > >> > > > >> > > > > > > >> > > > a given partition, for most use-cases,
>>> > >> > > > >> > > > > > > >> > > > this consumer instance should have
>>> > >> > > > >> > > > > > > >> > > > consumed the message with offset 9 from
>>> > >> > > > >> > > > > > > >> > > > this partition, in which case the
>>> > >> > > > >> > > > > > > >> > > > consumer can remember and
>>> > >
>>>
>>