Posted to dev@kafka.apache.org by Guozhang Wang <wa...@gmail.com> on 2017/11/03 23:49:52 UTC

[DISCUSS] KIP-220: Add AdminClient into Kafka Streams' ClientSupplier

Hello folks,

I have filed a new KIP on adding AdminClient into Streams for internal
topic management.

Looking for feedback on

https://cwiki.apache.org/confluence/display/KAFKA/KIP-220%3A+Add+AdminClient+into+Kafka+Streams%27+ClientSupplier

-- 
-- Guozhang
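
In outline, the KIP extends Kafka Streams' client-supplier abstraction so that the admin client is created in the same pluggable way as producers and consumers. The sketch below models the shape of that change with simplified stand-in types; the names (AdminClientLike, ClientSupplier, Kip220Sketch) are illustrative, not the real Kafka interfaces.

```java
import java.util.Map;

public class Kip220Sketch {
    // Hypothetical, simplified stand-in for org.apache.kafka.clients.admin.AdminClient.
    interface AdminClientLike {}

    // A default implementation that just captures the config it was built from.
    static class DefaultAdmin implements AdminClientLike {
        final Map<String, Object> config;
        DefaultAdmin(Map<String, Object> config) { this.config = config; }
    }

    // Stand-in for Streams' client supplier: the KIP adds an admin-client
    // factory method alongside the existing producer/consumer factory methods.
    interface ClientSupplier {
        AdminClientLike getAdminClient(Map<String, Object> config);
    }

    // Streams would call the supplier with the resolved admin configs.
    static AdminClientLike create(ClientSupplier supplier) {
        return supplier.getAdminClient(Map.of("bootstrap.servers", "localhost:9092"));
    }

    public static void main(String[] args) {
        // A custom supplier (e.g., in tests) can inject a mock admin client,
        // just as it already can for producers and consumers.
        AdminClientLike admin = create(DefaultAdmin::new);
        System.out.println(admin != null);
    }
}
```

Routing the admin client through the supplier is what makes internal-topic management mockable in tests, which is the main practical benefit beyond replacing StreamsKafkaClient.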

Re: [DISCUSS] KIP-220: Add AdminClient into Kafka Streams' ClientSupplier

Posted by Matt Farmer <ma...@frmr.me>.
This seems like an A+ improvement to me.


Re: [DISCUSS] KIP-220: Add AdminClient into Kafka Streams' ClientSupplier

Posted by Guozhang Wang <wa...@gmail.com>.
Thanks Ted, I have updated the KIP.

On Fri, Nov 3, 2017 at 8:01 PM, Ted Yu <yu...@gmail.com> wrote:

> Looks good overall.
>
> bq. the creation within StreamsPartitionAssignor
>
> Typo above: should be StreamPartitionAssignor



-- 
-- Guozhang

Re: [DISCUSS] KIP-220: Add AdminClient into Kafka Streams' ClientSupplier

Posted by Guozhang Wang <wa...@gmail.com>.
Yes, I will update the KIP accordingly. Thanks.

On Tue, Nov 14, 2017 at 2:56 PM, Matthias J. Sax <ma...@confluent.io> wrote:

> One more thing: can you update the KIP accordingly? It still says:
>
>  - Compatibility check: we will use a network client for this purpose,
> as it is a one-time thing.
>
> Additionally, I think we should add an "admin." prefix that allows setting
> certain config parameters for the admin client only, similar to the
> producer/consumer prefixes.
>
> Can we add this to the KIP?
>
>
> -Matthias
>
> On 11/14/17 2:51 PM, Matthias J. Sax wrote:
> > Thanks for looking into this in detail!
> >
> > As mentioned, I would like to keep the check, but if it's too much
> > overhead, I agree that it's not worth it.
> >
> > Thanks.
> >
> > -Matthias
> >


-- 
-- Guozhang

Re: [DISCUSS] KIP-220: Add AdminClient into Kafka Streams' ClientSupplier

Posted by "Matthias J. Sax" <ma...@confluent.io>.
One more thing: can you update the KIP accordingly? It still says:

 - Compatibility check: we will use a network client for this purpose,
as it is a one-time thing.

Additionally, I think we should add an "admin." prefix that allows setting
certain config parameters for the admin client only, similar to the
producer/consumer prefixes.

Can we add this to the KIP?


-Matthias
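
An "admin." config prefix would work like the existing "producer." / "consumer." prefixes: keys carrying the prefix are stripped of it and routed only to the admin client. A rough sketch of that resolution logic, for illustration only -- AdminPrefixDemo and the exact key handling are made up, not Streams' actual implementation:

```java
import java.util.HashMap;
import java.util.Map;

public class AdminPrefixDemo {
    // Resolve "admin."-prefixed Streams configs into the config map handed
    // to the admin client, mirroring how the producer./consumer. prefixes work.
    static Map<String, Object> adminConfigs(Map<String, Object> streamsProps) {
        final String prefix = "admin.";
        Map<String, Object> out = new HashMap<>();
        for (Map.Entry<String, Object> e : streamsProps.entrySet()) {
            if (e.getKey().startsWith(prefix)) {
                // Strip the prefix; the remainder is a plain client config key.
                out.put(e.getKey().substring(prefix.length()), e.getValue());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, Object> props = new HashMap<>();
        props.put("application.id", "my-app");                // Streams-level config, not forwarded
        props.put("admin.request.timeout.ms", "60000");       // goes only to the admin client
        System.out.println(adminConfigs(props)); // {request.timeout.ms=60000}
    }
}
```

The point of the prefix is that an operator can tune the admin client (timeouts, retries, and so on) without those settings leaking into the producer or consumer.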



Re: [DISCUSS] KIP-220: Add AdminClient into Kafka Streams' ClientSupplier

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Thanks for looking into this in detail!

As mentioned, I would like to keep the check, but if it's too much
overhead, I agree that it's not worth it.

Thanks.

-Matthias

On 11/14/17 10:00 AM, Guozhang Wang wrote:
> I looked into how to use a NetworkClient to replace StreamsKafkaClient to
> do this one-time checking, and the complexity is actually pretty high:
> since it is a barebone NetworkClient, we have to handle the connection /
> readiness / find a broker to send to / etc logic plus introducing all these
> dependencies into KafkaStreams class. So I have decided to not do this in
> this KIP. If people feel strongly about this let's discuss more.
> 
> 
> I'll start the voting process on the mailing list now.
> 
> 
> Guozhang
> 
> On Fri, Nov 10, 2017 at 11:47 AM, Bill Bejeck <bb...@gmail.com> wrote:
> 
>> I'm leaning towards option 3, although option 2 is a reasonable tradeoff
>> between the two.
>>
>> Overall I'm leaning towards option 3 because:
>>
>>    1. As Guozhang has said we are failing "fast enough" with an Exception
>>    from the first rebalance.
>>    2. Less complexity/maintenance cost by not having a transient network
>>    client
>>    3. Ideally, this check should be on the AdminClient itself but adding
>>    such a check creates "scope creep" for this KIP.
>>
>> IMHO the combination of these reasons makes option 3 my preferred approach.
>>
>> Thanks,
>> Bill
>>
>> On Tue, Nov 7, 2017 at 12:48 PM, Guozhang Wang <wa...@gmail.com> wrote:
>>
>>> Here is what I'm thinking about this trade-off: we want to fail fast
>>> when brokers do not yet support the requested API version, at the cost
>>> of doing this one-time check with an expendable NetworkClient plus a
>>> bit longer startup latency. Here are a few different options:
>>>
>>> 1) we ask all the brokers: this is one extreme of the trade-off; we
>>> still need to handle UnsupportedApiVersionsException since brokers can
>>> be downgraded.
>>> 2) we ask a random broker: this is what we do today, a bit weaker than
>>> 1) but it saves on latency.
>>> 3) we do not ask anyone: this is the other extreme of the trade-off.
>>>
>>> To me, 1) is overkill, so I did not include it in my proposals. Between
>>> 2) and 3) I'm slightly preferring 3), since even in this case we are
>>> sort of failing fast because we will throw the exception at the first
>>> rebalance; but I can still see the value of option 2). Ideally we could
>>> have some APIs on AdminClient to check API versions, but these do not
>>> exist today and I do not want to drag on with growing scope on this
>>> KIP, so the current proposal's implementation uses an expendable
>>> NetworkClient, which is a bit fuzzy.
>>>
>>>
>>> Guozhang
>>>
>>>
>>> On Tue, Nov 7, 2017 at 2:07 AM, Matthias J. Sax <ma...@confluent.io> wrote:
>>>
>>>> I would prefer to keep the current check. We could even improve it, and
>>>> do the check against more than one broker (even all provided
>>>> bootstrap.servers) or some random servers after we got all metadata
>>>> about the cluster.
>>>>
>>>>
>>>> -Matthias
>>>>
>>>> On 11/7/17 1:01 AM, Guozhang Wang wrote:
>>>>> Hello folks,
>>>>>
>>>>> One update I'd like to propose regarding "compatibility checking":
>>>>> currently we create a single StreamsKafkaClient at the beginning to
>>>>> issue an ApiVersionsRequest to a random broker and then check on its
>>>>> versions, and fail if it does not satisfy the required version
>>>>> (0.10.1+ without EOS, 0.11.0 with EOS); after this check we throw
>>>>> this client away. My original plan was to replace this logic with the
>>>>> NetworkClient's own apiVersions, but after some digging while working
>>>>> on the PR I found that exposing this apiVersions variable from
>>>>> NetworkClient through AdminClient is not very straightforward; plus
>>>>> it would need an API change on the AdminClient itself as well to
>>>>> expose the version information.
>>>>>
>>>>> On the other hand, this one-time compatibility check's benefit may
>>>>> not be significant: 1) it asks a random broker upon starting up, and
>>>>> hence does not guarantee all brokers support the corresponding API
>>>>> versions at that time; 2) brokers can still be upgraded / downgraded
>>>>> after the streams app is up and running, and hence we still need to
>>>>> handle UnsupportedVersionExceptions thrown from the producer /
>>>>> consumer / admin client during runtime anyway.
>>>>>
>>>>> So I'd like to propose two options in this KIP:
>>>>>
>>>>> 1) we remove this one-time compatibility check on Streams startup in
>>>>> this KIP, and rely solely on handling the UnsupportedVersionExceptions
>>>>> thrown from the producer / consumer / admin client. Please share your
>>>>> thoughts about this.
>>>>>
>>>>> 2) we create a one-time NetworkClient upon starting up, send the
>>>>> ApiVersionsRequest, check the response, and then throw this client
>>>>> away.
>>>>>
>>>>> Please let me know what you think. Thanks!
>>>>>
>>>>>
>>>>> Guozhang
>>>>>
>>>>>
>>>>> On Mon, Nov 6, 2017 at 7:56 AM, Matthias J. Sax <matthias@confluent.io> wrote:
>>>>>
>>>>>> Thanks for the update and clarification.
>>>>>>
>>>>>> Sounds good to me :)
>>>>>>
>>>>>>
>>>>>> -Matthias
>>>>>>
>>>>>>
>>>>>>
>>>>>> On 11/6/17 12:16 AM, Guozhang Wang wrote:
>>>>>>> Thanks Matthias,
>>>>>>>
>>>>>>> 1) Updated the KIP page to include KAFKA-6126.
>>>>>>> 2) For passing configs, I agree; I will make a pass over the
>>>>>>> existing configs passed to StreamsKafkaClient and update the wiki
>>>>>>> page accordingly, to capture all changes that would happen for the
>>>>>>> replacement in this single KIP.
>>>>>>> 3) For internal topic purging, I'm not sure we need to include this
>>>>>>> as a public change, since internal topics are meant to be
>>>>>>> abstracted away from Streams users; they should not leverage such
>>>>>>> internal topics elsewhere themselves. The only thing I can think of
>>>>>>> is that for Kafka operators such internal topics would be largely
>>>>>>> reduced in footprint, but that does not need to be in the KIP
>>>>>>> either.
>>>>>>>
>>>>>>>
>>>>>>> Guozhang
>>>>>>>
>>>>>>>
>>>>>>> On Sat, Nov 4, 2017 at 9:00 AM, Matthias J. Sax <matthias@confluent.io> wrote:
>>>>>>>
>>>>>>>> I like this KIP. Can you also link to
>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-6126 in the KIP?
>>>>>>>>
>>>>>>>> What I am wondering though: if we start to partially (i.e., step
>>>>>>>> by step) replace the existing StreamsKafkaClient with the Java
>>>>>>>> AdminClient, don't we need more KIPs? For example, if we use the
>>>>>>>> purge API for internal topics, it seems like a change that
>>>>>>>> requires a KIP. Similarly for passing configs -- the new client
>>>>>>>> might have different configs than the old client? Can we double
>>>>>>>> check this?
>>>>>>>>
>>>>>>>> Thus, it might make sense to replace the old client with the new
>>>>>>>> one in one shot.
>>>>>>>>
>>>>>>>>
>>>>>>>> -Matthias
>>>>>>>>


Re: [DISCUSS] KIP-220: Add AdminClient into Kafka Streams' ClientSupplier

Posted by Guozhang Wang <wa...@gmail.com>.
I looked into how to use a NetworkClient to replace StreamsKafkaClient for
this one-time check, and the complexity is actually pretty high: since it
is a bare-bones NetworkClient, we would have to handle the connection /
readiness / broker-selection logic, etc., plus introduce all these
dependencies into the KafkaStreams class. So I have decided not to do this
in this KIP. If people feel strongly about this, let's discuss more.


I'll start the voting process on the mailing list now.


Guozhang
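
The outcome here is essentially option 3 from earlier in the thread: drop the upfront ApiVersionsRequest probe and let an incompatible broker surface as an error on the first real request. The toy model below illustrates that failure mode; all names are hypothetical stand-ins, not Kafka code -- in real Streams the broker raises Kafka's own UnsupportedVersionException, which becomes a fatal error during the first rebalance.

```java
public class VersionCheckSketch {
    // Hypothetical stand-in for Kafka's UnsupportedVersionException.
    static class UnsupportedVersionException extends RuntimeException {
        UnsupportedVersionException(String msg) { super(msg); }
    }

    // Sketch of the chosen approach: no startup probe; the first admin
    // request either succeeds or fails fast with a version error.
    static String createTopics(boolean brokerSupportsApi) {
        try {
            if (!brokerSupportsApi) {
                throw new UnsupportedVersionException("broker does not support CreateTopics");
            }
            return "created";
        } catch (UnsupportedVersionException e) {
            // In Streams this would be rethrown as a fatal exception during
            // the first rebalance, stopping the application quickly.
            return "fatal: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(createTopics(true));   // compatible broker: request succeeds
        System.out.println(createTopics(false));  // old broker: fails on first use
    }
}
```

The trade-off, as discussed in the thread, is that the version mismatch is detected at the first rebalance rather than before startup; in exchange, Streams avoids keeping a throwaway NetworkClient just for the probe.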

> > > > ApiVersionsRequest and get the response and do the checking; after
> that
> > > > throw this client away.
> > > >
> > > > Please let me know what do you think. Thanks!
> > > >
> > > >
> > > > Guozhang
> > > >
> > > >
> > > > On Mon, Nov 6, 2017 at 7:56 AM, Matthias J. Sax <
> matthias@confluent.io
> > >
> > > > wrote:
> > > >
> > > >> Thanks for the update and clarification.
> > > >>
> > > >> Sounds good to me :)
> > > >>
> > > >>
> > > >> -Matthias
> > > >>
> > > >>
> > > >>
> > > >> On 11/6/17 12:16 AM, Guozhang Wang wrote:
> > > >>> Thanks Matthias,
> > > >>>
> > > >>> 1) Updated the KIP page to include KAFKA-6126.
> > > >>> 2) For passing configs, I agree, will make a pass over the existing
> > > >> configs
> > > >>> passed to StreamsKafkaClient and update the wiki page accordingly,
> to
> > > >>> capture all changes that would happen for the replacement in this
> > > single
> > > >>> KIP.
> > > >>> 3) For internal topic purging, I'm not sure if we need to include
> > this
> > > >> as a
> > > >>> public change since internal topics are meant for abstracted away
> > from
> > > >> the
> > > >>> Streams users; they should not leverage such internal topics
> > elsewhere
> > > >>> themselves. The only thing I can think of is for Kafka operators
> this
> > > >> would
> > > >>> mean that such internal topics would be largely reduced in their
> > > >> footprint,
> > > >>> but that would not be needed in the KIP as well.
> > > >>>
> > > >>>
> > > >>> Guozhang
> > > >>>
> > > >>>
> > > >>> On Sat, Nov 4, 2017 at 9:00 AM, Matthias J. Sax <
> > matthias@confluent.io
> > > >
> > > >>> wrote:
> > > >>>
> > > >>>> I like this KIP. Can you also link to
> > > >>>> https://issues.apache.org/jira/browse/KAFKA-6126 in the KIP?
> > > >>>>
> > > >>>> What I am wondering though: if we start to partially (ie, step by
> > > step)
> > > >>>> replace the existing StreamsKafkaClient with Java AdminClient,
> don't
> > > we
> > > >>>> need more KIPs? For example, if we use purge-api for internal
> > topics,
> > > it
> > > >>>> seems like a change that requires a KIP. Similar for passing
> configs
> > > --
> > > >>>> the old client might have different config than the old client?
> Can
> > we
> > > >>>> double check this?
> > > >>>>
> > > >>>> Thus, it might make sense to replace the old client with the new
> one
> > > in
> > > >>>> one shot.
> > > >>>>
> > > >>>>
> > > >>>> -Matthias
> > > >>>>
> > > >>>> On 11/4/17 4:01 AM, Ted Yu wrote:
> > > >>>>> Looks good overall.
> > > >>>>>
> > > >>>>> bq. the creation within StreamsPartitionAssignor
> > > >>>>>
> > > >>>>> Typo above: should be StreamPartitionAssignor
> > > >>>>>
> > > >>>>> On Fri, Nov 3, 2017 at 4:49 PM, Guozhang Wang <
> wangguoz@gmail.com>
> > > >>>> wrote:
> > > >>>>>
> > > >>>>>> Hello folks,
> > > >>>>>>
> > > >>>>>> I have filed a new KIP on adding AdminClient into Streams for
> > > internal
> > > >>>>>> topic management.
> > > >>>>>>
> > > >>>>>> Looking for feedback on
> > > >>>>>>
> > > >>>>>> *https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > >>>>>> 220%3A+Add+AdminClient+into+Kafka+Streams%27+ClientSupplier
> > > >>>>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > >>>>>> 220%3A+Add+AdminClient+into+Kafka+Streams%27+ClientSupplier>*
> > > >>>>>>
> > > >>>>>> --
> > > >>>>>> -- Guozhang
> > > >>>>>>
> > > >>>>>
> > > >>>>
> > > >>>>
> > > >>>
> > > >>>
> > > >>
> > > >>
> > > >
> > > >
> > >
> > >
> >
> >
> > --
> > -- Guozhang
> >
>



-- 
-- Guozhang

Re: [DISCUSS] KIP-220: Add AdminClient into Kafka Streams' ClientSupplier

Posted by Bill Bejeck <bb...@gmail.com>.
I'm leaning towards option 3, although option 2 is a reasonable middle
ground between the two extremes.

Overall I'm leaning towards option 3 because:

   1. As Guozhang has said, we are failing "fast enough" with an exception
   from the first rebalance.
   2. There is less complexity/maintenance cost in not having a transient
   network client.
   3. Ideally, this check should live on the AdminClient itself, but adding
   such a check creates "scope creep" for this KIP.

IMHO the combination of these reasons makes option 3 my preferred approach.

Thanks,
Bill

On Tue, Nov 7, 2017 at 12:48 PM, Guozhang Wang <wa...@gmail.com> wrote:

> Here is what I'm thinking about this trade-off: we want to fail fast when
> brokers do not yet support the requested API version, with the cost that we
> need to do this one-time thing with an expensed NetworkClient plus a bit
> longer starting up latency. Here are a few different options:
>
> 1) we ask all the brokers: this is one extreme of the trade-off, we still
> need to handle UnsupportedApiVersionsException since brokers can
> downgrade.
> 2) we ask a random broker: this is what we did, and a bit weaker than 1)
> but saves on latency.
> 3) we do not ask anyone: this is the other extreme of the trade-off.
>
> To me I think 1) is an overkill, so I did not include that in my proposals.
> Between 2) and 3) I'm slightly preferring 3) since even under this case we
> are sort of failing fast because we will throw the exception at the first
> rebalance, but I can still see the value of option 2). Ideally we can have
> some APIs from AdminClient to check API versions but this does not exist
> today and I do not want to drag too long with growing scope on this KIP, so
> the current proposal's implementation uses expendable Network which is a
> bit fuzzy.
>
>
> Guozhang
>
>
> On Tue, Nov 7, 2017 at 2:07 AM, Matthias J. Sax <ma...@confluent.io>
> wrote:
>
> > I would prefer to keep the current check. We could even improve it, and
> > do the check to more than one brokers (even all provided
> > bootstrap.servers) or some random servers after we got all meta data
> > about the cluster.
> >
> >
> > -Matthias
> >
> > On 11/7/17 1:01 AM, Guozhang Wang wrote:
> > > Hello folks,
> > >
> > > One update I'd like to propose regarding "compatibility checking":
> > > currently we create a single StreamsKafkaClient at the beginning to
> issue
> > > an ApiVersionsRequest to a random broker and then check on its
> versions,
> > > and fail if it does not satisfy the version (0.10.1+ without EOS,
> 0.11.0
> > > with EOS); after this check we throw this client away. My original plan
> > is
> > > to replace this logic with the NetworkClient's own apiVersions, but
> after
> > > some digging while working on the PR I found that exposing this
> > apiVersions
> > > variable from NetworkClient through AdminClient is not very straight
> > > forward, plus it would need an API change on the AdminClient itself as
> > well
> > > to expose the versions information.
> > >
> > > On the other hand, this one-time compatibility checking's benefit may
> be
> > > not significant: 1) it asks a random broker upon starting up, and hence
> > > does not guarantee all broker's support the corresponding API versions
> at
> > > that time; 2) brokers can still be upgraded / downgraded after the
> > streams
> > > app is up and running, and hence we still need to handle
> > > UnsupportedVersionExceptions thrown from the producer / consumer /
> admin
> > > client during the runtime anyways.
> > >
> > > So I'd like to propose two options in this KIP:
> > >
> > > 1) we remove this one-time compatibility check on Streams starting up
> in
> > > this KIP, and solely rely on handling producer / consumer / admin
> > client's
> > > API UnsupportedVersionException throwable exceptions. Please share your
> > > thoughts about this.
> > >
> > > 2) we create a one-time NetworkClient upon starting up, send the
> > > ApiVersionsRequest and get the response and do the checking; after that
> > > throw this client away.
> > >
> > > Please let me know what do you think. Thanks!
> > >
> > >
> > > Guozhang
> > >
> > >
> > > On Mon, Nov 6, 2017 at 7:56 AM, Matthias J. Sax <matthias@confluent.io
> >
> > > wrote:
> > >
> > >> Thanks for the update and clarification.
> > >>
> > >> Sounds good to me :)
> > >>
> > >>
> > >> -Matthias
> > >>
> > >>
> > >>
> > >> On 11/6/17 12:16 AM, Guozhang Wang wrote:
> > >>> Thanks Matthias,
> > >>>
> > >>> 1) Updated the KIP page to include KAFKA-6126.
> > >>> 2) For passing configs, I agree, will make a pass over the existing
> > >> configs
> > >>> passed to StreamsKafkaClient and update the wiki page accordingly, to
> > >>> capture all changes that would happen for the replacement in this
> > single
> > >>> KIP.
> > >>> 3) For internal topic purging, I'm not sure if we need to include
> this
> > >> as a
> > >>> public change since internal topics are meant for abstracted away
> from
> > >> the
> > >>> Streams users; they should not leverage such internal topics
> elsewhere
> > >>> themselves. The only thing I can think of is for Kafka operators this
> > >> would
> > >>> mean that such internal topics would be largely reduced in their
> > >> footprint,
> > >>> but that would not be needed in the KIP as well.
> > >>>
> > >>>
> > >>> Guozhang
> > >>>
> > >>>
> > >>> On Sat, Nov 4, 2017 at 9:00 AM, Matthias J. Sax <
> matthias@confluent.io
> > >
> > >>> wrote:
> > >>>
> > >>>> I like this KIP. Can you also link to
> > >>>> https://issues.apache.org/jira/browse/KAFKA-6126 in the KIP?
> > >>>>
> > >>>> What I am wondering though: if we start to partially (ie, step by
> > step)
> > >>>> replace the existing StreamsKafkaClient with Java AdminClient, don't
> > we
> > >>>> need more KIPs? For example, if we use purge-api for internal
> topics,
> > it
> > >>>> seems like a change that requires a KIP. Similar for passing configs
> > --
> > >>>> the old client might have different config than the old client? Can
> we
> > >>>> double check this?
> > >>>>
> > >>>> Thus, it might make sense to replace the old client with the new one
> > in
> > >>>> one shot.
> > >>>>
> > >>>>
> > >>>> -Matthias
> > >>>>
> > >>>> On 11/4/17 4:01 AM, Ted Yu wrote:
> > >>>>> Looks good overall.
> > >>>>>
> > >>>>> bq. the creation within StreamsPartitionAssignor
> > >>>>>
> > >>>>> Typo above: should be StreamPartitionAssignor
> > >>>>>
> > >>>>> On Fri, Nov 3, 2017 at 4:49 PM, Guozhang Wang <wa...@gmail.com>
> > >>>> wrote:
> > >>>>>
> > >>>>>> Hello folks,
> > >>>>>>
> > >>>>>> I have filed a new KIP on adding AdminClient into Streams for
> > internal
> > >>>>>> topic management.
> > >>>>>>
> > >>>>>> Looking for feedback on
> > >>>>>>
> > >>>>>> *https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > >>>>>> 220%3A+Add+AdminClient+into+Kafka+Streams%27+ClientSupplier
> > >>>>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > >>>>>> 220%3A+Add+AdminClient+into+Kafka+Streams%27+ClientSupplier>*
> > >>>>>>
> > >>>>>> --
> > >>>>>> -- Guozhang
> > >>>>>>
> > >>>>>
> > >>>>
> > >>>>
> > >>>
> > >>>
> > >>
> > >>
> > >
> > >
> >
> >
>
>
> --
> -- Guozhang
>

Re: [DISCUSS] KIP-220: Add AdminClient into Kafka Streams' ClientSupplier

Posted by Guozhang Wang <wa...@gmail.com>.
Here is what I'm thinking about this trade-off: we want to fail fast when
brokers do not yet support the requested API version, at the cost of a
one-time, throwaway NetworkClient plus a bit of extra startup latency.
Here are a few different options:

1) we ask all the brokers: this is one extreme of the trade-off, and we
still need to handle UnsupportedVersionException since brokers can be
downgraded.
2) we ask a random broker: this is what we do today; a bit weaker than 1)
but it saves on latency.
3) we do not ask anyone: this is the other extreme of the trade-off.

To me 1) is overkill, so I did not include it in my proposals. Between 2)
and 3) I slightly prefer 3), since even in that case we are sort of
failing fast: we will throw the exception at the first rebalance. But I
can still see the value of option 2). Ideally we would have APIs on
AdminClient to check API versions, but these do not exist today, and I do
not want to drag this KIP out by growing its scope, so the current
proposal's implementation uses an expendable NetworkClient, which is a
bit fuzzy.
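To make option 3 concrete, here is a minimal, self-contained sketch of
letting the first internal-topic operation surface the incompatibility;
the exception class below is a stub standing in for Kafka's real
UnsupportedVersionException so the sketch compiles without the client jar:

```java
import java.util.concurrent.Callable;

// Stub standing in for org.apache.kafka.common.errors.UnsupportedVersionException.
class UnsupportedVersionException extends RuntimeException {
    UnsupportedVersionException(String msg) { super(msg); }
}

// Sketch of option 3: skip the up-front probe and translate an
// incompatibility surfaced by the first internal-topic operation
// (i.e. at the first rebalance) into a fatal error for the app.
class InternalTopicManager {
    static String createTopics(Callable<String> adminCall) {
        try {
            return adminCall.call();
        } catch (UnsupportedVersionException e) {
            // Fail "fast enough": abort on the first rebalance.
            throw new IllegalStateException(
                "Brokers do not support the requested API version", e);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
```

The point of the wrapper is only that the error path already exists at
runtime, so no separate startup probe is strictly required.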


Guozhang


On Tue, Nov 7, 2017 at 2:07 AM, Matthias J. Sax <ma...@confluent.io>
wrote:

> I would prefer to keep the current check. We could even improve it, and
> do the check to more than one brokers (even all provided
> bootstrap.servers) or some random servers after we got all meta data
> about the cluster.
>
>
> -Matthias
>
> On 11/7/17 1:01 AM, Guozhang Wang wrote:
> > Hello folks,
> >
> > One update I'd like to propose regarding "compatibility checking":
> > currently we create a single StreamsKafkaClient at the beginning to issue
> > an ApiVersionsRequest to a random broker and then check on its versions,
> > and fail if it does not satisfy the version (0.10.1+ without EOS, 0.11.0
> > with EOS); after this check we throw this client away. My original plan
> is
> > to replace this logic with the NetworkClient's own apiVersions, but after
> > some digging while working on the PR I found that exposing this
> apiVersions
> > variable from NetworkClient through AdminClient is not very straight
> > forward, plus it would need an API change on the AdminClient itself as
> well
> > to expose the versions information.
> >
> > On the other hand, this one-time compatibility checking's benefit may be
> > not significant: 1) it asks a random broker upon starting up, and hence
> > does not guarantee all broker's support the corresponding API versions at
> > that time; 2) brokers can still be upgraded / downgraded after the
> streams
> > app is up and running, and hence we still need to handle
> > UnsupportedVersionExceptions thrown from the producer / consumer / admin
> > client during the runtime anyways.
> >
> > So I'd like to propose two options in this KIP:
> >
> > 1) we remove this one-time compatibility check on Streams starting up in
> > this KIP, and solely rely on handling producer / consumer / admin
> client's
> > API UnsupportedVersionException throwable exceptions. Please share your
> > thoughts about this.
> >
> > 2) we create a one-time NetworkClient upon starting up, send the
> > ApiVersionsRequest and get the response and do the checking; after that
> > throw this client away.
> >
> > Please let me know what do you think. Thanks!
> >
> >
> > Guozhang
> >
> >
> > On Mon, Nov 6, 2017 at 7:56 AM, Matthias J. Sax <ma...@confluent.io>
> > wrote:
> >
> >> Thanks for the update and clarification.
> >>
> >> Sounds good to me :)
> >>
> >>
> >> -Matthias
> >>
> >>
> >>
> >> On 11/6/17 12:16 AM, Guozhang Wang wrote:
> >>> Thanks Matthias,
> >>>
> >>> 1) Updated the KIP page to include KAFKA-6126.
> >>> 2) For passing configs, I agree, will make a pass over the existing
> >> configs
> >>> passed to StreamsKafkaClient and update the wiki page accordingly, to
> >>> capture all changes that would happen for the replacement in this
> single
> >>> KIP.
> >>> 3) For internal topic purging, I'm not sure if we need to include this
> >> as a
> >>> public change since internal topics are meant for abstracted away from
> >> the
> >>> Streams users; they should not leverage such internal topics elsewhere
> >>> themselves. The only thing I can think of is for Kafka operators this
> >> would
> >>> mean that such internal topics would be largely reduced in their
> >> footprint,
> >>> but that would not be needed in the KIP as well.
> >>>
> >>>
> >>> Guozhang
> >>>
> >>>
> >>> On Sat, Nov 4, 2017 at 9:00 AM, Matthias J. Sax <matthias@confluent.io
> >
> >>> wrote:
> >>>
> >>>> I like this KIP. Can you also link to
> >>>> https://issues.apache.org/jira/browse/KAFKA-6126 in the KIP?
> >>>>
> >>>> What I am wondering though: if we start to partially (ie, step by
> step)
> >>>> replace the existing StreamsKafkaClient with Java AdminClient, don't
> we
> >>>> need more KIPs? For example, if we use purge-api for internal topics,
> it
> >>>> seems like a change that requires a KIP. Similar for passing configs
> --
> >>>> the old client might have different config than the old client? Can we
> >>>> double check this?
> >>>>
> >>>> Thus, it might make sense to replace the old client with the new one
> in
> >>>> one shot.
> >>>>
> >>>>
> >>>> -Matthias
> >>>>
> >>>> On 11/4/17 4:01 AM, Ted Yu wrote:
> >>>>> Looks good overall.
> >>>>>
> >>>>> bq. the creation within StreamsPartitionAssignor
> >>>>>
> >>>>> Typo above: should be StreamPartitionAssignor
> >>>>>
> >>>>> On Fri, Nov 3, 2017 at 4:49 PM, Guozhang Wang <wa...@gmail.com>
> >>>> wrote:
> >>>>>
> >>>>>> Hello folks,
> >>>>>>
> >>>>>> I have filed a new KIP on adding AdminClient into Streams for
> internal
> >>>>>> topic management.
> >>>>>>
> >>>>>> Looking for feedback on
> >>>>>>
> >>>>>> *https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> >>>>>> 220%3A+Add+AdminClient+into+Kafka+Streams%27+ClientSupplier
> >>>>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> >>>>>> 220%3A+Add+AdminClient+into+Kafka+Streams%27+ClientSupplier>*
> >>>>>>
> >>>>>> --
> >>>>>> -- Guozhang
> >>>>>>
> >>>>>
> >>>>
> >>>>
> >>>
> >>>
> >>
> >>
> >
> >
>
>


-- 
-- Guozhang

Re: [DISCUSS] KIP-220: Add AdminClient into Kafka Streams' ClientSupplier

Posted by "Matthias J. Sax" <ma...@confluent.io>.
I would prefer to keep the current check. We could even improve it and
run the check against more than one broker (even all provided
bootstrap.servers), or against some random servers after we have fetched
all metadata about the cluster.


-Matthias

On 11/7/17 1:01 AM, Guozhang Wang wrote:
> Hello folks,
> 
> One update I'd like to propose regarding "compatibility checking":
> currently we create a single StreamsKafkaClient at the beginning to issue
> an ApiVersionsRequest to a random broker and then check on its versions,
> and fail if it does not satisfy the version (0.10.1+ without EOS, 0.11.0
> with EOS); after this check we throw this client away. My original plan is
> to replace this logic with the NetworkClient's own apiVersions, but after
> some digging while working on the PR I found that exposing this apiVersions
> variable from NetworkClient through AdminClient is not very straight
> forward, plus it would need an API change on the AdminClient itself as well
> to expose the versions information.
> 
> On the other hand, this one-time compatibility checking's benefit may be
> not significant: 1) it asks a random broker upon starting up, and hence
> does not guarantee all broker's support the corresponding API versions at
> that time; 2) brokers can still be upgraded / downgraded after the streams
> app is up and running, and hence we still need to handle
> UnsupportedVersionExceptions thrown from the producer / consumer / admin
> client during the runtime anyways.
> 
> So I'd like to propose two options in this KIP:
> 
> 1) we remove this one-time compatibility check on Streams starting up in
> this KIP, and solely rely on handling producer / consumer / admin client's
> API UnsupportedVersionException throwable exceptions. Please share your
> thoughts about this.
> 
> 2) we create a one-time NetworkClient upon starting up, send the
> ApiVersionsRequest and get the response and do the checking; after that
> throw this client away.
> 
> Please let me know what do you think. Thanks!
> 
> 
> Guozhang
> 
> 
> On Mon, Nov 6, 2017 at 7:56 AM, Matthias J. Sax <ma...@confluent.io>
> wrote:
> 
>> Thanks for the update and clarification.
>>
>> Sounds good to me :)
>>
>>
>> -Matthias
>>
>>
>>
>> On 11/6/17 12:16 AM, Guozhang Wang wrote:
>>> Thanks Matthias,
>>>
>>> 1) Updated the KIP page to include KAFKA-6126.
>>> 2) For passing configs, I agree, will make a pass over the existing
>> configs
>>> passed to StreamsKafkaClient and update the wiki page accordingly, to
>>> capture all changes that would happen for the replacement in this single
>>> KIP.
>>> 3) For internal topic purging, I'm not sure if we need to include this
>> as a
>>> public change since internal topics are meant for abstracted away from
>> the
>>> Streams users; they should not leverage such internal topics elsewhere
>>> themselves. The only thing I can think of is for Kafka operators this
>> would
>>> mean that such internal topics would be largely reduced in their
>> footprint,
>>> but that would not be needed in the KIP as well.
>>>
>>>
>>> Guozhang
>>>
>>>
>>> On Sat, Nov 4, 2017 at 9:00 AM, Matthias J. Sax <ma...@confluent.io>
>>> wrote:
>>>
>>>> I like this KIP. Can you also link to
>>>> https://issues.apache.org/jira/browse/KAFKA-6126 in the KIP?
>>>>
>>>> What I am wondering though: if we start to partially (ie, step by step)
>>>> replace the existing StreamsKafkaClient with Java AdminClient, don't we
>>>> need more KIPs? For example, if we use purge-api for internal topics, it
>>>> seems like a change that requires a KIP. Similar for passing configs --
>>>> the old client might have different config than the old client? Can we
>>>> double check this?
>>>>
>>>> Thus, it might make sense to replace the old client with the new one in
>>>> one shot.
>>>>
>>>>
>>>> -Matthias
>>>>
>>>> On 11/4/17 4:01 AM, Ted Yu wrote:
>>>>> Looks good overall.
>>>>>
>>>>> bq. the creation within StreamsPartitionAssignor
>>>>>
>>>>> Typo above: should be StreamPartitionAssignor
>>>>>
>>>>> On Fri, Nov 3, 2017 at 4:49 PM, Guozhang Wang <wa...@gmail.com>
>>>> wrote:
>>>>>
>>>>>> Hello folks,
>>>>>>
>>>>>> I have filed a new KIP on adding AdminClient into Streams for internal
>>>>>> topic management.
>>>>>>
>>>>>> Looking for feedback on
>>>>>>
>>>>>> *https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>>>>>> 220%3A+Add+AdminClient+into+Kafka+Streams%27+ClientSupplier
>>>>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>>>>>> 220%3A+Add+AdminClient+into+Kafka+Streams%27+ClientSupplier>*
>>>>>>
>>>>>> --
>>>>>> -- Guozhang
>>>>>>
>>>>>
>>>>
>>>>
>>>
>>>
>>
>>
> 
> 


Re: [DISCUSS] KIP-220: Add AdminClient into Kafka Streams' ClientSupplier

Posted by Guozhang Wang <wa...@gmail.com>.
Hello folks,

One update I'd like to propose regarding "compatibility checking":
currently we create a single StreamsKafkaClient at startup to issue an
ApiVersionsRequest to a random broker and then check its versions,
failing if they do not satisfy the required version (0.10.1+ without
EOS, 0.11.0+ with EOS); after this check we throw the client away. My
original plan was to replace this logic with the NetworkClient's own
apiVersions, but after some digging while working on the PR I found that
exposing this apiVersions variable from NetworkClient through
AdminClient is not very straightforward; plus it would need an API
change on the AdminClient itself to expose the version information.

On the other hand, the benefit of this one-time compatibility check may
not be significant: 1) it asks a random broker upon starting up, and
hence does not guarantee that all brokers support the corresponding API
versions at that time; 2) brokers can still be upgraded / downgraded
after the Streams app is up and running, so we still need to handle the
UnsupportedVersionExceptions thrown from the producer / consumer / admin
client at runtime anyway.

So I'd like to propose two options in this KIP:

1) we remove this one-time compatibility check on Streams startup in
this KIP, and solely rely on handling the UnsupportedVersionExceptions
thrown from the producer / consumer / admin client. Please share your
thoughts about this.

2) we create a one-time NetworkClient upon starting up, send the
ApiVersionsRequest, get the response, and do the checking; after that we
throw the client away.

Please let me know what you think. Thanks!
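As an illustration of the gate being discussed, here is a minimal sketch
of the version check, with broker versions simplified to release triples;
the real check inspects per-API version ranges from the
ApiVersionsResponse rather than a release number, so this is only meant
to illustrate the decision:

```java
// Minimal sketch of the one-time compatibility gate, using the release
// requirements from the discussion: 0.10.1+ without EOS, 0.11.0+ with EOS.
class CompatibilityCheck {
    static boolean isCompatible(int major, int minor, int patch, boolean eosEnabled) {
        return eosEnabled
                ? atLeast(major, minor, patch, 0, 11, 0)
                : atLeast(major, minor, patch, 0, 10, 1);
    }

    // Lexicographic comparison of (major, minor, patch) against a minimum.
    private static boolean atLeast(int ma, int mi, int pa, int rma, int rmi, int rpa) {
        if (ma != rma) return ma > rma;
        if (mi != rmi) return mi > rmi;
        return pa >= rpa;
    }
}
```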


Guozhang


On Mon, Nov 6, 2017 at 7:56 AM, Matthias J. Sax <ma...@confluent.io>
wrote:

> Thanks for the update and clarification.
>
> Sounds good to me :)
>
>
> -Matthias
>
>
>
> On 11/6/17 12:16 AM, Guozhang Wang wrote:
> > Thanks Matthias,
> >
> > 1) Updated the KIP page to include KAFKA-6126.
> > 2) For passing configs, I agree, will make a pass over the existing
> configs
> > passed to StreamsKafkaClient and update the wiki page accordingly, to
> > capture all changes that would happen for the replacement in this single
> > KIP.
> > 3) For internal topic purging, I'm not sure if we need to include this
> as a
> > public change since internal topics are meant for abstracted away from
> the
> > Streams users; they should not leverage such internal topics elsewhere
> > themselves. The only thing I can think of is for Kafka operators this
> would
> > mean that such internal topics would be largely reduced in their
> footprint,
> > but that would not be needed in the KIP as well.
> >
> >
> > Guozhang
> >
> >
> > On Sat, Nov 4, 2017 at 9:00 AM, Matthias J. Sax <ma...@confluent.io>
> > wrote:
> >
> >> I like this KIP. Can you also link to
> >> https://issues.apache.org/jira/browse/KAFKA-6126 in the KIP?
> >>
> >> What I am wondering though: if we start to partially (ie, step by step)
> >> replace the existing StreamsKafkaClient with Java AdminClient, don't we
> >> need more KIPs? For example, if we use purge-api for internal topics, it
> >> seems like a change that requires a KIP. Similar for passing configs --
> >> the old client might have different config than the old client? Can we
> >> double check this?
> >>
> >> Thus, it might make sense to replace the old client with the new one in
> >> one shot.
> >>
> >>
> >> -Matthias
> >>
> >> On 11/4/17 4:01 AM, Ted Yu wrote:
> >>> Looks good overall.
> >>>
> >>> bq. the creation within StreamsPartitionAssignor
> >>>
> >>> Typo above: should be StreamPartitionAssignor
> >>>
> >>> On Fri, Nov 3, 2017 at 4:49 PM, Guozhang Wang <wa...@gmail.com>
> >> wrote:
> >>>
> >>>> Hello folks,
> >>>>
> >>>> I have filed a new KIP on adding AdminClient into Streams for internal
> >>>> topic management.
> >>>>
> >>>> Looking for feedback on
> >>>>
> >>>> *https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> >>>> 220%3A+Add+AdminClient+into+Kafka+Streams%27+ClientSupplier
> >>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> >>>> 220%3A+Add+AdminClient+into+Kafka+Streams%27+ClientSupplier>*
> >>>>
> >>>> --
> >>>> -- Guozhang
> >>>>
> >>>
> >>
> >>
> >
> >
>
>


-- 
-- Guozhang

Re: [DISCUSS] KIP-220: Add AdminClient into Kafka Streams' ClientSupplier

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Thanks for the update and clarification.

Sounds good to me :)


-Matthias



On 11/6/17 12:16 AM, Guozhang Wang wrote:
> Thanks Matthias,
> 
> 1) Updated the KIP page to include KAFKA-6126.
> 2) For passing configs, I agree, will make a pass over the existing configs
> passed to StreamsKafkaClient and update the wiki page accordingly, to
> capture all changes that would happen for the replacement in this single
> KIP.
> 3) For internal topic purging, I'm not sure if we need to include this as a
> public change since internal topics are meant for abstracted away from the
> Streams users; they should not leverage such internal topics elsewhere
> themselves. The only thing I can think of is for Kafka operators this would
> mean that such internal topics would be largely reduced in their footprint,
> but that would not be needed in the KIP as well.
> 
> 
> Guozhang
> 
> 
> On Sat, Nov 4, 2017 at 9:00 AM, Matthias J. Sax <ma...@confluent.io>
> wrote:
> 
>> I like this KIP. Can you also link to
>> https://issues.apache.org/jira/browse/KAFKA-6126 in the KIP?
>>
>> What I am wondering though: if we start to partially (ie, step by step)
>> replace the existing StreamsKafkaClient with Java AdminClient, don't we
>> need more KIPs? For example, if we use purge-api for internal topics, it
>> seems like a change that requires a KIP. Similar for passing configs --
>> the old client might have different config than the old client? Can we
>> double check this?
>>
>> Thus, it might make sense to replace the old client with the new one in
>> one shot.
>>
>>
>> -Matthias
>>
>> On 11/4/17 4:01 AM, Ted Yu wrote:
>>> Looks good overall.
>>>
>>> bq. the creation within StreamsPartitionAssignor
>>>
>>> Typo above: should be StreamPartitionAssignor
>>>
>>> On Fri, Nov 3, 2017 at 4:49 PM, Guozhang Wang <wa...@gmail.com>
>> wrote:
>>>
>>>> Hello folks,
>>>>
>>>> I have filed a new KIP on adding AdminClient into Streams for internal
>>>> topic management.
>>>>
>>>> Looking for feedback on
>>>>
>>>> *https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>>>> 220%3A+Add+AdminClient+into+Kafka+Streams%27+ClientSupplier
>>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>>>> 220%3A+Add+AdminClient+into+Kafka+Streams%27+ClientSupplier>*
>>>>
>>>> --
>>>> -- Guozhang
>>>>
>>>
>>
>>
> 
> 


Re: [DISCUSS] KIP-220: Add AdminClient into Kafka Streams' ClientSupplier

Posted by Guozhang Wang <wa...@gmail.com>.
Thanks Matthias,

1) Updated the KIP page to include KAFKA-6126.
2) For passing configs, I agree; I will make a pass over the existing
configs passed to StreamsKafkaClient and update the wiki page
accordingly, so that this single KIP captures all changes the
replacement would entail.
3) For internal topic purging, I'm not sure we need to include this as a
public change, since internal topics are meant to be abstracted away
from Streams users; they should not leverage such internal topics
elsewhere themselves. The only implication I can think of is for Kafka
operators: the footprint of such internal topics would be largely
reduced. But that does not need to be covered in the KIP either.


Guozhang


On Sat, Nov 4, 2017 at 9:00 AM, Matthias J. Sax <ma...@confluent.io>
wrote:

> I like this KIP. Can you also link to
> https://issues.apache.org/jira/browse/KAFKA-6126 in the KIP?
>
> What I am wondering though: if we start to partially (ie, step by step)
> replace the existing StreamsKafkaClient with Java AdminClient, don't we
> need more KIPs? For example, if we use purge-api for internal topics, it
> seems like a change that requires a KIP. Similar for passing configs --
> the old client might have different config than the old client? Can we
> double check this?
>
> Thus, it might make sense to replace the old client with the new one in
> one shot.
>
>
> -Matthias
>
> On 11/4/17 4:01 AM, Ted Yu wrote:
> > Looks good overall.
> >
> > bq. the creation within StreamsPartitionAssignor
> >
> > Typo above: should be StreamPartitionAssignor
> >
> > On Fri, Nov 3, 2017 at 4:49 PM, Guozhang Wang <wa...@gmail.com>
> wrote:
> >
> >> Hello folks,
> >>
> >> I have filed a new KIP on adding AdminClient into Streams for internal
> >> topic management.
> >>
> >> Looking for feedback on
> >>
> >> *https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> >> 220%3A+Add+AdminClient+into+Kafka+Streams%27+ClientSupplier
> >> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> >> 220%3A+Add+AdminClient+into+Kafka+Streams%27+ClientSupplier>*
> >>
> >> --
> >> -- Guozhang
> >>
> >
>
>


-- 
-- Guozhang

Re: [DISCUSS] KIP-220: Add AdminClient into Kafka Streams' ClientSupplier

Posted by "Matthias J. Sax" <ma...@confluent.io>.
I like this KIP. Can you also link to
https://issues.apache.org/jira/browse/KAFKA-6126 in the KIP?

What I am wondering though: if we start to partially (i.e., step by step)
replace the existing StreamsKafkaClient with Java AdminClient, don't we
need more KIPs? For example, if we use purge-api for internal topics, it
seems like a change that requires a KIP. Similar for passing configs --
the new client might have different configs than the old client? Can we
double check this?

Thus, it might make sense to replace the old client with the new one in
one shot.
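To make the purge-api point concrete, here is a minimal sketch of how purging
a repartition topic below the committed offsets could look. The real call
would be AdminClient.deleteRecords; the types below are local stand-ins so
the sketch compiles on its own, and the topic name and offset are made up:

```java
import java.util.HashMap;
import java.util.Map;

// Minimal stand-ins for org.apache.kafka.common.TopicPartition and
// org.apache.kafka.clients.admin.RecordsToDelete.
class TopicPartition {
    final String topic; final int partition;
    TopicPartition(String topic, int partition) { this.topic = topic; this.partition = partition; }
}

class RecordsToDelete {
    final long beforeOffset;
    private RecordsToDelete(long offset) { this.beforeOffset = offset; }
    static RecordsToDelete beforeOffset(long offset) { return new RecordsToDelete(offset); }
}

public class PurgeSketch {
    // In the real client this would be adminClient.deleteRecords(recordsToDelete),
    // deleting everything below the given offset for each partition.
    static void purge(Map<TopicPartition, RecordsToDelete> recordsToDelete) {
        for (Map.Entry<TopicPartition, RecordsToDelete> e : recordsToDelete.entrySet()) {
            System.out.println("purge " + e.getKey().topic + "-" + e.getKey().partition
                + " below offset " + e.getValue().beforeOffset);
        }
    }

    public static void main(String[] args) {
        Map<TopicPartition, RecordsToDelete> toDelete = new HashMap<>();
        // Hypothetical repartition topic and committed offset.
        toDelete.put(new TopicPartition("myapp-repartition", 0),
                     RecordsToDelete.beforeOffset(1234L));
        purge(toDelete);
    }
}
```

If purging semantics change what operators observe, that seems worth a
sentence in the KIP either way.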


-Matthias

On 11/4/17 4:01 AM, Ted Yu wrote:
> Looks good overall.
> 
> bq. the creation within StreamsPartitionAssignor
> 
> Typo above: should be StreamPartitionAssignor
> 
> On Fri, Nov 3, 2017 at 4:49 PM, Guozhang Wang <wa...@gmail.com> wrote:
> 
>> Hello folks,
>>
>> I have filed a new KIP on adding AdminClient into Streams for internal
>> topic management.
>>
>> Looking for feedback on
>>
>> *https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>> 220%3A+Add+AdminClient+into+Kafka+Streams%27+ClientSupplier
>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>> 220%3A+Add+AdminClient+into+Kafka+Streams%27+ClientSupplier>*
>>
>> --
>> -- Guozhang
>>
> 


Re: [DISCUSS] KIP-220: Add AdminClient into Kafka Streams' ClientSupplier

Posted by Ted Yu <yu...@gmail.com>.
Looks good overall.

bq. the creation within StreamsPartitionAssignor

Typo above: should be StreamPartitionAssignor

On Fri, Nov 3, 2017 at 4:49 PM, Guozhang Wang <wa...@gmail.com> wrote:

> Hello folks,
>
> I have filed a new KIP on adding AdminClient into Streams for internal
> topic management.
>
> Looking for feedback on
>
> *https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 220%3A+Add+AdminClient+into+Kafka+Streams%27+ClientSupplier
> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 220%3A+Add+AdminClient+into+Kafka+Streams%27+ClientSupplier>*
>
> --
> -- Guozhang
>