Posted to dev@kafka.apache.org by Jungtaek Lim <ka...@gmail.com> on 2019/08/09 03:46:41 UTC

[DISCUSS] KIP-505 : Add new public method to only update assignment metadata in consumer

Hi devs,

I'd like to initiate discussion around KIP-505, exposing new public method
to only update assignment metadata in consumer.

`poll(0)` has been misused: according to the Kafka docs it doesn't
guarantee that no records are pulled, and the new method `poll(Duration)`
doesn't have the same semantics, so I would like to propose a new public
API which performs only the desired behavior.

KIP page: https://cwiki.apache.org/confluence/x/z5NiBw

Please feel free to suggest any improvements to the proposal, as I'm new to
the Kafka community and may not be aware of the Kafka project's preferences
(like TimeoutException vs. boolean, etc.).

Thanks in advance!
Jungtaek Lim (HeartSaVioR)
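
To make the problem concrete, here is a minimal sketch of the current hack
and of what the proposed call could look like - `updateAssignmentMetadata`
is only an illustrative name here, and the exact signature and
timeout/return semantics are exactly what I'd like to discuss:

    import java.time.Duration;
    import java.util.Properties;
    import java.util.regex.Pattern;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class AssignmentMetadataSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("group.id", "driver-group");
            props.put("key.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            props.put("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");

            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Pattern.compile("topic-.*"));

                // Today's hack: poll(0) triggers joining the group and updating
                // assignment metadata, but it is deprecated and does not
                // guarantee that no records are fetched.
                ConsumerRecords<byte[], byte[]> ignored = consumer.poll(0);

                // Proposed (illustrative name only): update assignment metadata
                // and fetch nothing, with an explicit timeout.
                // consumer.updateAssignmentMetadata(Duration.ofSeconds(30));

                System.out.println("Assigned: " + consumer.assignment());
            }
        }
    }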

Re: [DISCUSS] KIP-505 : Add new public method to only update assignment metadata in consumer

Posted by Gabor Somogyi <ga...@gmail.com>.
Hi Colin,

Thanks for your suggestion! Which KIPs are you referring to?

BR,
G


On Mon, Aug 12, 2019 at 5:22 PM Colin McCabe <cm...@apache.org> wrote:

> Hi,
>
> If there’s no need to consume records in the Spark driver, then the
> Consumer is probably the wrong thing to use. Instead, Spark should use
> AdminClient to find out what partitions exist and where, manage their
> offsets, and so on. There are some KIPs under discussion now that would add
> the necessary APIs for managing offsets.
>
> Best,
> Colin
>
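
(For what it's worth, the discovery half of this is possible with the
existing AdminClient APIs already; a rough sketch is below. The
offset-management half is what still depends on the in-flight KIPs Colin
mentions.)

    import java.util.Map;
    import java.util.Properties;
    import java.util.Set;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.TopicDescription;

    public class AdminDiscoverySketch {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            try (AdminClient admin = AdminClient.create(props)) {
                // Enumerate every topic visible to this client.
                Set<String> topics = admin.listTopics().names().get();
                // Resolve partition counts (and leaders) without a consumer group.
                Map<String, TopicDescription> topicInfo =
                    admin.describeTopics(topics).all().get();
                topicInfo.forEach((name, desc) -> System.out.println(
                    name + ": " + desc.partitions().size() + " partitions"));
            }
        }
    }
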
> On Mon, Aug 12, 2019, at 07:39, Jungtaek Lim wrote:
> > My feeling is that I didn't explain the use case for Spark properly and
> > hence failed to explain the needs. Sorry about this.
> >
> > Spark leverages a single instance of KafkaConsumer in the driver, which
> > is the sole member of its consumer group. It is used in the plan phase of
> > each micro-batch to calculate the overall topicpartitions with their
> > offset ranges for the batch, and to split and assign (topicpartition,
> > fromOffset, untilOffset) to each input partition. After the planning is
> > done and tasks are distributed to executors, a consumer per input
> > partition is initialized on some executor (assigned to that single
> > topicpartition) and pulls the actual records. (Pooling of consumers is
> > applied, for sure.) As the plan phase only determines the overall
> > topicpartitions and offset ranges to process, Spark is never interested
> > in pulling records on the driver side.
> >
> > Spark mainly leverages poll(0) to get the latest assigned partitions and
> > adopt the changes or validate the expectation. That's not the only use
> > case for poll(0): Spark also seeks the offset per topicpartition to the
> > earliest, the latest, or a specific one (either provided by the end user
> > or the last committed offset) so that Spark can obtain the actual offset
> > or validate the provided offset. According to the javadoc (if I
> > understand correctly), to get the offset immediately it seems to be
> > required to call `poll` or `position`.
> >
> > The way Spark interacts with Kafka in this plan phase in the driver is
> > synchronous, as the phase should finish ASAP to run the next phase.
> > Registering a ConsumerRebalanceListener and tracking the changes would
> > require some asynchronous handling, which sounds like unnecessary
> > complexity. Spark may be OK with synchronous calls with a timeout (that's
> > what the methods in KafkaConsumer have been providing - they're not
> > asynchronous, at least for callers), but dealing with asynchrony is
> > another level of concern. I can see the benefit where a continuous thread
> > runs and the consumer is busy with something continuously, relying on the
> > listener to hear the news on reassignment. Unfortunately that's not our
> > case.
> >
> > Unit tests in Spark have similar needs: it looks like Kafka test code
> > also leverages `updateAssignmentMetadataIfNeeded` and `poll(0)` in many
> > places where a blocking (+timeout) call is preferred - so I can see
> > similar needs there as well.
> >
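(A rough sketch of that driver-side plan-phase sequence, using only public
KafkaConsumer calls - the class and helper names are just for illustration;
the real code is in the KafkaOffsetReader.scala linked in Gabor's mail
below:)

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Set;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    public class PlanPhaseSketch {
        // Assumes a consumer that is already subscribed to the source topics.
        static Map<TopicPartition, Long> latestOffsets(KafkaConsumer<byte[], byte[]> consumer) {
            // The poll(0) hack: refresh group membership and assignment
            // metadata, even though no records are wanted here.
            consumer.poll(0);
            Set<TopicPartition> partitions = consumer.assignment();

            // Seek to the desired position (latest here) and read it back.
            consumer.seekToEnd(partitions);
            Map<TopicPartition, Long> untilOffsets = new HashMap<>();
            for (TopicPartition tp : partitions) {
                untilOffsets.put(tp, consumer.position(tp));
            }
            return untilOffsets;
        }
    }
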
> > On Mon, Aug 12, 2019 at 4:46 PM Gabor Somogyi <gabor.g.somogyi@gmail.com
> >
> > wrote:
> >
> > > Hi Guys,
> > >
> > > Please see the actual implementation; I'm pretty sure it explains the
> > > situation well:
> > >
> > >
> https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
> > >
> > > To answer one question/assumption which popped up from all of you:
> > > Spark uses not only KafkaConsumer#subscribe but pattern subscribe +
> > > KafkaConsumer#assign as well.
> > > Please see here:
> > >
> > >
> https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/ConsumerStrategy.scala
> > >
> > > BR,
> > > G
> > >
> > >
> > > On Mon, Aug 12, 2019 at 6:38 AM Satish Duggana <
> satish.duggana@gmail.com>
> > > wrote:
> > >
> > > > Hi Jungtaek,
> > > > Thanks for the KIP. I have a couple of questions here.
> > > > Is Spark not using Kafka's consumer group management across multiple
> > > > consumers?
> > > >
> > > > Is Spark using KafkaConsumer#subscribe(Pattern pattern,
> > > > ConsumerRebalanceListener listener) only to get all the topics for a
> > > > pattern based subscription and Spark manually assigns those
> > > > topic-partitions across consumers on workers?
> > > >
> > > > Thanks,
> > > > Satish.
> > > >
> > > > On Mon, Aug 12, 2019 at 4:17 AM Matthias J. Sax <
> matthias@confluent.io>
> > > > wrote:
> > > >
> > > > > I am not sure if I fully understand yet.
> > > > >
> > > > > The fact that Spark does not store offsets in Kafka but as part of
> > > > > its own checkpoint mechanism seems to be orthogonal. Maybe I am
> > > > > missing something here.
> > > > >
> > > > > As you are using subscribe(), you use the Kafka consumer group
> > > > > mechanism, which takes care of the assignment of partitions to
> > > > > clients within the group. Therefore, I am not sure what you mean by:
> > > > >
> > > > > > which Spark needs to
> > > > > >> know to coordinate multiple consumers to pull correctly.
> > > > >
> > > > > Multiple thoughts that may help:
> > > > >
> > > > > - if Spark needs more control over the partition assignment, you
> > > > > can provide a custom `ConsumerPartitionAssignor` (via the consumer
> > > > > configuration)
> > > > >
> > > > > - you may also want to register `ConsumerRebalanceListener` via
> > > > > `subscribe()` to get informed when the group rebalances
> > > > >
> > > > > As you pointed out, with pattern subscription the metadata can
> > > > > change if topics are added/deleted. However, each metadata change
> > > > > will trigger a rebalance, and thus you would get corresponding calls
> > > > > to your rebalance listener to learn about it and react accordingly.
> > > > >
> > > > > Maybe you can explain why neither of these approaches works and
> > > > > what gap the new API would close?
> > > > >
> > > > >
> > > > > -Matthias
> > > > >
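(A minimal sketch of the listener registration Matthias describes; note the
callbacks are invoked from within poll(), which is where the asynchrony
concern raised earlier in the thread comes from:)

    import java.util.Collection;
    import java.util.regex.Pattern;
    import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    public class RebalanceListenerSketch {
        static void subscribeWithListener(KafkaConsumer<byte[], byte[]> consumer) {
            consumer.subscribe(Pattern.compile("topic-.*"), new ConsumerRebalanceListener() {
                @Override
                public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                    System.out.println("Revoked: " + partitions);
                }

                @Override
                public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                    // React to topic/partition changes here; called from poll().
                    System.out.println("Assigned: " + partitions);
                }
            });
        }
    }
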
> > > > > On 8/11/19 5:11 AM, Jungtaek Lim wrote:
> > > > > > Let me elaborate my explanation a bit more. Here we talk about
> > > > > > Apache Spark, but this applies to everything that wants to control
> > > > > > the offsets of Kafka consumers.
> > > > > >
> > > > > > Spark is managing the committed offsets and the offsets which
> > > > > > should be polled next - topics and partitions as well. This is
> > > > > > required as Spark itself has its own general checkpoint mechanism
> > > > > > and Kafka is just one of its sources/sinks (though a very
> > > > > > important one).
> > > > > >
> > > > > > To pull records from Kafka, Spark tells Kafka which topics and
> > > > > > partitions it wants to subscribe to (and does seek and poll), but
> > > > > > Spark can also provide "patterns" of topics, and the subscription
> > > > > > can change on the Kafka side (topics added/dropped, partitions
> > > > > > added), which Spark needs to know about to coordinate multiple
> > > > > > consumers to pull correctly.
> > > > > >
> > > > > > It looks like assignment() doesn't update the assignment
> > > > > > information in the consumer; it just returns the known one.
> > > > > > There's only one known approach to update it, calling `poll`, but
> > > > > > Spark is not interested in the returned records, so there's a need
> > > > > > for the hack `poll(0)` - and Kafka deprecated that API. This KIP
> > > > > > proposes to support this as an official approach.
> > > > > >
> > > > > >
> > > > > > On Sun, Aug 11, 2019 at 8:18 PM Jungtaek Lim <ka...@gmail.com>
> > > > wrote:
> > > > > >
> > > > > >> Sorry, I didn't recognize you're also asking it here. I'm in
> > > > > >> favor of describing it in this discussion thread so the
> > > > > >> discussion itself can go forward. So I'm copying my answer here:
> > > > > >>
> > > > > >> We have some use cases where we don't rely on everything the
> > > > > >> Kafka consumer provides. We want to know the current assignment
> > > > > >> on this consumer, and to get the latest assignment, we called the
> > > > > >> hack `poll(0)`.
> > > > > >>
> > > > > >> That said, we don't want to pull any records here, and if I'm not
> > > > > >> missing something, there's no way to accomplish this. Please
> > > > > >> guide me if I'm missing something.
> > > > > >>
> > > > > >> Thanks,
> > > > > >> Jungtaek Lim (HeartSaVioR)
> > > > > >>
> > > > > >>
> > > > > >>
> > > > > >> On Sat, Aug 10, 2019 at 2:11 AM Matthias J. Sax <
> > > > matthias@confluent.io>
> > > > > >> wrote:
> > > > > >>
> > > > > >>> Thanks for the KIP.
> > > > > >>>
> > > > > >>> Can you elaborate a little bit more on the use case for this
> > > > > >>> feature? Why would a consumer need to update its metadata
> > > > > >>> explicitly?
> > > > > >>>
> > > > > >>>
> > > > > >>> -Matthias
> > > > > >>
> > > > > >> --
> > > > > >> Name : Jungtaek Lim
> > > > > >> Blog : http://medium.com/@heartsavior
> > > > > >> Twitter : http://twitter.com/heartsavior
> > > > > >> LinkedIn : http://www.linkedin.com/in/heartsavior
> > > > > >>
> > > > > >
> > > > > >
> > > > >
> > > > >
> > > >
> > >
> >
> >
> > --
> > Name : Jungtaek Lim
> > Blog : http://medium.com/@heartsavior
> > Twitter : http://twitter.com/heartsavior
> > LinkedIn : http://www.linkedin.com/in/heartsavior
> >
>

Re: [DISCUSS] KIP-505 : Add new public method to only update assignment metadata in consumer

Posted by Jungtaek Lim <ka...@gmail.com>.
For now Spark needs to know the exact offsets for EARLIEST and LATEST per
partition so that it can handle users' queries on EARLIEST/LATEST and write
exact offsets in the checkpoint. I guess Spark would also want to validate a
known offset, but that could probably be covered by knowing the range of
available offsets. (Not 100% sure, but I imagine so.)

I have proposed a PR on the Spark side which addresses offsets for
timestamps, so that would need to be retrievable from AdminClient as well if
we want to move away from the consumer. For now it also leverages the
consumer.
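
(For reference, these lookups are already possible without fetching records;
a minimal sketch using the existing consumer APIs - the class and method
names are just for illustration:)

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
    import org.apache.kafka.common.TopicPartition;

    public class OffsetLookupSketch {
        static void lookup(KafkaConsumer<byte[], byte[]> consumer,
                           TopicPartition tp, long timestampMs) {
            // EARLIEST and LATEST offsets for the partition.
            Map<TopicPartition, Long> earliest =
                consumer.beginningOffsets(Collections.singleton(tp));
            Map<TopicPartition, Long> latest =
                consumer.endOffsets(Collections.singleton(tp));

            // Offset of the first record at/after the given timestamp.
            Map<TopicPartition, Long> query = new HashMap<>();
            query.put(tp, timestampMs);
            Map<TopicPartition, OffsetAndTimestamp> byTime =
                consumer.offsetsForTimes(query);

            System.out.println(earliest + " " + latest + " " + byTime);
        }
    }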

On Tue, Aug 13, 2019 at 7:51 AM Matthias J. Sax <ma...@confluent.io>
wrote:

> Note that `KafkaConsumer` refreshes its metadata every 5 minutes by
> default anyway... (parameter `metadata.max.age.ms`). And of course, you
> can refresh the metadata you get via AdminClient each time you trigger
> planning. I cannot quantify the overhead of a single request though.
>
> Also, what offset information are you interested in? Because you don't
> commit any offsets to Kafka, but store them in Spark's checkpoint, it's
> unclear what information you are looking for.
>
> -Matthias
>
>
> On 8/12/19 2:53 PM, Jungtaek Lim wrote:
> > Thanks for the feedback, Colin and Matthias.
> >
> > I agree with you regarding getting topics and partitions via AdminClient;
> > I'm just curious how much the overhead would be. Would it be lighter, or
> > heavier? We may not want to list topics at regular intervals - in the
> > plan phase we want up-to-date information so that the calculation from
> > Spark itself makes sense.
> >
> > On the other hand, I'm not seeing any information regarding offsets in
> > the current AdminClient, which is also one of the reasons we leverage the
> > consumer and call poll(0). Colin, as you mentioned there are KIPs
> > addressing this, could you point to those KIPs so that we can see whether
> > they would work for our case? Without support for this we cannot replace
> > our usage of consumer/poll with AdminClient.
> >
> > ps. IMHO it would be helpful if there were an overloaded `listTopics`
> > which accepts a regex, the same as consumer subscription via pattern. We
> > would like to provide the same behavior that Kafka basically provides as
> > a source.
> >
> >
> > On Tue, Aug 13, 2019 at 1:03 AM Matthias J. Sax <ma...@confluent.io>
> > wrote:
> >
> >> Thanks for the details Jungtaek!
> >>
> >> I tend to agree with Colin that using the AdminClient seems to be the
> >> better choice.
> >>
> >> You can get all topics via `listTopics()` (and you can refresh this
> >> information at regular intervals) and match any pattern against the list
> >> of available topics in the driver.
> >>
> >> As you use `assignment()` and store offsets in the Spark checkpoint, it
> >> seems that using consumer group management is not a good fit for the use
> >> case.
> >>
> >>
> >> Thoughts?
> >>
> >>
> >>
> >> -Matthias
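
(A sketch of that driver-side pattern matching over `listTopics()`,
refreshed on each planning round so created/deleted topics are picked up -
the class and method names are just for illustration:)

    import java.util.Set;
    import java.util.regex.Pattern;
    import java.util.stream.Collectors;
    import org.apache.kafka.clients.admin.AdminClient;

    public class TopicPatternSketch {
        static Set<String> matchingTopics(AdminClient admin, Pattern pattern) throws Exception {
            // List all visible topics, then filter client-side by the
            // subscription pattern.
            return admin.listTopics().names().get().stream()
                .filter(topic -> pattern.matcher(topic).matches())
                .collect(Collectors.toSet());
        }
    }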

-- 
Name : Jungtaek Lim
Blog : http://medium.com/@heartsavior
Twitter : http://twitter.com/heartsavior
LinkedIn : http://www.linkedin.com/in/heartsavior

Re: [DISCUSS] KIP-505 : Add new public method to only update assignment metadata in consumer

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Note that `KafkaConsumer` refreshes its metadata every 5 minutes by
default anyway... (parameter `metadata.max.age.ms`). And of course, you
can refresh the metadata you get via AdminClient each time you trigger
planning. I cannot quantify the overhead of a single request though.

Also, what offset information are you interested in? Because you don't
commit any offsets to Kafka, but store them in Spark's checkpoint, it's
unclear what information you are looking for.

-Matthias
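
(If the default is too stale for planning, the interval can be lowered - a
sketch, assuming the cost of more frequent metadata requests is acceptable:)

    import java.util.Properties;

    public class MetadataAgeSketch {
        static Properties consumerProps() {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            // Default is 300000 (5 minutes); lower it for fresher metadata.
            props.put("metadata.max.age.ms", "60000");
            return props;
        }
    }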




Re: [DISCUSS] KIP-505 : Add new public method to only update assignment metadata in consumer

Posted by Jungtaek Lim <ka...@gmail.com>.
Hi Harsha,

I'm not sure what exactly the class is doing, but if I can't get all the
necessary information from that class, I would end up calling others and
coming back to the same issue. And skimming the class, it seems to be a
complicated one (unfriendly to end users, as it's designed to be used
internally), so end users may not see how to leverage it. IMHO, exposing an
assignment metadata update on the consumer sounds simpler.

Thanks,
Jungtaek Lim (HeartSaVioR)

On Tue, Aug 13, 2019 at 7:51 AM Harsha Chintalapani <ka...@harsha.io> wrote:

> Hi Jungtaek,
>                    Have you looked into this interface:
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java
> Right now it's not a public interface, but do the methods available in this
> interface work for your needs? The DefaultMetadataUpdater is responsible
> for making the metadata requests to brokers:
> https://github.com/apache/kafka/blob/26814e060e98f9674127be13a28ce41a21ca6b3c/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L958
> If it can be invoked from client methods, does that solve your requirements?
>
> Thanks,
> Harsha
>
>
>


-- 
Name : Jungtaek Lim
Blog : http://medium.com/@heartsavior
Twitter : http://twitter.com/heartsavior
LinkedIn : http://www.linkedin.com/in/heartsavior

Re: [DISCUSS] KIP-505 : Add new public method to only update assignment metadata in consumer

Posted by Harsha Chintalapani <ka...@harsha.io>.
Hi Jungtaek,
                   Have you looked into this interface:
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java
Right now it's not a public interface, but do the methods available in this
interface work for your needs? The DefaultMetadataUpdater is responsible
for making the metadata requests to brokers:
https://github.com/apache/kafka/blob/26814e060e98f9674127be13a28ce41a21ca6b3c/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L958
If it can be invoked from client methods, does that solve your requirements?

Thanks,
Harsha


On Mon, Aug 12, 2019 at 2:53 PM, Jungtaek Lim <ka...@gmail.com> wrote:

> Thanks for the feedbacks Colin and Matthias.
>
> I agree with you regarding getting topics and partitions via AdminClient,
> just curious how much the overhead would be. Would it be lighter, or
> heavier? We may not want to list topics in regular intervals - in plan
> phase we want to know up-to-date information so that the calculation from
> Spark itself makes sense.
>
> On the other hands I'm not seeing any information regarding offset in
> current AdminClient, which is also one of reason we leverage consumer and
> call poll(0). Colin, as you mentioned there're KIPs addressing this, could
> you refer KIPs so that we can see whether it would work for our case?
> Without support of this we cannot replace our usage of consumer/poll with
> AdminClient.
>
> ps. IMHO it seems to be helpful if there's overloaded `listTopics` which
> receives regex same as consumer subscription via pattern. We would like to
> provide same behavior what Kafka is basically providing as a source.
>
> On Tue, Aug 13, 2019 at 1:03 AM Matthias J. Sax <ma...@confluent.io>
> wrote:
>
> Thanks for the details Jungtaek!
>
> I tend to agree with Colin, that using the AdminClient seems to be the
> better choice.
>
> You can get all topics via `listTopics()` (and you can refresh this
> information on regular intervals) and match any pattern against the list of
> available topics in the driver.
>
> As you use `assignment()` and store offsets in the Spark checkpoint, it
> seems that using consumer group management is not a good fit for the use
> case.
>
> Thoughts?
>
> -Matthias

Re: [DISCUSS] KIP-505 : Add new public method to only update assignment metadata in consumer

Posted by Dongjin Lee <do...@apache.org>.
Sorry for being late.

It seems I have found a case that requires a method to update consumer
metadata. In short, kafka-console-consumer.sh has been working differently
since 2.1.0 for lack of this functionality.

https://issues.apache.org/jira/browse/KAFKA-8789
https://github.com/apache/kafka/pull/7206
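
For reference, the idiom in question looks roughly like the following minimal
sketch; the subscription pattern and group id are made up, and this is not the
console consumer's actual code:

import java.util.Properties;
import java.util.regex.Pattern;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class PollZeroIdiom {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "metadata-only-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Pattern.compile("events-.*"));
            // The hack: poll only to force the metadata/assignment update,
            // ignoring any records. poll(long) is deprecated, and
            // poll(Duration.ZERO) may return before the update completes.
            consumer.poll(0L);
            System.out.println("Assignment: " + consumer.assignment());
        }
    }
}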

Thanks,
Dongjin

On Tue, Aug 13, 2019 at 9:58 PM Gabor Somogyi <ga...@gmail.com>
wrote:

> I've had a look at KIP-396, and so far only 1 binding vote has arrived. I
> hope others will consider it a good solution...
>
> G
>
>
> On Tue, Aug 13, 2019 at 11:52 AM Gabor Somogyi <ga...@gmail.com>
> wrote:
>
> > I've had concerns calling AdminClient.listTopics because on big clusters
> > I've seen OOMs because of too many TopicPartitions.
> > On the other hand, this problem already exists in the actual
> > implementation because, as Colin said, the Consumer is doing the same on
> > the client side. All in all this part is fine.
> >
> > I've checked all the actual use-cases on the Spark side which have to be
> > covered, and it looks doable.
> >
> >
> > On Tue, Aug 13, 2019 at 6:01 AM Jungtaek Lim <ka...@gmail.com> wrote:
> >
> >> So overall, AdminClient covers what is necessary to retrieve up-to-date
> >> topic-partitions, whereas KIP-396 will cover what is necessary to
> >> retrieve an offset (EARLIEST, LATEST, timestamp) on a partition.
> >>
> >> Gabor, could you please add your input if I'm missing something? I'd
> >> like to double-check on this.
> >>
> >> Assuming I'm not missing something, what would be the preferred next
> >> action? Personally I'd keep this as it is until KIP-396 passes the vote
> >> (the vote for KIP-396 opened in January and it still doesn't pass - 7
> >> months - which worries me a bit about whether it's going to pass the vote
> >> or not), but I also respect the lifecycle of KIPs in the Kafka community.
> >>
> >> On Tue, Aug 13, 2019 at 12:46 PM Jungtaek Lim <ka...@gmail.com> wrote:
> >>
> >> >
> >> >
> >> > On Tue, Aug 13, 2019 at 10:01 AM Colin McCabe <cm...@apache.org> wrote:
> >> >
> >> >> On Mon, Aug 12, 2019, at 14:54, Jungtaek Lim wrote:
> >> >> > Thanks for the feedback, Colin and Matthias.
> >> >> >
> >> >> > I agree with you regarding getting topics and partitions via
> >> >> > AdminClient, just curious how much the overhead would be. Would it be
> >> >> > lighter, or heavier? We may not want to list topics in regular
> >> >> > intervals - in the plan phase we want to know up-to-date information
> >> >> > so that the calculation from Spark itself makes sense.
> >> >>
> >> >> It would be lighter. The consumer will periodically refresh metadata
> >> >> for any topic you are subscribed to. AdminClient doesn’t have the
> >> >> concept of subscriptions, and won’t refresh topic metadata until you
> >> >> request it.
> >> >>
> >> >
> >> > Sounds great! Happy to hear about that.
> >> >
> >> >
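
A minimal sketch of such an on-demand metadata request from the driver; the
topic names are illustrative, and this is not Spark's actual code:

import java.util.Arrays;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.TopicDescription;

public class DescribeOnDemand {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // Nothing is fetched in the background; metadata for these topics
            // is requested only when this call is made.
            Map<String, TopicDescription> topics = admin
                    .describeTopics(Arrays.asList("events-1", "events-2"))
                    .all().get();
            topics.forEach((name, desc) -> System.out.println(
                    name + " has " + desc.partitions().size() + " partitions"));
        }
    }
}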
> >> >>
> >> >> >
> >> >> > On the other hand I'm not seeing any information regarding offsets in
> >> >> > the current AdminClient, which is also one reason we leverage the
> >> >> > consumer and call poll(0). Colin, as you mentioned there're KIPs
> >> >> > addressing this, could you refer to the KIPs so that we can see
> >> >> > whether they would work for our case? Without support for this we
> >> >> > cannot replace our usage of consumer/poll with AdminClient.
> >> >>
> >> >> KIP-396 is the one for listing offsets in AdminClient.
> >> >>
> >> >
> >> > KIP-396 seems to fit the needs of Spark's purpose to get offset
> >> > information, even for a timestamp. Thanks!
> >> > I wish there were a way to get a range of (EARLIEST, LATEST) in one
> >> > call, but it's not a big deal as it just requires two calls.
> >> >
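
The two calls would look roughly like this, sketched against the `listOffsets`
API shape proposed in KIP-396; the names follow the proposal and may differ in
the version that finally ships:

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.common.TopicPartition;

public class OffsetRange {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        TopicPartition tp = new TopicPartition("events-1", 0); // illustrative
        try (AdminClient admin = AdminClient.create(props)) {
            // One call per OffsetSpec: EARLIEST first, then LATEST.
            long earliest = admin
                    .listOffsets(Collections.singletonMap(tp, OffsetSpec.earliest()))
                    .partitionResult(tp).get().offset();
            long latest = admin
                    .listOffsets(Collections.singletonMap(tp, OffsetSpec.latest()))
                    .partitionResult(tp).get().offset();
            System.out.println("Offset range: [" + earliest + ", " + latest + ")");
        }
    }
}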
> >> >
> >> >> > ps. IMHO it would be helpful if there were an overloaded `listTopics`
> >> >> > which receives a regex, same as consumer subscription via a pattern.
> >> >> > We would like to provide the same behavior as what Kafka is basically
> >> >> > providing as a source.
> >> >>
> >> >> We don’t have a regex listTopics at the moment, though we could add
> >> >> this. Currently, the regex is done on the client side anyway (although
> >> >> we’d really like to change this in the future). So just listing
> >> >> everything and filtering locally would be the same performance and
> >> >> behavior as the Consumer.
> >> >>
> >> >
> >> > I see. Good to know the regex is done on the client side - I've just
> >> > searched some code, and it applies a filter over all topics retrieved
> >> > from the metadata fetch. Then it would make mostly no difference on
> >> > this. Thanks for confirming.
> >> >
> >> >
> >> >>
> >> >> best,
> >> >> Colin
> >> >>


-- 
*Dongjin Lee*

*A hitchhiker in the mathematical world.*
*github: github.com/dongjinleekr <https://github.com/dongjinleekr>
linkedin: kr.linkedin.com/in/dongjinleekr <https://kr.linkedin.com/in/dongjinleekr>
speakerdeck: speakerdeck.com/dongjin <https://speakerdeck.com/dongjin>*

Re: [DISCUSS] KIP-505 : Add new public method to only update assignment metadata in consumer

Posted by Colin McCabe <cm...@apache.org>.
That is a good point-- we should get KIP-396 voted on.  I will review it today.

best,
Colin


On Tue, Aug 13, 2019, at 05:58, Gabor Somogyi wrote:
> I've had a look at KIP-396, and so far only 1 binding vote has arrived. I
> hope others will consider it a good solution...
> 
> G

Re: [DISCUSS] KIP-505 : Add new public method to only update assignment metadata in consumer

Posted by Gabor Somogyi <ga...@gmail.com>.
I've had a look at KIP-396, and so far only 1 binding vote has arrived. I
hope others will consider it a good solution...

G


On Tue, Aug 13, 2019 at 11:52 AM Gabor Somogyi <ga...@gmail.com>
wrote:

> I've had concerns calling AdminClient.listTopics because on big clusters
> I've seen OOMs because of too many TopicPartitions.
> On the other hand, this problem already exists in the actual implementation
> because, as Colin said, the Consumer is doing the same on the client side.
> All in all this part is fine.
>
> I've checked all the actual use-cases on the Spark side which have to be
> covered, and it looks doable.
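
If the OOM concern does materialize, one possible mitigation is to describe the
matched topics in bounded batches; this is a sketch under the assumption that
the planning phase can process topics batch by batch, not an existing Spark or
Kafka feature:

import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.TopicDescription;

public class BatchedDescribe {
    // Each response is handled and can be discarded before the next request
    // is sent, so at most one batch of TopicDescriptions is held at a time.
    static void forEachTopicBatch(AdminClient admin, List<String> topics,
            int batchSize, Consumer<Map<String, TopicDescription>> handle)
            throws Exception {
        for (int i = 0; i < topics.size(); i += batchSize) {
            List<String> batch =
                    topics.subList(i, Math.min(i + batchSize, topics.size()));
            handle.accept(admin.describeTopics(batch).all().get());
        }
    }
}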
>
>
> On Tue, Aug 13, 2019 at 6:01 AM Jungtaek Lim <ka...@gmail.com> wrote:
>
>> So in overall, AdminClient covers the necessary to retrieve up-to-date
>> topic-partitions, whereas KIP-396 will cover the necessary to retrieve
>> offset (EARLIEST, LATEST, timestamp) on partition.
>>
>> Gabor, could you please add the input if I'm missing something? I'd like
>> to
>> double-check on this.
>>
>> Assuming I'm not missing something, what would be preferred next action?
>> Personally I'd keep this as it is until KIP-396 passes the vote (the vote
>> for KIP-396 opened at January and it still doesn't pass - 7 months - which
>> worries me a bit if it's going to pass the vote or not), but I also
>> respect
>> the lifecycle of KIP in Kafka community.
>>
>> On Tue, Aug 13, 2019 at 12:46 PM Jungtaek Lim <ka...@gmail.com> wrote:
>>
>> >
>> >
>> > On Tue, Aug 13, 2019 at 10:01 AM Colin McCabe <cm...@apache.org>
>> wrote:
>> >
>> >> On Mon, Aug 12, 2019, at 14:54, Jungtaek Lim wrote:
>> >> > Thanks for the feedbacks Colin and Matthias.
>> >> >
>> >> > I agree with you regarding getting topics and partitions via
>> >> AdminClient,
>> >> > just curious how much the overhead would be. Would it be lighter, or
>> >> > heavier? We may not want to list topics in regular intervals - in
>> plan
>> >> > phase we want to know up-to-date information so that the calculation
>> >> from
>> >> > Spark itself makes sense.
>> >>
>> >> It would be lighter. The consumer will periodically refresh metadata
>> for
>> >> any topic you are subscribed to. AdminClient doesn’t have the concept
>> of
>> >> subscriptions, and won’t refresh topic metadata until you request it.
>> >>
>> >
>> > Sounds great! Happy to hear about that.
>> >
>> >
>> >>
>> >> >
>> >> > On the other hands I'm not seeing any information regarding offset in
>> >> > current AdminClient, which is also one of reason we leverage consumer
>> >> and
>> >> > call poll(0). Colin, as you mentioned there're KIPs addressing this,
>> >> could
>> >> > you refer KIPs so that we can see whether it would work for our case?
>> >> > Without support of this we cannot replace our usage of consumer/poll
>> >> with
>> >> > AdminClient.
>> >>
>> >> KIP-396 is the one for listing offsets in AdminClient.
>> >>
>> >
>> > KIP-396 seems to fit to the needs on Spark's purpose to get offset
>> > information, even for timestamp. Thanks!
>> > I'd wish there's a way to get a range of (EARLIEST, LATEST) in one call,
>> > but not a big deal as it just requires two calls.
>> >
>> > >
>> >> > ps. IMHO it seems to be helpful if there's overloaded `listTopics`
>> which
>> >> > receives regex same as consumer subscription via pattern. We would
>> like
>> >> to
>> >> > provide same behavior what Kafka is basically providing as a source.
>> >>
>> >> We don’t have a regex listTopics at the moment, though we could add
>> this.
>> >> Currently, the regex is done on the client side anyway (although we’d
>> >> really like to change this in the future). So just listing everything
>> and
>> >> filtering locally would be the same performance and behavior as the
>> >> Consumer.
>> >>
>> >
>> > I see. Good to know regex is done on the client side - I've just
>> searched
>> > some code and it applies filter for all topics retrieved from metadata
>> > fetch. Then it would be mostly no difference on this. Thanks for
>> confirming.
>> >
>> >
>> >>
>> >> best,
>> >> Colin
>> >>
>> >> >
>> >> > On Tue, Aug 13, 2019 at 1:03 AM Matthias J. Sax <
>> matthias@confluent.io>
>> >> > wrote:
>> >> >
>> >> > > Thanks for the details Jungtaek!
>> >> > >
>> >> > > I tend to agree with Colin, that using the AdminClient seems to be
>> the
>> >> > > better choice.
>> >> > >
>> >> > > You can get all topics via `listTopics()` (and you can refresh this
>> >> > > information on regular intervals) and match any pattern against the
>> >> list
>> >> > > of available topics in the driver.
>> >> > >
>> >> > > As you use `assignment()` and store offsets in the Spark
>> checkpoint,
>> >> it
>> >> > > seems that using consumer group management is not a good fit for
>> the
>> >> use
>> >> > > case.
>> >> > >
>> >> > >
>> >> > > Thoughts?
>> >> > >
>> >> > >
>> >> > >
>> >> > > -Matthias
>> >> > >
>> >> > > On 8/12/19 8:22 AM, Colin McCabe wrote:
>> >> > > > Hi,
>> >> > > >
>> >> > > > If there’s no need to consume records in the Spark driver, then
>> the
>> >> > > Consumer is probably the wrong thing to use. Instead, Spark should
>> use
>> >> > > AdminClient to find out what partitions exist and where, manage
>> their
>> >> > > offsets, and so on. There are some KIPs under discussion now that
>> >> would add
>> >> > > the necessary APIs for managing offsets.
>> >> > > >
>> >> > > > Best,
>> >> > > > Colin
>> >> > > >
>> >> > > > On Mon, Aug 12, 2019, at 07:39, Jungtaek Lim wrote:
>> >> > > >> My feeling is that I didn't explain the use case for Spark
>> >> properly and
>> >> > > >> hence fail to explain the needs. Sorry about this.
>> >> > > >>
>> >> > > >> Spark leverages the single instance of KafkaConsumer in the
>> driver
>> >> > > which is
>> >> > > >> registered solely on the consumer group. This is used in the
>> plan
>> >> phase
>> >> > > for
>> >> > > >> each micro-batch to calculate the overall topicpartitions with
>> its
>> >> > > offset
>> >> > > >> ranges for this batch, and split and assign (topicpartition,
>> >> fromOffset,
>> >> > > >> untilOffset) to each input partition. After the planning is done
>> >> and
>> >> > > tasks
>> >> > > >> are being distributed to executors, consumer per each input
>> >> partition
>> >> > > will
>> >> > > >> be initialized from some executor (being assigned to the single
>> >> > > >> topicpartition), and pull the actual records. (Pooling
>> consumers is
>> >> > > applied
>> >> > > >> for sure.) As plan phase is to determine the overall
>> >> topicpartitions and
>> >> > > >> offset ranges to process, Spark is never interested on pulling
>> the
>> >> > > records
>> >> > > >> in driver side.
>> >> > > >>
>> >> > > >> Spark mainly leverages poll(0) to get the latest assigned
>> >> partitions and
>> >> > > >> adopt the changes or validate the expectation. That's not only
>> use
>> >> case
>> >> > > for
>> >> > > >> poll(0). Spark is also seeking the offset per topicpartition to
>> the
>> >> > > >> earliest or the latest, or specific one (either provided by end
>> >> user or
>> >> > > the
>> >> > > >> last committed offset) so that Spark can have actual offset or
>> >> validate
>> >> > > the
>> >> > > >> provided offset. According to the javadoc (if I understand
>> >> correctly),
>> >> > > to
>> >> > > >> get the offset immediately it seems to be required to call
>> `poll`
>> >> or
>> >> > > >> `position`.
>> >> > > >>
>> >> > > >> The way Spark interacts with Kafka in this plan phase in driver
>> is
>> >> > > >> synchronous, as the phase should finish ASAP to run the next
>> phase.
>> >> > > >> Registering ConsumerRebalanceListener and tracking the change
>> will
>> >> > > require
>> >> > > >> some asynchronous handling which sounds to add unnecessary
>> >> complexity.
>> >> > > >> Spark may be OK with deal with synchronous with timeout (that's
>> >> what
>> >> > > >> methods in KafkaConsumer have been providing - they're not
>> >> > > asynchronous, at
>> >> > > >> least for callers) but dealing with asynchronous is another
>> level
>> >> of
>> >> > > >> interest. I can see the benefit where continuous thread runs and
>> >> the
>> >> > > >> consumer is busy with something continuously, relying on
>> listener
>> >> to
>> >> > > hear
>> >> > > >> the news on reassignment. Unfortunately that's not the case.
>> >> > > >>
>> >> > > >> Unit tests in Spark have similar needs: looks like Kafka test
>> code
>> >> also
>> >> > > >> leverages `updateAssignmentMetadataIfNeeded` and `poll(0)` in
>> many
>> >> > > places
>> >> > > >> as it's appropriate to the place which blocking (+timeout) call
>> is
>> >> > > >> preferred - so I can see the similar needs from here as well.

Re: [DISCUSS] KIP-505 : Add new public method to only update assignment metadata in consumer

Posted by Gabor Somogyi <ga...@gmail.com>.
I've had concerns about calling AdminClient.listTopics because on big
clusters I've seen OOMs caused by too many TopicPartitions.
On the other hand, this problem already exists in the actual implementation
because, as Colin said, the Consumer is doing the same on the client side.
All in all this part is fine.
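
For concreteness, here is a minimal sketch of the enumeration in question
(the bootstrap address is a placeholder and error handling is omitted); the
partitions list below is what grows with the size of the cluster:

    import java.util.*;
    import org.apache.kafka.clients.admin.*;
    import org.apache.kafka.common.TopicPartition;

    public class ListAllPartitions {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            // placeholder bootstrap address
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            try (AdminClient admin = AdminClient.create(props)) {
                // one TopicPartition object per partition in the cluster --
                // this is where the memory concern above comes from
                Set<String> topics = admin.listTopics().names().get();
                List<TopicPartition> partitions = new ArrayList<>();
                admin.describeTopics(topics).all().get().forEach((topic, desc) ->
                    desc.partitions().forEach(p ->
                        partitions.add(new TopicPartition(topic, p.partition()))));
                System.out.println("total partitions: " + partitions.size());
            }
        }
    }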

I've checked all the actual use-cases on the Spark side that would have to be
covered, and it looks doable.



Re: [DISCUSS] KIP-505 : Add new public method to only update assignment metadata in consumer

Posted by Jungtaek Lim <ka...@gmail.com>.
So overall, AdminClient covers what's necessary to retrieve up-to-date
topic-partitions, whereas KIP-396 will cover what's necessary to retrieve
offsets (EARLIEST, LATEST, or by timestamp) per partition.
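
To make that division concrete, here is a rough sketch of what a driver-side
plan phase could look like once both pieces exist. Note that listOffsets and
OffsetSpec below follow the API shape proposed in KIP-396 - they are not
available yet, so treat those names as assumptions:

    import java.util.*;
    import org.apache.kafka.clients.admin.*;
    import org.apache.kafka.common.TopicPartition;

    public class PlanPhaseSketch {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
                      "localhost:9092"); // placeholder
            try (AdminClient admin = AdminClient.create(props)) {
                // 1) up-to-date topic-partitions via AdminClient (shipped today)
                Map<TopicPartition, OffsetSpec> latestSpec = new HashMap<>();
                Set<String> topics = admin.listTopics().names().get();
                admin.describeTopics(topics).all().get().forEach((t, d) ->
                    d.partitions().forEach(p ->
                        latestSpec.put(new TopicPartition(t, p.partition()),
                                       OffsetSpec.latest())));
                // 2) per-partition end offsets via the *proposed* KIP-396 API
                admin.listOffsets(latestSpec).all().get().forEach((tp, info) ->
                    System.out.println(tp + " untilOffset=" + info.offset()));
                // fromOffset would come from the engine's own checkpoint, not Kafka
            }
        }
    }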

Gabor, could you please chime in if I'm missing something? I'd like to
double-check this.

Assuming I'm not missing anything, what would be the preferred next action?
Personally I'd keep this KIP as it is until KIP-396 passes its vote (the vote
for KIP-396 opened in January and still hasn't passed - 7 months - which
makes me worry a bit about whether it will pass at all), but I also respect
the lifecycle of KIPs in the Kafka community.



-- 
Name : Jungtaek Lim
Blog : http://medium.com/@heartsavior
Twitter : http://twitter.com/heartsavior
LinkedIn : http://www.linkedin.com/in/heartsavior

Re: [DISCUSS] KIP-505 : Add new public method to only update assignment metadata in consumer

Posted by Jungtaek Lim <ka...@gmail.com>.
On Tue, Aug 13, 2019 at 10:01 AM Colin McCabe <cm...@apache.org> wrote:

> On Mon, Aug 12, 2019, at 14:54, Jungtaek Lim wrote:
> > Thanks for the feedbacks Colin and Matthias.
> >
> > I agree with you regarding getting topics and partitions via AdminClient,
> > just curious how much the overhead would be. Would it be lighter, or
> > heavier? We may not want to list topics in regular intervals - in plan
> > phase we want to know up-to-date information so that the calculation from
> > Spark itself makes sense.
>
> It would be lighter. The consumer will periodically refresh metadata for
> any topic you are subscribed to. AdminClient doesn’t have the concept of
> subscriptions, and won’t refresh topic metadata until you request it.
>

Sounds great! Happy to hear that.


>
> >
> > On the other hands I'm not seeing any information regarding offset in
> > current AdminClient, which is also one of reason we leverage consumer and
> > call poll(0). Colin, as you mentioned there're KIPs addressing this, could
> > you refer KIPs so that we can see whether it would work for our case?
> > Without support of this we cannot replace our usage of consumer/poll with
> > AdminClient.
>
> KIP-396 is the one for listing offsets in AdminClient.
>

KIP-396 seems to fit Spark's needs for getting offset information, even by
timestamp. Thanks!
I wish there were a way to get the (EARLIEST, LATEST) range in one call, but
it's not a big deal as it just requires two calls.
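
Assuming KIP-396 lands roughly as proposed, the two calls could look like
this (the method and OffsetSpec names are taken from the current proposal and
may change; the topic and bootstrap address are placeholders):

    import java.util.*;
    import org.apache.kafka.clients.admin.*;
    import org.apache.kafka.common.TopicPartition;

    public class OffsetRangeSketch {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
                      "localhost:9092"); // placeholder
            TopicPartition tp = new TopicPartition("events", 0); // hypothetical
            try (AdminClient admin = AdminClient.create(props)) {
                // call 1: earliest offset of the partition
                long earliest = admin.listOffsets(
                        Collections.singletonMap(tp, OffsetSpec.earliest()))
                        .all().get().get(tp).offset();
                // call 2: latest offset of the partition
                long latest = admin.listOffsets(
                        Collections.singletonMap(tp, OffsetSpec.latest()))
                        .all().get().get(tp).offset();
                System.out.println("range for " + tp + ": ["
                        + earliest + ", " + latest + ")");
            }
        }
    }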

>
> > ps. IMHO it seems to be helpful if there's overloaded `listTopics` which
> > receives regex same as consumer subscription via pattern. We would like to
> > provide same behavior what Kafka is basically providing as a source.
>
> We don’t have a regex listTopics at the moment, though we could add this.
> Currently, the regex is done on the client side anyway (although we’d
> really like to change this in the future). So just listing everything and
> filtering locally would be the same performance and behavior as the
> Consumer.
>

I see. Good to know the regex is applied on the client side - I've just
searched the code, and it applies the filter to all topics retrieved from the
metadata fetch. So there would be mostly no difference here. Thanks for
confirming.
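
So an AdminClient-based replacement could do the same thing by hand - list
everything, then filter locally. A minimal sketch (the pattern and bootstrap
address are placeholders):

    import java.util.*;
    import java.util.regex.Pattern;
    import java.util.stream.Collectors;
    import org.apache.kafka.clients.admin.*;

    public class PatternTopics {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
                      "localhost:9092"); // placeholder
            Pattern pattern = Pattern.compile("events-.*"); // hypothetical pattern
            try (AdminClient admin = AdminClient.create(props)) {
                // list all topics, then apply the regex on the client side --
                // effectively what the Consumer's pattern subscription does today
                Set<String> matched = admin.listTopics().names().get().stream()
                        .filter(t -> pattern.matcher(t).matches())
                        .collect(Collectors.toSet());
                System.out.println("matched topics: " + matched);
            }
        }
    }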


>
> best,
> Colin


-- 
Name : Jungtaek Lim
Blog : http://medium.com/@heartsavior
Twitter : http://twitter.com/heartsavior
LinkedIn : http://www.linkedin.com/in/heartsavior

Re: [DISCUSS] KIP-505 : Add new public method to only update assignment metadata in consumer

Posted by Colin McCabe <cm...@apache.org>.
On Mon, Aug 12, 2019, at 14:54, Jungtaek Lim wrote:
> Thanks for the feedbacks Colin and Matthias.
> 
> I agree with you regarding getting topics and partitions via AdminClient,
> just curious how much the overhead would be. Would it be lighter, or
> heavier? We may not want to list topics in regular intervals - in plan
> phase we want to know up-to-date information so that the calculation from
> Spark itself makes sense.

It would be lighter. The consumer will periodically refresh metadata for any topic you are subscribed to. AdminClient doesn’t have the concept of subscriptions, and won’t refresh topic metadata until you request it.

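For illustration, a minimal sketch of fetching topic and partition
metadata on demand with AdminClient (the broker address here is a
placeholder):

    import java.util.Map;
    import java.util.Properties;
    import java.util.Set;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.TopicDescription;

    public class OnDemandMetadata {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            try (AdminClient admin = AdminClient.create(props)) {
                // Metadata is fetched only when we ask for it; there is no
                // background refresh like the consumer does for subscriptions.
                Set<String> topics = admin.listTopics().names().get();
                Map<String, TopicDescription> descriptions =
                    admin.describeTopics(topics).all().get();
                descriptions.forEach((topic, desc) -> System.out.println(
                    topic + ": " + desc.partitions().size() + " partitions"));
            }
        }
    }
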
> 
> On the other hands I'm not seeing any information regarding offset in
> current AdminClient, which is also one of reason we leverage consumer and
> call poll(0). Colin, as you mentioned there're KIPs addressing this, could
> you refer KIPs so that we can see whether it would work for our case?
> Without support of this we cannot replace our usage of consumer/poll with
> AdminClient.

KIP-396 is the one for listing offsets in AdminClient.

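As a rough sketch, usage along the lines of what that KIP proposes could
look like the following (the exact API may still change before it ships;
topic name and broker address are placeholders):

    import java.util.Map;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.ListOffsetsResult;
    import org.apache.kafka.clients.admin.OffsetSpec;
    import org.apache.kafka.common.TopicPartition;

    public class ListOffsetsSketch {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            try (AdminClient admin = AdminClient.create(props)) {
                TopicPartition tp = new TopicPartition("events", 0);
                // Ask the broker for the latest offset of the partition,
                // without creating a consumer or fetching any records.
                ListOffsetsResult result =
                    admin.listOffsets(Map.of(tp, OffsetSpec.latest()));
                long latest = result.partitionResult(tp).get().offset();
                System.out.println("latest offset of " + tp + " = " + latest);
            }
        }
    }
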
> 
> ps. IMHO it seems to be helpful if there's overloaded `listTopics` which
> receives regex same as consumer subscription via pattern. We would like to
> provide same behavior what Kafka is basically providing as a source.

We don’t have a regex listTopics at the moment, though we could add this. Currently, the regex is applied on the client side anyway (although we’d really like to change this in the future), so just listing everything and filtering locally would give you the same performance and behavior as the Consumer.

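For example, a minimal sketch of that client-side filtering, assuming
java.util.regex and placeholder topic/broker names:

    import java.util.Properties;
    import java.util.Set;
    import java.util.regex.Pattern;
    import java.util.stream.Collectors;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;

    public class PatternTopics {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            Pattern pattern = Pattern.compile("events-.*");
            try (AdminClient admin = AdminClient.create(props)) {
                // List everything, then filter locally - the same thing the
                // consumer's pattern subscription does on the client side today.
                Set<String> matched = admin.listTopics().names().get().stream()
                    .filter(t -> pattern.matcher(t).matches())
                    .collect(Collectors.toSet());
                System.out.println("matched topics: " + matched);
            }
        }
    }
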
best,
Colin

> 
> On Tue, Aug 13, 2019 at 1:03 AM Matthias J. Sax <ma...@confluent.io>
> wrote:
> 
> > Thanks for the details Jungtaek!
> >
> > I tend to agree with Colin, that using the AdminClient seems to be the
> > better choice.
> >
> > You can get all topics via `listTopics()` (and you can refresh this
> > information on regular intervals) and match any pattern against the list
> > of available topics in the driver.
> >
> > As you use `assignment()` and store offsets in the Spark checkpoint, it
> > seems that using consumer group management is not a good fit for the use
> > case.
> >
> >
> > Thoughts?
> >
> >
> >
> > -Matthias
> >
> > On 8/12/19 8:22 AM, Colin McCabe wrote:
> > > Hi,
> > >
> > > If there’s no need to consume records in the Spark driver, then the
> > Consumer is probably the wrong thing to use. Instead, Spark should use
> > AdminClient to find out what partitions exist and where, manage their
> > offsets, and so on. There are some KIPs under discussion now that would add
> > the necessary APIs for managing offsets.
> > >
> > > Best,
> > > Colin
> > >
> > > On Mon, Aug 12, 2019, at 07:39, Jungtaek Lim wrote:
> > >> My feeling is that I didn't explain the use case for Spark properly and
> > >> hence fail to explain the needs. Sorry about this.
> > >>
> > >> Spark leverages the single instance of KafkaConsumer in the driver
> > which is
> > >> registered solely on the consumer group. This is used in the plan phase
> > for
> > >> each micro-batch to calculate the overall topicpartitions with its
> > offset
> > >> ranges for this batch, and split and assign (topicpartition, fromOffset,
> > >> untilOffset) to each input partition. After the planning is done and
> > tasks
> > >> are being distributed to executors, consumer per each input partition
> > will
> > >> be initialized from some executor (being assigned to the single
> > >> topicpartition), and pull the actual records. (Pooling consumers is
> > applied
> > >> for sure.) As plan phase is to determine the overall topicpartitions and
> > >> offset ranges to process, Spark is never interested on pulling the
> > records
> > >> in driver side.
> > >>
> > >> Spark mainly leverages poll(0) to get the latest assigned partitions and
> > >> adopt the changes or validate the expectation. That's not only use case
> > for
> > >> poll(0). Spark is also seeking the offset per topicpartition to the
> > >> earliest or the latest, or specific one (either provided by end user or
> > the
> > >> last committed offset) so that Spark can have actual offset or validate
> > the
> > >> provided offset. According to the javadoc (if I understand correctly),
> > to
> > >> get the offset immediately it seems to be required to call `poll` or
> > >> `position`.
> > >>
> > >> The way Spark interacts with Kafka in this plan phase in driver is
> > >> synchronous, as the phase should finish ASAP to run the next phase.
> > >> Registering ConsumerRebalanceListener and tracking the change will
> > require
> > >> some asynchronous handling which sounds to add unnecessary complexity.
> > >> Spark may be OK with deal with synchronous with timeout (that's what
> > >> methods in KafkaConsumer have been providing - they're not
> > asynchronous, at
> > >> least for callers) but dealing with asynchronous is another level of
> > >> interest. I can see the benefit where continuous thread runs and the
> > >> consumer is busy with something continuously, relying on listener to
> > hear
> > >> the news on reassignment. Unfortunately that's not the case.
> > >>
> > >> Unit tests in Spark have similar needs: looks like Kafka test code also
> > >> leverages `updateAssignmentMetadataIfNeeded` and `poll(0)` in many
> > places
> > >> as it's appropriate to the place which blocking (+timeout) call is
> > >> preferred - so I can see the similar needs from here as well.
> > >>
> > >> On Mon, Aug 12, 2019 at 4:46 PM Gabor Somogyi <
> > gabor.g.somogyi@gmail.com>
> > >> wrote:
> > >>
> > >>> Hi Guys,
> > >>>
> > >>> Please see the actual implementation, pretty sure it explains the
> > situation
> > >>> well:
> > >>>
> > >>>
> > https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
> > >>>
> > >>> To answer one question/assumption which popped up from all of you
> > Spark not
> > >>> only uses KafkaConsumer#subscribe but pattern subscribe +
> > >>> KafkaConsumer#assign as well.
> > >>> Please see here:
> > >>>
> > >>>
> > https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/ConsumerStrategy.scala
> > >>>
> > >>> BR,
> > >>> G
> > >>>
> > >>>
> > >>> On Mon, Aug 12, 2019 at 6:38 AM Satish Duggana <
> > satish.duggana@gmail.com>
> > >>> wrote:
> > >>>
> > >>>> Hi Jungtaek,
> > >>>> Thanks for the KIP. I have a couple of questions here.
> > >>>> Is not Spark using Kafka's consumer group management across multiple
> > >>>> consumers?
> > >>>>
> > >>>> Is Spark using KafkaConsumer#subscribe(Pattern pattern,
> > >>>> ConsumerRebalanceListener listener) only to get all the topics for a
> > >>>> pattern based subscription and Spark manually assigns those
> > >>>> topic-partitions across consumers on workers?
> > >>>>
> > >>>> Thanks,
> > >>>> Satish.
> > >>>>
> > >>>> On Mon, Aug 12, 2019 at 4:17 AM Matthias J. Sax <
> > matthias@confluent.io>
> > >>>> wrote:
> > >>>>
> > >>>>> If am not sure if I fully understand yet.
> > >>>>>
> > >>>>> The fact, that Spark does not stores offsets in Kafka but as part of
> > >>> its
> > >>>>> own checkpoint mechanism seems to be orthogonal. Maybe I am missing
> > >>>>> something here.
> > >>>>>
> > >>>>> As you are using subscribe(), you use Kafka consumer group mechanism,
> > >>>>> that takes care of the assignment of partitions to clients within the
> > >>>>> group. Therefore, I am not sure what you mean by:
> > >>>>>
> > >>>>>> which Spark needs to
> > >>>>>>> know to coordinate multiple consumers to pull correctly.
> > >>>>>
> > >>>>> Multiple thoughts that may help:
> > >>>>>
> > >>>>> - if Spark needs more control about the partition assignment, you can
> > >>>>> provide a custom `ConsumerPartitionAssignor` (via the consumer
> > >>>>> configuration)
> > >>>>>
> > >>>>> - you may also want to register `ConsumerRebalanceListener` via
> > >>>>> `subscribe()` to get informed when the group rebalances
> > >>>>>
> > >>>>> As you pointed out, using pattern subscription metadata can change if
> > >>>>> topic are added/deleted. However, each metadata change will
> > triggering
> > >>> a
> > >>>>> rebalance and thus you would get corresponding calls to you rebalance
> > >>>>> listener to learn about it and react accordingly.
> > >>>>>
> > >>>>> Maybe you can explain why neither of both approaches works and what
> > gap
> > >>>>> the new API would close?
> > >>>>>
> > >>>>>
> > >>>>> -Matthias
> > >>>>>
> > >>>>> On 8/11/19 5:11 AM, Jungtaek Lim wrote:
> > >>>>>> Let me elaborate my explanation a bit more. Here we say about Apache
> > >>>>> Spark,
> > >>>>>> but this will apply for everything which want to control offset of
> > >>>> Kafka
> > >>>>>> consumers.
> > >>>>>>
> > >>>>>> Spark is managing the committed offsets and the offsets which should
> > >>> be
> > >>>>>> polled now. Topics and partitions as well. This is required as Spark
> > >>>>> itself
> > >>>>>> has its own general checkpoint mechanism and Kafka is just a one of
> > >>>>>> source/sink (though it's considered as very important).
> > >>>>>>
> > >>>>>> To pull records from Kafka, Spark provides to Kafka which topics and
> > >>>>>> partitions it wants to subscribe(, and do seek and poll), but as
> > >>> Spark
> > >>>>> can
> > >>>>>> also provide "patterns" of topics, as well as subscription can be
> > >>>> changed
> > >>>>>> in Kafka side (topic added/dropped, partitions added) which Spark
> > >>> needs
> > >>>>> to
> > >>>>>> know to coordinate multiple consumers to pull correctly.
> > >>>>>>
> > >>>>>> Looks like assignment() doesn't update the assignment information in
> > >>>>>> consumer. It just returns known one. There's only one known approach
> > >>>>> doing
> > >>>>>> this, calling `poll`, but Spark is not interested on returned
> > >>> records,
> > >>>> so
> > >>>>>> there's a need for a hack `poll(0)`, and Kafka deprecated the API.
> > >>> This
> > >>>>> KIP
> > >>>>>> proposes to support this as official approach.
> > >>>>>>
> > >>>>>>
> > >>>>>> On Sun, Aug 11, 2019 at 8:18 PM Jungtaek Lim <ka...@gmail.com>
> > >>>> wrote:
> > >>>>>>
> > >>>>>>> Sorry I didn't recognize you're also asking it here as well. I'm in
> > >>>>> favor
> > >>>>>>> of describing it in this discussion thread so the discussion itself
> > >>>> can
> > >>>>> go
> > >>>>>>> forward. So copying my answer here:
> > >>>>>>>
> > >>>>>>> We have some use case which we don't just rely on everything what
> > >>>> Kafka
> > >>>>>>> consumer provides. We want to know current assignment on this
> > >>>> consumer,
> > >>>>> and
> > >>>>>>> to get the latest assignment, we called the hack `poll(0)`.
> > >>>>>>>
> > >>>>>>> That said, we don't want to pull any records here, and if I'm not
> > >>>>> missing
> > >>>>>>> here, there's no way to accomplish this. Please guide me if I'm
> > >>>> missing
> > >>>>>>> something.
> > >>>>>>>
> > >>>>>>> Thanks,
> > >>>>>>> Jungtaek Lim (HeartSaVioR)
> > >>>>>>>
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> On Sat, Aug 10, 2019 at 2:11 AM Matthias J. Sax <
> > >>>> matthias@confluent.io>
> > >>>>>>> wrote:
> > >>>>>>>
> > >>>>>>>> Thanks for the KIP.
> > >>>>>>>>
> > >>>>>>>> Can you elaborate a little bit more on the use case for this
> > >>> feature?
> > >>>>>>>> Why would a consumer need to update it's metadata explicitly?
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>> -Matthias
> > >>>>>>>>
> > >>>>>>>> On 8/8/19 8:46 PM, Jungtaek Lim wrote:
> > >>>>>>>>> Hi devs,
> > >>>>>>>>>
> > >>>>>>>>> I'd like to initiate discussion around KIP-505, exposing new
> > >>> public
> > >>>>>>>> method
> > >>>>>>>>> to only update assignment metadata in consumer.
> > >>>>>>>>>
> > >>>>>>>>> `poll(0)` has been misused as according to Kafka doc it doesn't
> > >>>>>>>> guarantee
> > >>>>>>>>> that it doesn't pull any records, and new method `poll(Duration)`
> > >>>>>>>> doesn't
> > >>>>>>>>> have same semantic, so would like to propose new public API which
> > >>>> only
> > >>>>>>>> does
> > >>>>>>>>> the desired behavior.
> > >>>>>>>>>
> > >>>>>>>>> KIP page: https://cwiki.apache.org/confluence/x/z5NiBw
> > >>>>>>>>>
> > >>>>>>>>> Please feel free to suggest any improvements on proposal, as I'm
> > >>> new
> > >>>>> to
> > >>>>>>>>> Kafka community and may not catch preferences (like
> > >>> TimeoutException
> > >>>>> vs
> > >>>>>>>>> boolean, etc.) on Kafka project.
> > >>>>>>>>>
> > >>>>>>>>> Thanks in advance!
> > >>>>>>>>> Jungtaek Lim (HeartSaVioR)
> > >>>>>>>>>
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>
> > >>>>>>> --
> > >>>>>>> Name : Jungtaek Lim
> > >>>>>>> Blog : http://medium.com/@heartsavior
> > >>>>>>> Twitter : http://twitter.com/heartsavior
> > >>>>>>> LinkedIn : http://www.linkedin.com/in/heartsavior
> > >>>>>>>
> > >>>>>>
> > >>>>>>
> > >>>>>
> > >>>>>
> > >>>>
> > >>>
> > >>
> > >>
> > >> --
> > >> Name : Jungtaek Lim
> > >> Blog : http://medium.com/@heartsavior
> > >> Twitter : http://twitter.com/heartsavior
> > >> LinkedIn : http://www.linkedin.com/in/heartsavior
> > >>
> > >
> >
> >
> 
> -- 
> Name : Jungtaek Lim
> Blog : http://medium.com/@heartsavior
> Twitter : http://twitter.com/heartsavior
> LinkedIn : http://www.linkedin.com/in/heartsavior
> 

Re: [DISCUSS] KIP-505 : Add new public method to only update assignment metadata in consumer

Posted by Jungtaek Lim <ka...@gmail.com>.
Thanks for the feedbacks Colin and Matthias.

I agree with you regarding getting topics and partitions via AdminClient;
I'm just curious how much the overhead would be. Would it be lighter, or
heavier? We may not want to list topics at regular intervals - in the plan
phase we want up-to-date information so that the calculation Spark does
still makes sense.

On the other hand, I'm not seeing any information regarding offsets in the
current AdminClient, which is also one of the reasons we leverage the
consumer and call poll(0). Colin, as you mentioned there are KIPs
addressing this, could you point to those KIPs so that we can see whether
they would work for our case? Without support for this we cannot replace
our usage of consumer/poll with AdminClient.

ps. IMHO it would be helpful to have an overloaded `listTopics` which
accepts a regex, the same as consumer subscription via pattern. We would
like to provide the same behavior that Kafka itself provides as a source.


On Tue, Aug 13, 2019 at 1:03 AM Matthias J. Sax <ma...@confluent.io>
wrote:

> Thanks for the details Jungtaek!
>
> I tend to agree with Colin, that using the AdminClient seems to be the
> better choice.
>
> You can get all topics via `listTopics()` (and you can refresh this
> information on regular intervals) and match any pattern against the list
> of available topics in the driver.
>
> As you use `assignment()` and store offsets in the Spark checkpoint, it
> seems that using consumer group management is not a good fit for the use
> case.
>
>
> Thoughts?
>
>
>
> -Matthias
>
> On 8/12/19 8:22 AM, Colin McCabe wrote:
> > Hi,
> >
> > If there’s no need to consume records in the Spark driver, then the
> Consumer is probably the wrong thing to use. Instead, Spark should use
> AdminClient to find out what partitions exist and where, manage their
> offsets, and so on. There are some KIPs under discussion now that would add
> the necessary APIs for managing offsets.
> >
> > Best,
> > Colin
> >
> > On Mon, Aug 12, 2019, at 07:39, Jungtaek Lim wrote:
> >> My feeling is that I didn't explain the use case for Spark properly and
> >> hence fail to explain the needs. Sorry about this.
> >>
> >> Spark leverages the single instance of KafkaConsumer in the driver
> which is
> >> registered solely on the consumer group. This is used in the plan phase
> for
> >> each micro-batch to calculate the overall topicpartitions with its
> offset
> >> ranges for this batch, and split and assign (topicpartition, fromOffset,
> >> untilOffset) to each input partition. After the planning is done and
> tasks
> >> are being distributed to executors, consumer per each input partition
> will
> >> be initialized from some executor (being assigned to the single
> >> topicpartition), and pull the actual records. (Pooling consumers is
> applied
> >> for sure.) As plan phase is to determine the overall topicpartitions and
> >> offset ranges to process, Spark is never interested on pulling the
> records
> >> in driver side.
> >>
> >> Spark mainly leverages poll(0) to get the latest assigned partitions and
> >> adopt the changes or validate the expectation. That's not only use case
> for
> >> poll(0). Spark is also seeking the offset per topicpartition to the
> >> earliest or the latest, or specific one (either provided by end user or
> the
> >> last committed offset) so that Spark can have actual offset or validate
> the
> >> provided offset. According to the javadoc (if I understand correctly),
> to
> >> get the offset immediately it seems to be required to call `poll` or
> >> `position`.
> >>
> >> The way Spark interacts with Kafka in this plan phase in driver is
> >> synchronous, as the phase should finish ASAP to run the next phase.
> >> Registering ConsumerRebalanceListener and tracking the change will
> require
> >> some asynchronous handling which sounds to add unnecessary complexity.
> >> Spark may be OK with deal with synchronous with timeout (that's what
> >> methods in KafkaConsumer have been providing - they're not
> asynchronous, at
> >> least for callers) but dealing with asynchronous is another level of
> >> interest. I can see the benefit where continuous thread runs and the
> >> consumer is busy with something continuously, relying on listener to
> hear
> >> the news on reassignment. Unfortunately that's not the case.
> >>
> >> Unit tests in Spark have similar needs: looks like Kafka test code also
> >> leverages `updateAssignmentMetadataIfNeeded` and `poll(0)` in many
> places
> >> as it's appropriate to the place which blocking (+timeout) call is
> >> preferred - so I can see the similar needs from here as well.
> >>
> >> On Mon, Aug 12, 2019 at 4:46 PM Gabor Somogyi <
> gabor.g.somogyi@gmail.com>
> >> wrote:
> >>
> >>> Hi Guys,
> >>>
> >>> Please see the actual implementation, pretty sure it explains the
> situation
> >>> well:
> >>>
> >>>
> https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
> >>>
> >>> To answer one question/assumption which popped up from all of you
> Spark not
> >>> only uses KafkaConsumer#subscribe but pattern subscribe +
> >>> KafkaConsumer#assign as well.
> >>> Please see here:
> >>>
> >>>
> https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/ConsumerStrategy.scala
> >>>
> >>> BR,
> >>> G
> >>>
> >>>
> >>> On Mon, Aug 12, 2019 at 6:38 AM Satish Duggana <
> satish.duggana@gmail.com>
> >>> wrote:
> >>>
> >>>> Hi Jungtaek,
> >>>> Thanks for the KIP. I have a couple of questions here.
> >>>> Is not Spark using Kafka's consumer group management across multiple
> >>>> consumers?
> >>>>
> >>>> Is Spark using KafkaConsumer#subscribe(Pattern pattern,
> >>>> ConsumerRebalanceListener listener) only to get all the topics for a
> >>>> pattern based subscription and Spark manually assigns those
> >>>> topic-partitions across consumers on workers?
> >>>>
> >>>> Thanks,
> >>>> Satish.
> >>>>
> >>>> On Mon, Aug 12, 2019 at 4:17 AM Matthias J. Sax <
> matthias@confluent.io>
> >>>> wrote:
> >>>>
> >>>>> If am not sure if I fully understand yet.
> >>>>>
> >>>>> The fact, that Spark does not stores offsets in Kafka but as part of
> >>> its
> >>>>> own checkpoint mechanism seems to be orthogonal. Maybe I am missing
> >>>>> something here.
> >>>>>
> >>>>> As you are using subscribe(), you use Kafka consumer group mechanism,
> >>>>> that takes care of the assignment of partitions to clients within the
> >>>>> group. Therefore, I am not sure what you mean by:
> >>>>>
> >>>>>> which Spark needs to
> >>>>>>> know to coordinate multiple consumers to pull correctly.
> >>>>>
> >>>>> Multiple thoughts that may help:
> >>>>>
> >>>>> - if Spark needs more control about the partition assignment, you can
> >>>>> provide a custom `ConsumerPartitionAssignor` (via the consumer
> >>>>> configuration)
> >>>>>
> >>>>> - you may also want to register `ConsumerRebalanceListener` via
> >>>>> `subscribe()` to get informed when the group rebalances
> >>>>>
> >>>>> As you pointed out, using pattern subscription metadata can change if
> >>>>> topic are added/deleted. However, each metadata change will
> triggering
> >>> a
> >>>>> rebalance and thus you would get corresponding calls to you rebalance
> >>>>> listener to learn about it and react accordingly.
> >>>>>
> >>>>> Maybe you can explain why neither of both approaches works and what
> gap
> >>>>> the new API would close?
> >>>>>
> >>>>>
> >>>>> -Matthias
> >>>>>
> >>>>> On 8/11/19 5:11 AM, Jungtaek Lim wrote:
> >>>>>> Let me elaborate my explanation a bit more. Here we say about Apache
> >>>>> Spark,
> >>>>>> but this will apply for everything which want to control offset of
> >>>> Kafka
> >>>>>> consumers.
> >>>>>>
> >>>>>> Spark is managing the committed offsets and the offsets which should
> >>> be
> >>>>>> polled now. Topics and partitions as well. This is required as Spark
> >>>>> itself
> >>>>>> has its own general checkpoint mechanism and Kafka is just a one of
> >>>>>> source/sink (though it's considered as very important).
> >>>>>>
> >>>>>> To pull records from Kafka, Spark provides to Kafka which topics and
> >>>>>> partitions it wants to subscribe(, and do seek and poll), but as
> >>> Spark
> >>>>> can
> >>>>>> also provide "patterns" of topics, as well as subscription can be
> >>>> changed
> >>>>>> in Kafka side (topic added/dropped, partitions added) which Spark
> >>> needs
> >>>>> to
> >>>>>> know to coordinate multiple consumers to pull correctly.
> >>>>>>
> >>>>>> Looks like assignment() doesn't update the assignment information in
> >>>>>> consumer. It just returns known one. There's only one known approach
> >>>>> doing
> >>>>>> this, calling `poll`, but Spark is not interested on returned
> >>> records,
> >>>> so
> >>>>>> there's a need for a hack `poll(0)`, and Kafka deprecated the API.
> >>> This
> >>>>> KIP
> >>>>>> proposes to support this as official approach.
> >>>>>>
> >>>>>>
> >>>>>> On Sun, Aug 11, 2019 at 8:18 PM Jungtaek Lim <ka...@gmail.com>
> >>>> wrote:
> >>>>>>
> >>>>>>> Sorry I didn't recognize you're also asking it here as well. I'm in
> >>>>> favor
> >>>>>>> of describing it in this discussion thread so the discussion itself
> >>>> can
> >>>>> go
> >>>>>>> forward. So copying my answer here:
> >>>>>>>
> >>>>>>> We have some use case which we don't just rely on everything what
> >>>> Kafka
> >>>>>>> consumer provides. We want to know current assignment on this
> >>>> consumer,
> >>>>> and
> >>>>>>> to get the latest assignment, we called the hack `poll(0)`.
> >>>>>>>
> >>>>>>> That said, we don't want to pull any records here, and if I'm not
> >>>>> missing
> >>>>>>> here, there's no way to accomplish this. Please guide me if I'm
> >>>> missing
> >>>>>>> something.
> >>>>>>>
> >>>>>>> Thanks,
> >>>>>>> Jungtaek Lim (HeartSaVioR)
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> On Sat, Aug 10, 2019 at 2:11 AM Matthias J. Sax <
> >>>> matthias@confluent.io>
> >>>>>>> wrote:
> >>>>>>>
> >>>>>>>> Thanks for the KIP.
> >>>>>>>>
> >>>>>>>> Can you elaborate a little bit more on the use case for this
> >>> feature?
> >>>>>>>> Why would a consumer need to update it's metadata explicitly?
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> -Matthias
> >>>>>>>>
> >>>>>>>> On 8/8/19 8:46 PM, Jungtaek Lim wrote:
> >>>>>>>>> Hi devs,
> >>>>>>>>>
> >>>>>>>>> I'd like to initiate discussion around KIP-505, exposing new
> >>> public
> >>>>>>>> method
> >>>>>>>>> to only update assignment metadata in consumer.
> >>>>>>>>>
> >>>>>>>>> `poll(0)` has been misused as according to Kafka doc it doesn't
> >>>>>>>> guarantee
> >>>>>>>>> that it doesn't pull any records, and new method `poll(Duration)`
> >>>>>>>> doesn't
> >>>>>>>>> have same semantic, so would like to propose new public API which
> >>>> only
> >>>>>>>> does
> >>>>>>>>> the desired behavior.
> >>>>>>>>>
> >>>>>>>>> KIP page: https://cwiki.apache.org/confluence/x/z5NiBw
> >>>>>>>>>
> >>>>>>>>> Please feel free to suggest any improvements on proposal, as I'm
> >>> new
> >>>>> to
> >>>>>>>>> Kafka community and may not catch preferences (like
> >>> TimeoutException
> >>>>> vs
> >>>>>>>>> boolean, etc.) on Kafka project.
> >>>>>>>>>
> >>>>>>>>> Thanks in advance!
> >>>>>>>>> Jungtaek Lim (HeartSaVioR)
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>> --
> >>>>>>> Name : Jungtaek Lim
> >>>>>>> Blog : http://medium.com/@heartsavior
> >>>>>>> Twitter : http://twitter.com/heartsavior
> >>>>>>> LinkedIn : http://www.linkedin.com/in/heartsavior
> >>>>>>>
> >>>>>>
> >>>>>>
> >>>>>
> >>>>>
> >>>>
> >>>
> >>
> >>
> >> --
> >> Name : Jungtaek Lim
> >> Blog : http://medium.com/@heartsavior
> >> Twitter : http://twitter.com/heartsavior
> >> LinkedIn : http://www.linkedin.com/in/heartsavior
> >>
> >
>
>

-- 
Name : Jungtaek Lim
Blog : http://medium.com/@heartsavior
Twitter : http://twitter.com/heartsavior
LinkedIn : http://www.linkedin.com/in/heartsavior

Re: [DISCUSS] KIP-505 : Add new public method to only update assignment metadata in consumer

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Thanks for the details Jungtaek!

I tend to agree with Colin, that using the AdminClient seems to be the
better choice.

You can get all topics via `listTopics()` (and you can refresh this
information at regular intervals) and match any pattern against the list
of available topics in the driver.

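A small sketch of such a refresh-and-match loop, assuming a plain
ScheduledExecutorService (interval, pattern, and broker address are
placeholders):

    import java.util.Properties;
    import java.util.Set;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;
    import java.util.regex.Pattern;
    import java.util.stream.Collectors;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;

    public class TopicListRefresher {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            AdminClient admin = AdminClient.create(props);
            Pattern pattern = Pattern.compile("events-.*");
            ScheduledExecutorService scheduler =
                Executors.newSingleThreadScheduledExecutor();
            // Re-list topics every 30 seconds and re-apply the pattern, so the
            // driver notices created/deleted topics between micro-batches.
            scheduler.scheduleAtFixedRate(() -> {
                try {
                    Set<String> matched = admin.listTopics().names().get().stream()
                        .filter(t -> pattern.matcher(t).matches())
                        .collect(Collectors.toSet());
                    System.out.println("currently matching topics: " + matched);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }, 0, 30, TimeUnit.SECONDS);
        }
    }
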
As you use `assignment()` and store offsets in the Spark checkpoint, it
seems that using consumer group management is not a good fit for the use
case.


Thoughts?



-Matthias

On 8/12/19 8:22 AM, Colin McCabe wrote:
> Hi,
> 
> If there’s no need to consume records in the Spark driver, then the Consumer is probably the wrong thing to use. Instead, Spark should use AdminClient to find out what partitions exist and where, manage their offsets, and so on. There are some KIPs under discussion now that would add the necessary APIs for managing offsets.
> 
> Best,
> Colin
> 
> On Mon, Aug 12, 2019, at 07:39, Jungtaek Lim wrote:
>> My feeling is that I didn't explain the use case for Spark properly and
>> hence fail to explain the needs. Sorry about this.
>>
>> Spark leverages the single instance of KafkaConsumer in the driver which is
>> registered solely on the consumer group. This is used in the plan phase for
>> each micro-batch to calculate the overall topicpartitions with its offset
>> ranges for this batch, and split and assign (topicpartition, fromOffset,
>> untilOffset) to each input partition. After the planning is done and tasks
>> are being distributed to executors, consumer per each input partition will
>> be initialized from some executor (being assigned to the single
>> topicpartition), and pull the actual records. (Pooling consumers is applied
>> for sure.) As plan phase is to determine the overall topicpartitions and
>> offset ranges to process, Spark is never interested on pulling the records
>> in driver side.
>>
>> Spark mainly leverages poll(0) to get the latest assigned partitions and
>> adopt the changes or validate the expectation. That's not only use case for
>> poll(0). Spark is also seeking the offset per topicpartition to the
>> earliest or the latest, or specific one (either provided by end user or the
>> last committed offset) so that Spark can have actual offset or validate the
>> provided offset. According to the javadoc (if I understand correctly), to
>> get the offset immediately it seems to be required to call `poll` or
>> `position`.
>>
>> The way Spark interacts with Kafka in this plan phase in driver is
>> synchronous, as the phase should finish ASAP to run the next phase.
>> Registering ConsumerRebalanceListener and tracking the change will require
>> some asynchronous handling which sounds to add unnecessary complexity.
>> Spark may be OK with deal with synchronous with timeout (that's what
>> methods in KafkaConsumer have been providing - they're not asynchronous, at
>> least for callers) but dealing with asynchronous is another level of
>> interest. I can see the benefit where continuous thread runs and the
>> consumer is busy with something continuously, relying on listener to hear
>> the news on reassignment. Unfortunately that's not the case.
>>
>> Unit tests in Spark have similar needs: looks like Kafka test code also
>> leverages `updateAssignmentMetadataIfNeeded` and `poll(0)` in many places
>> as it's appropriate to the place which blocking (+timeout) call is
>> preferred - so I can see the similar needs from here as well.
>>
>> On Mon, Aug 12, 2019 at 4:46 PM Gabor Somogyi <ga...@gmail.com>
>> wrote:
>>
>>> Hi Guys,
>>>
>>> Please see the actual implementation, pretty sure it explains the situation
>>> well:
>>>
>>> https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
>>>
>>> To answer one question/assumption which popped up from all of you Spark not
>>> only uses KafkaConsumer#subscribe but pattern subscribe +
>>> KafkaConsumer#assign as well.
>>> Please see here:
>>>
>>> https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/ConsumerStrategy.scala
>>>
>>> BR,
>>> G
>>>
>>>
>>> On Mon, Aug 12, 2019 at 6:38 AM Satish Duggana <sa...@gmail.com>
>>> wrote:
>>>
>>>> Hi Jungtaek,
>>>> Thanks for the KIP. I have a couple of questions here.
>>>> Is not Spark using Kafka's consumer group management across multiple
>>>> consumers?
>>>>
>>>> Is Spark using KafkaConsumer#subscribe(Pattern pattern,
>>>> ConsumerRebalanceListener listener) only to get all the topics for a
>>>> pattern based subscription and Spark manually assigns those
>>>> topic-partitions across consumers on workers?
>>>>
>>>> Thanks,
>>>> Satish.
>>>>
>>>> On Mon, Aug 12, 2019 at 4:17 AM Matthias J. Sax <ma...@confluent.io>
>>>> wrote:
>>>>
>>>>> If am not sure if I fully understand yet.
>>>>>
>>>>> The fact, that Spark does not stores offsets in Kafka but as part of
>>> its
>>>>> own checkpoint mechanism seems to be orthogonal. Maybe I am missing
>>>>> something here.
>>>>>
>>>>> As you are using subscribe(), you use Kafka consumer group mechanism,
>>>>> that takes care of the assignment of partitions to clients within the
>>>>> group. Therefore, I am not sure what you mean by:
>>>>>
>>>>>> which Spark needs to
>>>>>>> know to coordinate multiple consumers to pull correctly.
>>>>>
>>>>> Multiple thoughts that may help:
>>>>>
>>>>> - if Spark needs more control about the partition assignment, you can
>>>>> provide a custom `ConsumerPartitionAssignor` (via the consumer
>>>>> configuration)
>>>>>
>>>>> - you may also want to register `ConsumerRebalanceListener` via
>>>>> `subscribe()` to get informed when the group rebalances
>>>>>
>>>>> As you pointed out, using pattern subscription metadata can change if
>>>>> topic are added/deleted. However, each metadata change will triggering
>>> a
>>>>> rebalance and thus you would get corresponding calls to you rebalance
>>>>> listener to learn about it and react accordingly.
>>>>>
>>>>> Maybe you can explain why neither of both approaches works and what gap
>>>>> the new API would close?
>>>>>
>>>>>
>>>>> -Matthias
>>>>>
>>>>> On 8/11/19 5:11 AM, Jungtaek Lim wrote:
>>>>>> Let me elaborate my explanation a bit more. Here we say about Apache
>>>>> Spark,
>>>>>> but this will apply for everything which want to control offset of
>>>> Kafka
>>>>>> consumers.
>>>>>>
>>>>>> Spark is managing the committed offsets and the offsets which should
>>> be
>>>>>> polled now. Topics and partitions as well. This is required as Spark
>>>>> itself
>>>>>> has its own general checkpoint mechanism and Kafka is just a one of
>>>>>> source/sink (though it's considered as very important).
>>>>>>
>>>>>> To pull records from Kafka, Spark provides to Kafka which topics and
>>>>>> partitions it wants to subscribe(, and do seek and poll), but as
>>> Spark
>>>>> can
>>>>>> also provide "patterns" of topics, as well as subscription can be
>>>> changed
>>>>>> in Kafka side (topic added/dropped, partitions added) which Spark
>>> needs
>>>>> to
>>>>>> know to coordinate multiple consumers to pull correctly.
>>>>>>
>>>>>> Looks like assignment() doesn't update the assignment information in
>>>>>> consumer. It just returns known one. There's only one known approach
>>>>> doing
>>>>>> this, calling `poll`, but Spark is not interested on returned
>>> records,
>>>> so
>>>>>> there's a need for a hack `poll(0)`, and Kafka deprecated the API.
>>> This
>>>>> KIP
>>>>>> proposes to support this as official approach.
>>>>>>
>>>>>>
>>>>>> On Sun, Aug 11, 2019 at 8:18 PM Jungtaek Lim <ka...@gmail.com>
>>>> wrote:
>>>>>>
>>>>>>> Sorry I didn't recognize you're also asking it here as well. I'm in
>>>>> favor
>>>>>>> of describing it in this discussion thread so the discussion itself
>>>> can
>>>>> go
>>>>>>> forward. So copying my answer here:
>>>>>>>
>>>>>>> We have some use case which we don't just rely on everything what
>>>> Kafka
>>>>>>> consumer provides. We want to know current assignment on this
>>>> consumer,
>>>>> and
>>>>>>> to get the latest assignment, we called the hack `poll(0)`.
>>>>>>>
>>>>>>> That said, we don't want to pull any records here, and if I'm not
>>>>> missing
>>>>>>> here, there's no way to accomplish this. Please guide me if I'm
>>>> missing
>>>>>>> something.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Jungtaek Lim (HeartSaVioR)
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Sat, Aug 10, 2019 at 2:11 AM Matthias J. Sax <
>>>> matthias@confluent.io>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Thanks for the KIP.
>>>>>>>>
>>>>>>>> Can you elaborate a little bit more on the use case for this
>>> feature?
>>>>>>>> Why would a consumer need to update it's metadata explicitly?
>>>>>>>>
>>>>>>>>
>>>>>>>> -Matthias
>>>>>>>>
>>>>>>>> On 8/8/19 8:46 PM, Jungtaek Lim wrote:
>>>>>>>>> Hi devs,
>>>>>>>>>
>>>>>>>>> I'd like to initiate discussion around KIP-505, exposing new
>>> public
>>>>>>>> method
>>>>>>>>> to only update assignment metadata in consumer.
>>>>>>>>>
>>>>>>>>> `poll(0)` has been misused as according to Kafka doc it doesn't
>>>>>>>> guarantee
>>>>>>>>> that it doesn't pull any records, and new method `poll(Duration)`
>>>>>>>> doesn't
>>>>>>>>> have same semantic, so would like to propose new public API which
>>>> only
>>>>>>>> does
>>>>>>>>> the desired behavior.
>>>>>>>>>
>>>>>>>>> KIP page: https://cwiki.apache.org/confluence/x/z5NiBw
>>>>>>>>>
>>>>>>>>> Please feel free to suggest any improvements on proposal, as I'm
>>> new
>>>>> to
>>>>>>>>> Kafka community and may not catch preferences (like
>>> TimeoutException
>>>>> vs
>>>>>>>>> boolean, etc.) on Kafka project.
>>>>>>>>>
>>>>>>>>> Thanks in advance!
>>>>>>>>> Jungtaek Lim (HeartSaVioR)
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Name : Jungtaek Lim
>>>>>>> Blog : http://medium.com/@heartsavior
>>>>>>> Twitter : http://twitter.com/heartsavior
>>>>>>> LinkedIn : http://www.linkedin.com/in/heartsavior
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>>
>> -- 
>> Name : Jungtaek Lim
>> Blog : http://medium.com/@heartsavior
>> Twitter : http://twitter.com/heartsavior
>> LinkedIn : http://www.linkedin.com/in/heartsavior
>>
> 


Re: [DISCUSS] KIP-505 : Add new public method to only update assignment metadata in consumer

Posted by Colin McCabe <cm...@apache.org>.
Hi,

If there’s no need to consume records in the Spark driver, then the Consumer is probably the wrong thing to use. Instead, Spark should use AdminClient to find out what partitions exist and where, manage their offsets, and so on. There are some KIPs under discussion now that would add the necessary APIs for managing offsets.

Best,
Colin

On Mon, Aug 12, 2019, at 07:39, Jungtaek Lim wrote:
> My feeling is that I didn't explain the use case for Spark properly and
> hence fail to explain the needs. Sorry about this.
> 
> Spark leverages the single instance of KafkaConsumer in the driver which is
> registered solely on the consumer group. This is used in the plan phase for
> each micro-batch to calculate the overall topicpartitions with its offset
> ranges for this batch, and split and assign (topicpartition, fromOffset,
> untilOffset) to each input partition. After the planning is done and tasks
> are being distributed to executors, consumer per each input partition will
> be initialized from some executor (being assigned to the single
> topicpartition), and pull the actual records. (Pooling consumers is applied
> for sure.) As plan phase is to determine the overall topicpartitions and
> offset ranges to process, Spark is never interested on pulling the records
> in driver side.
> 
> Spark mainly leverages poll(0) to get the latest assigned partitions and
> adopt the changes or validate the expectation. That's not only use case for
> poll(0). Spark is also seeking the offset per topicpartition to the
> earliest or the latest, or specific one (either provided by end user or the
> last committed offset) so that Spark can have actual offset or validate the
> provided offset. According to the javadoc (if I understand correctly), to
> get the offset immediately it seems to be required to call `poll` or
> `position`.
> 
> The way Spark interacts with Kafka in this plan phase in driver is
> synchronous, as the phase should finish ASAP to run the next phase.
> Registering ConsumerRebalanceListener and tracking the change will require
> some asynchronous handling which sounds to add unnecessary complexity.
> Spark may be OK with deal with synchronous with timeout (that's what
> methods in KafkaConsumer have been providing - they're not asynchronous, at
> least for callers) but dealing with asynchronous is another level of
> interest. I can see the benefit where continuous thread runs and the
> consumer is busy with something continuously, relying on listener to hear
> the news on reassignment. Unfortunately that's not the case.
> 
> Unit tests in Spark have similar needs: looks like Kafka test code also
> leverages `updateAssignmentMetadataIfNeeded` and `poll(0)` in many places
> as it's appropriate to the place which blocking (+timeout) call is
> preferred - so I can see the similar needs from here as well.
> 
> On Mon, Aug 12, 2019 at 4:46 PM Gabor Somogyi <ga...@gmail.com>
> wrote:
> 
> > Hi Guys,
> >
> > Please see the actual implementation, pretty sure it explains the situation
> > well:
> >
> > https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
> >
> > To answer one question/assumption which popped up from all of you Spark not
> > only uses KafkaConsumer#subscribe but pattern subscribe +
> > KafkaConsumer#assign as well.
> > Please see here:
> >
> > https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/ConsumerStrategy.scala
> >
> > BR,
> > G
> >
> >
> > On Mon, Aug 12, 2019 at 6:38 AM Satish Duggana <sa...@gmail.com>
> > wrote:
> >
> > > Hi Jungtaek,
> > > Thanks for the KIP. I have a couple of questions here.
> > > Is not Spark using Kafka's consumer group management across multiple
> > > consumers?
> > >
> > > Is Spark using KafkaConsumer#subscribe(Pattern pattern,
> > > ConsumerRebalanceListener listener) only to get all the topics for a
> > > pattern based subscription and Spark manually assigns those
> > > topic-partitions across consumers on workers?
> > >
> > > Thanks,
> > > Satish.
> > >
> > > On Mon, Aug 12, 2019 at 4:17 AM Matthias J. Sax <ma...@confluent.io>
> > > wrote:
> > >
> > > > If am not sure if I fully understand yet.
> > > >
> > > > The fact, that Spark does not stores offsets in Kafka but as part of
> > its
> > > > own checkpoint mechanism seems to be orthogonal. Maybe I am missing
> > > > something here.
> > > >
> > > > As you are using subscribe(), you use Kafka consumer group mechanism,
> > > > that takes care of the assignment of partitions to clients within the
> > > > group. Therefore, I am not sure what you mean by:
> > > >
> > > > > which Spark needs to
> > > > >> know to coordinate multiple consumers to pull correctly.
> > > >
> > > > Multiple thoughts that may help:
> > > >
> > > > - if Spark needs more control about the partition assignment, you can
> > > > provide a custom `ConsumerPartitionAssignor` (via the consumer
> > > > configuration)
> > > >
> > > > - you may also want to register `ConsumerRebalanceListener` via
> > > > `subscribe()` to get informed when the group rebalances
> > > >
> > > > As you pointed out, using pattern subscription metadata can change if
> > > > topic are added/deleted. However, each metadata change will triggering
> > a
> > > > rebalance and thus you would get corresponding calls to you rebalance
> > > > listener to learn about it and react accordingly.
> > > >
> > > > Maybe you can explain why neither of both approaches works and what gap
> > > > the new API would close?
> > > >
> > > >
> > > > -Matthias
> > > >
> > > > On 8/11/19 5:11 AM, Jungtaek Lim wrote:
> > > > > Let me elaborate my explanation a bit more. Here we say about Apache
> > > > Spark,
> > > > > but this will apply for everything which want to control offset of
> > > Kafka
> > > > > consumers.
> > > > >
> > > > > Spark is managing the committed offsets and the offsets which should
> > be
> > > > > polled now. Topics and partitions as well. This is required as Spark
> > > > itself
> > > > > has its own general checkpoint mechanism and Kafka is just a one of
> > > > > source/sink (though it's considered as very important).
> > > > >
> > > > > To pull records from Kafka, Spark provides to Kafka which topics and
> > > > > partitions it wants to subscribe(, and do seek and poll), but as
> > Spark
> > > > can
> > > > > also provide "patterns" of topics, as well as subscription can be
> > > changed
> > > > > in Kafka side (topic added/dropped, partitions added) which Spark
> > needs
> > > > to
> > > > > know to coordinate multiple consumers to pull correctly.
> > > > >
> > > > > Looks like assignment() doesn't update the assignment information in
> > > > > consumer. It just returns known one. There's only one known approach
> > > > doing
> > > > > this, calling `poll`, but Spark is not interested on returned
> > records,
> > > so
> > > > > there's a need for a hack `poll(0)`, and Kafka deprecated the API.
> > This
> > > > KIP
> > > > > proposes to support this as official approach.
> > > > >
> > > > >
> > > > > On Sun, Aug 11, 2019 at 8:18 PM Jungtaek Lim <ka...@gmail.com>
> > > wrote:
> > > > >
> > > > >> Sorry I didn't recognize you're also asking it here as well. I'm in
> > > > favor
> > > > >> of describing it in this discussion thread so the discussion itself
> > > can
> > > > go
> > > > >> forward. So copying my answer here:
> > > > >>
> > > > >> We have some use case which we don't just rely on everything what
> > > Kafka
> > > > >> consumer provides. We want to know current assignment on this
> > > consumer,
> > > > and
> > > > >> to get the latest assignment, we called the hack `poll(0)`.
> > > > >>
> > > > >> That said, we don't want to pull any records here, and if I'm not
> > > > missing
> > > > >> here, there's no way to accomplish this. Please guide me if I'm
> > > missing
> > > > >> something.
> > > > >>
> > > > >> Thanks,
> > > > >> Jungtaek Lim (HeartSaVioR)
> > > > >>
> > > > >>
> > > > >>
> > > > >> On Sat, Aug 10, 2019 at 2:11 AM Matthias J. Sax <
> > > matthias@confluent.io>
> > > > >> wrote:
> > > > >>
> > > > >>> Thanks for the KIP.
> > > > >>>
> > > > >>> Can you elaborate a little bit more on the use case for this
> > feature?
> > > > >>> Why would a consumer need to update it's metadata explicitly?
> > > > >>>
> > > > >>>
> > > > >>> -Matthias
> > > > >>>
> > > > >>> On 8/8/19 8:46 PM, Jungtaek Lim wrote:
> > > > >>>> Hi devs,
> > > > >>>>
> > > > >>>> I'd like to initiate discussion around KIP-505, exposing new
> > public
> > > > >>> method
> > > > >>>> to only update assignment metadata in consumer.
> > > > >>>>
> > > > >>>> `poll(0)` has been misused as according to Kafka doc it doesn't
> > > > >>> guarantee
> > > > >>>> that it doesn't pull any records, and new method `poll(Duration)`
> > > > >>> doesn't
> > > > >>>> have same semantic, so would like to propose new public API which
> > > only
> > > > >>> does
> > > > >>>> the desired behavior.
> > > > >>>>
> > > > >>>> KIP page: https://cwiki.apache.org/confluence/x/z5NiBw
> > > > >>>>
> > > > >>>> Please feel free to suggest any improvements on proposal, as I'm
> > new
> > > > to
> > > > >>>> Kafka community and may not catch preferences (like
> > TimeoutException
> > > > vs
> > > > >>>> boolean, etc.) on Kafka project.
> > > > >>>>
> > > > >>>> Thanks in advance!
> > > > >>>> Jungtaek Lim (HeartSaVioR)
> > > > >>>>
> > > > >>>
> > > > >>>
> > > > >>
> > > > >> --
> > > > >> Name : Jungtaek Lim
> > > > >> Blog : http://medium.com/@heartsavior
> > > > >> Twitter : http://twitter.com/heartsavior
> > > > >> LinkedIn : http://www.linkedin.com/in/heartsavior
> > > > >>
> > > > >
> > > > >
> > > >
> > > >
> > >
> >
> 
> 
> -- 
> Name : Jungtaek Lim
> Blog : http://medium.com/@heartsavior
> Twitter : http://twitter.com/heartsavior
> LinkedIn : http://www.linkedin.com/in/heartsavior
> 

Re: [DISCUSS] KIP-505 : Add new public method to only update assignment metadata in consumer

Posted by Jungtaek Lim <ka...@gmail.com>.
My feeling is that I didn't explain the use case for Spark properly and
hence failed to explain the needs. Sorry about this.

Spark leverages a single instance of KafkaConsumer in the driver, which is
the only consumer registered in the consumer group. It is used in the plan
phase of each micro-batch to calculate the overall topicpartitions with
their offset ranges for the batch, and to split and assign (topicpartition,
fromOffset, untilOffset) to each input partition. After the planning is
done and tasks are distributed to executors, a consumer per input partition
is initialized on some executor (assigned to that single topicpartition) to
pull the actual records. (Consumer pooling is applied, of course.) As the
plan phase only determines the overall topicpartitions and offset ranges to
process, Spark is never interested in pulling records on the driver side.

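As a hypothetical sketch (class and field names are mine, not Spark's), the
planning step essentially produces one record like this per input partition:

    import org.apache.kafka.common.TopicPartition;

    // Hypothetical shape of what the driver computes per micro-batch;
    // not Spark's actual classes.
    class OffsetRange {
        final TopicPartition topicPartition;
        final long fromOffset;   // inclusive
        final long untilOffset;  // exclusive

        OffsetRange(TopicPartition tp, long from, long until) {
            this.topicPartition = tp;
            this.fromOffset = from;
            this.untilOffset = until;
        }
    }

Each such range is handed to one task; a consumer on some executor is then
assigned topicPartition and reads [fromOffset, untilOffset).
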
Spark mainly leverages poll(0) to get the latest assigned partitions and
adopt the changes or validate its expectations. That's not the only use
case for poll(0). Spark also seeks the offset per topicpartition to the
earliest, the latest, or a specific one (either provided by the end user or
the last committed offset) so that it can obtain the actual offset or
validate the provided offset. According to the javadoc (if I understand
correctly), calling `poll` or `position` seems to be required to get the
offset immediately.

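For example, a minimal sketch of that seek-then-read-position pattern
(topic, group, and broker names are placeholders):

    import java.util.List;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    public class DriverOffsetProbe {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "driver-probe");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                ByteArrayDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                ByteArrayDeserializer.class.getName());
            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                TopicPartition tp = new TopicPartition("events", 0);
                consumer.assign(List.of(tp));
                consumer.seekToEnd(List.of(tp));
                // position() forces the seek to resolve and returns the actual
                // offset, without fetching any records.
                long latest = consumer.position(tp);
                System.out.println("latest offset = " + latest);
            }
        }
    }
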
The way Spark interacts with Kafka in this driver-side plan phase is
synchronous, as the phase should finish ASAP to run the next phase.
Registering a ConsumerRebalanceListener and tracking the changes would
require some asynchronous handling, which sounds like it adds unnecessary
complexity. Spark may be OK with synchronous calls with a timeout (that's
what the methods in KafkaConsumer have been providing - they're not
asynchronous, at least for callers), but dealing with asynchrony is another
level of concern. I can see the benefit when a thread runs continuously and
the consumer is kept busy, relying on the listener to hear the news on
reassignment. Unfortunately that's not our case.

Unit tests in Spark have similar needs: it looks like Kafka test code also
leverages `updateAssignmentMetadataIfNeeded` and `poll(0)` in many places
where a blocking (+timeout) call is preferred - so I can see similar needs
there as well.

On Mon, Aug 12, 2019 at 4:46 PM Gabor Somogyi <ga...@gmail.com>
wrote:

> Hi Guys,
>
> Please see the actual implementation, pretty sure it explains the situation
> well:
>
> https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
>
> To answer one question/assumption which popped up from all of you Spark not
> only uses KafkaConsumer#subscribe but pattern subscribe +
> KafkaConsumer#assign as well.
> Please see here:
>
> https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/ConsumerStrategy.scala
>
> BR,
> G
>
>
> On Mon, Aug 12, 2019 at 6:38 AM Satish Duggana <sa...@gmail.com>
> wrote:
>
> > Hi Jungtaek,
> > Thanks for the KIP. I have a couple of questions here.
> > Is not Spark using Kafka's consumer group management across multiple
> > consumers?
> >
> > Is Spark using KafkaConsumer#subscribe(Pattern pattern,
> > ConsumerRebalanceListener listener) only to get all the topics for a
> > pattern based subscription and Spark manually assigns those
> > topic-partitions across consumers on workers?
> >
> > Thanks,
> > Satish.
> >
> > On Mon, Aug 12, 2019 at 4:17 AM Matthias J. Sax <ma...@confluent.io>
> > wrote:
> >
> > > If am not sure if I fully understand yet.
> > >
> > > The fact, that Spark does not stores offsets in Kafka but as part of
> its
> > > own checkpoint mechanism seems to be orthogonal. Maybe I am missing
> > > something here.
> > >
> > > As you are using subscribe(), you use Kafka consumer group mechanism,
> > > that takes care of the assignment of partitions to clients within the
> > > group. Therefore, I am not sure what you mean by:
> > >
> > > > which Spark needs to
> > > >> know to coordinate multiple consumers to pull correctly.
> > >
> > > Multiple thoughts that may help:
> > >
> > >  - if Spark needs more control about the partition assignment, you can
> > > provide a custom `ConsumerPartitionAssignor` (via the consumer
> > > configuration)
> > >
> > >  - you may also want to register `ConsumerRebalanceListener` via
> > > `subscribe()` to get informed when the group rebalances
> > >
> > > As you pointed out, using pattern subscription metadata can change if
> > > topic are added/deleted. However, each metadata change will triggering
> a
> > > rebalance and thus you would get corresponding calls to you rebalance
> > > listener to learn about it and react accordingly.
> > >
> > > Maybe you can explain why neither of both approaches works and what gap
> > > the new API would close?
> > >
> > >
> > > -Matthias
> > >
> > > On 8/11/19 5:11 AM, Jungtaek Lim wrote:
> > > > Let me elaborate my explanation a bit more. Here we say about Apache
> > > Spark,
> > > > but this will apply for everything which want to control offset of
> > Kafka
> > > > consumers.
> > > >
> > > > Spark is managing the committed offsets and the offsets which should
> be
> > > > polled now. Topics and partitions as well. This is required as Spark
> > > itself
> > > > has its own general checkpoint mechanism and Kafka is just a one of
> > > > source/sink (though it's considered as very important).
> > > >
> > > > To pull records from Kafka, Spark provides to Kafka which topics and
> > > > partitions it wants to subscribe(, and do seek and poll), but as
> Spark
> > > can
> > > > also provide "patterns" of topics, as well as subscription can be
> > changed
> > > > in Kafka side (topic added/dropped, partitions added) which Spark
> needs
> > > to
> > > > know to coordinate multiple consumers to pull correctly.
> > > >
> > > > Looks like assignment() doesn't update the assignment information in
> > > > consumer. It just returns known one. There's only one known approach
> > > doing
> > > > this, calling `poll`, but Spark is not interested on returned
> records,
> > so
> > > > there's a need for a hack `poll(0)`, and Kafka deprecated the API.
> This
> > > KIP
> > > > proposes to support this as official approach.
> > > >
> > > >
> > > > On Sun, Aug 11, 2019 at 8:18 PM Jungtaek Lim <ka...@gmail.com>
> > wrote:
> > > >
> > > >> Sorry I didn't recognize you're also asking it here as well. I'm in
> > > favor
> > > >> of describing it in this discussion thread so the discussion itself
> > can
> > > go
> > > >> forward. So copying my answer here:
> > > >>
> > > >> We have some use case which we don't just rely on everything what
> > Kafka
> > > >> consumer provides. We want to know current assignment on this
> > consumer,
> > > and
> > > >> to get the latest assignment, we called the hack `poll(0)`.
> > > >>
> > > >> That said, we don't want to pull any records here, and if I'm not
> > > missing
> > > >> here, there's no way to accomplish this. Please guide me if I'm
> > missing
> > > >> something.
> > > >>
> > > >> Thanks,
> > > >> Jungtaek Lim (HeartSaVioR)
> > > >>
> > > >>
> > > >>
> > > >> On Sat, Aug 10, 2019 at 2:11 AM Matthias J. Sax <
> > matthias@confluent.io>
> > > >> wrote:
> > > >>
> > > >>> Thanks for the KIP.
> > > >>>
> > > >>> Can you elaborate a little bit more on the use case for this
> feature?
> > > >>> Why would a consumer need to update it's metadata explicitly?
> > > >>>
> > > >>>
> > > >>> -Matthias
> > > >>>
> > > >>> On 8/8/19 8:46 PM, Jungtaek Lim wrote:
> > > >>>> Hi devs,
> > > >>>>
> > > >>>> I'd like to initiate discussion around KIP-505, exposing new
> public
> > > >>> method
> > > >>>> to only update assignment metadata in consumer.
> > > >>>>
> > > >>>> `poll(0)` has been misused as according to Kafka doc it doesn't
> > > >>> guarantee
> > > >>>> that it doesn't pull any records, and new method `poll(Duration)`
> > > >>> doesn't
> > > >>>> have same semantic, so would like to propose new public API which
> > only
> > > >>> does
> > > >>>> the desired behavior.
> > > >>>>
> > > >>>> KIP page: https://cwiki.apache.org/confluence/x/z5NiBw
> > > >>>>
> > > >>>> Please feel free to suggest any improvements on proposal, as I'm
> new
> > > to
> > > >>>> Kafka community and may not catch preferences (like
> TimeoutException
> > > vs
> > > >>>> boolean, etc.) on Kafka project.
> > > >>>>
> > > >>>> Thanks in advance!
> > > >>>> Jungtaek Lim (HeartSaVioR)
> > > >>>>
> > > >>>
> > > >>>
> > > >>
> > > >> --
> > > >> Name : Jungtaek Lim
> > > >> Blog : http://medium.com/@heartsavior
> > > >> Twitter : http://twitter.com/heartsavior
> > > >> LinkedIn : http://www.linkedin.com/in/heartsavior
> > > >>
> > > >
> > > >
> > >
> > >
> >
>


-- 
Name : Jungtaek Lim
Blog : http://medium.com/@heartsavior
Twitter : http://twitter.com/heartsavior
LinkedIn : http://www.linkedin.com/in/heartsavior

Re: [DISCUSS] KIP-505 : Add new public method to only update assignment metadata in consumer

Posted by Gabor Somogyi <ga...@gmail.com>.
Hi Guys,

Please see the actual implementation; I'm pretty sure it explains the
situation well:
https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala

To answer one question/assumption which popped up from all of you: Spark
uses not only KafkaConsumer#subscribe but pattern subscribe +
KafkaConsumer#assign as well.
Please see here:
https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/ConsumerStrategy.scala
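
In KafkaConsumer terms, the three strategies boil down to the following
(illustrative snippet only; topic names are placeholders):

    import java.util.List;
    import java.util.regex.Pattern;
    import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    public class SubscriptionModes {
        // Fixed assignment: no group management at all.
        static void assignFixed(KafkaConsumer<?, ?> c) {
            c.assign(List.of(new TopicPartition("events", 0)));
        }
        // Subscribe by name: group-managed assignment.
        static void subscribeByName(KafkaConsumer<?, ?> c) {
            c.subscribe(List.of("events"));
        }
        // Subscribe by pattern: group-managed, topics matched by regex.
        static void subscribeByPattern(KafkaConsumer<?, ?> c,
                                       ConsumerRebalanceListener listener) {
            c.subscribe(Pattern.compile("events-.*"), listener);
        }
    }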

BR,
G


Re: [DISCUSS] KIP-505 : Add new public method to only update assignment metadata in consumer

Posted by Satish Duggana <sa...@gmail.com>.
Hi Jungtaek,
Thanks for the KIP. I have a couple of questions here.
Is Spark not using Kafka's consumer group management across multiple
consumers?

Is Spark using KafkaConsumer#subscribe(Pattern pattern,
ConsumerRebalanceListener listener) only to get all the topics for a
pattern-based subscription, and does it then manually assign those
topic-partitions across consumers on workers?

Thanks,
Satish.


Re: [DISCUSS] KIP-505 : Add new public method to only update assignment metadata in consumer

Posted by "Matthias J. Sax" <ma...@confluent.io>.
I am not sure if I fully understand yet.

The fact that Spark does not store offsets in Kafka but as part of its
own checkpoint mechanism seems to be orthogonal. Maybe I am missing
something here.

As you are using subscribe(), you use the Kafka consumer group mechanism,
which takes care of the assignment of partitions to clients within the
group. Therefore, I am not sure what you mean by:

> which Spark needs to
> know about to coordinate multiple consumers to pull correctly.

Multiple thoughts that may help:

 - if Spark needs more control over the partition assignment, you can
provide a custom `ConsumerPartitionAssignor` (via the consumer
configuration)

 - you may also want to register `ConsumerRebalanceListener` via
`subscribe()` to get informed when the group rebalances

As you pointed out, when using pattern subscription the metadata can
change if topics are added/deleted. However, each metadata change will
trigger a rebalance, and thus you would get corresponding calls to your
rebalance listener to learn about it and react accordingly.
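
To illustrate, both suggestions together might look roughly like the
following. This is a sketch only: the config values are made up, and the
built-in RoundRobinAssignor stands in for a custom assignor.

    import java.util.Collection;
    import java.util.Properties;
    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.regex.Pattern;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.RoundRobinAssignor;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    public class RebalanceTrackingSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "localhost:9092");                                // assumed
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group"); // assumed
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                ByteArrayDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                ByteArrayDeserializer.class.getName());
            // (1) More control over the assignment: plug in an assignor here.
            props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                RoundRobinAssignor.class.getName());

            KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);

            // (2) Track assignment changes through the rebalance listener.
            Set<TopicPartition> current = ConcurrentHashMap.newKeySet();
            consumer.subscribe(Pattern.compile("topic-.*"),
                new ConsumerRebalanceListener() {
                    @Override
                    public void onPartitionsRevoked(
                            Collection<TopicPartition> parts) {
                        current.removeAll(parts);
                    }
                    @Override
                    public void onPartitionsAssigned(
                            Collection<TopicPartition> parts) {
                        current.addAll(parts);
                    }
                });
            // Note: the callbacks only fire from inside poll(), so the caller
            // still has to poll to observe any change.
        }
    }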

Maybe you can explain why neither of these approaches works and what gap
the new API would close?


-Matthias


Re: [DISCUSS] KIP-505 : Add new public method to only update assignment metadata in consumer

Posted by Jungtaek Lim <ka...@gmail.com>.
Let me elaborate my explanation a bit more. Here we talk about Apache Spark,
but this applies to anything which wants to control the offsets of Kafka
consumers.

Spark manages the committed offsets and the offsets which should be
polled next, as well as the topics and partitions. This is required as
Spark has its own general checkpoint mechanism and Kafka is just one of
its sources/sinks (though it's considered very important).

To pull records from Kafka, Spark tells Kafka which topics and partitions
it wants to subscribe to (and then seeks and polls), but Spark can also
provide "patterns" of topics, and the subscription can change on the Kafka
side (topics added/dropped, partitions added), which Spark needs to know
about to coordinate multiple consumers to pull correctly.

It looks like assignment() doesn't update the assignment information in
the consumer; it just returns the known one. The only known way to force
the update is calling `poll`, but Spark is not interested in the returned
records, hence the need for the hack `poll(0)`, and Kafka deprecated that
API. This KIP proposes to support this as an official approach.
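
For illustration, the situation at the API level is roughly the
following. This is a sketch, not actual KIP code, and the method name in
the last comment is purely hypothetical:

    import java.util.Set;
    import java.util.regex.Pattern;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    public class AssignmentRefreshSketch {
        static Set<TopicPartition> latestAssignment(
                KafkaConsumer<?, ?> consumer) {
            consumer.subscribe(Pattern.compile("topic-.*")); // assumed pattern
            consumer.assignment(); // empty here: returns only the last known
                                   // assignment, without updating anything
            consumer.poll(0);      // deprecated hack: joins the group and
                                   // refreshes metadata, but may also fetch
                                   // (and lose) records
            // Hypothetical shape of what this KIP proposes (name is
            // illustrative only):
            // consumer.updateAssignmentMetadataIfNeeded(Duration.ofSeconds(30));
            return consumer.assignment(); // now reflects the real assignment
        }
    }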


-- 
Name : Jungtaek Lim
Blog : http://medium.com/@heartsavior
Twitter : http://twitter.com/heartsavior
LinkedIn : http://www.linkedin.com/in/heartsavior

Re: [DISCUSS] KIP-505 : Add new public method to only update assignment metadata in consumer

Posted by Jungtaek Lim <ka...@gmail.com>.
Sorry, I didn't recognize you were asking it here as well. I'm in favor
of describing it in this discussion thread so the discussion itself can
move forward, so I'm copying my answer here:

We have a use case in which we don't just rely on everything the Kafka
consumer provides. We want to know the current assignment of this
consumer, and to get the latest assignment, we called the hack `poll(0)`.

That said, we don't want to pull any records here, and as far as I can
tell, there's no other way to accomplish this. Please guide me if I'm
missing something.
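
For concreteness, a defensive form of that hack might look like the
following (the helper and its failure check are illustrative; this is not
Spark's actual code):

    import java.util.Set;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    public class PollZeroHack {
        static Set<TopicPartition> refreshAssignment(
                KafkaConsumer<?, ?> consumer) {
            // poll(0) is used purely for its side effect of updating the
            // assignment metadata; it is deprecated and gives no guarantee
            // that no records are fetched.
            ConsumerRecords<?, ?> records = consumer.poll(0);
            if (!records.isEmpty()) {
                // Any records returned here would otherwise be silently lost.
                throw new IllegalStateException(
                    "poll(0) unexpectedly returned records");
            }
            return consumer.assignment();
        }
    }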

Thanks,
Jungtaek Lim (HeartSaVioR)



-- 
Name : Jungtaek Lim
Blog : http://medium.com/@heartsavior
Twitter : http://twitter.com/heartsavior
LinkedIn : http://www.linkedin.com/in/heartsavior

Re: [DISCUSS] KIP-505 : Add new public method to only update assignment metadata in consumer

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Thanks for the KIP.

Can you elaborate a little bit more on the use case for this feature?
Why would a consumer need to update its metadata explicitly?


-Matthias
