You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by "Matthias J. Sax" <ma...@confluent.io> on 2018/06/11 00:25:43 UTC

Re: [DISCUSS] KIP-221: Repartition Topic Hints in Streams

What is the status of this KIP?

-Matthias


On 2/13/18 1:43 PM, Matthias J. Sax wrote:
> Is there any update for this KIP?
> 
> 
> -Matthias
> 
> On 12/4/17 2:08 PM, Matthias J. Sax wrote:
>> Jeyhun,
>>
>> thanks for updating the KIP.
>>
>> I am wondering if you intend to add a new class `Produced`? There is
>> already `org.apache.kafka.streams.kstream.Produced`. So if we want to
>> add a new class, it must have a different name -- or we might be able to
>> merge both into one?
>>
>> Also, for the KStream overlaods of `through()` and `to()`, can you add
>> the different behavior using different overloads? It's not clear from
>> the KIP what the semantics are.
>>
>>
>> -Matthias
>>
>> On 11/17/17 3:27 PM, Jeyhun Karimov wrote:
>>> Hi,
>>>
>>> Thanks for your comments. I agree with Matthias partially.
>>> I think we should relax some requirements related with to() and through()
>>> methods.
>>> IMHO, Produced class can cover (existing/to be created) topic information,
>>> and which will ease our effort:
>>>
>>> KStream.to(Produced topicInfo)
>>> KStream.through(Produced topicInfo)
>>>
>>> This will decrease the number of overloads but we will need to deprecate
>>> the existing to() and through() methods, perhaps.
>>> I updated the KIP accordingly.
>>>
>>>
>>> Cheers,
>>> Jeyhun
>>>
>>> On Thu, Nov 16, 2017 at 10:21 PM Matthias J. Sax <ma...@confluent.io>
>>> wrote:
>>>
>>>> @Jan:
>>>>
>>>> The `Produced` class was introduced in 1.0 to specify key and valud
>>>> Serdes (and partitioner) if data is written into a topic.
>>>>
>>>> Old API:
>>>>
>>>> KStream#to("topic", keySerde, valueSerde);
>>>>
>>>> New API:
>>>>
>>>> KStream#to("topic", Produced.with(keySerde, valueSerde));
>>>>
>>>>
>>>> This allows to reduce the number of overloads for `to()` (and
>>>> `through()` that follows the same pattern) -- the second parameter is
>>>> used to cover all different variations of option parameters users can
>>>> specify, while we only have 2 overload for `to()` itself.
>>>>
>>>> What is still unclear to me it, what you mean by this topic prefix
>>>> thing? Either a user cares about the topic name and thus, must create
>>>> and manage it manually. Or the user does not care, and Streams create
>>>> it. How would this prefix idea fit in here?
>>>>
>>>>
>>>>
>>>> @Guozhang:
>>>>
>>>> My idea was to extend `Produced` with the hint we want to give for
>>>> creating internal topic and pass a optional `Produced` parameter. There
>>>> are multiple things we can do here:
>>>>
>>>> 1) stream.through(null, Produced...).groupBy().aggregate()
>>>> -> just allow for `null` topic name indicating that Streams should
>>>> create an internal topic
>>>>
>>>> 2) stream.through(Produced...).groupBy().aggregate()
>>>> -> add one overload taking an mandatory `Produced`
>>>>
>>>> We use `Serialized` to picky back the information
>>>>
>>>> 3) stream.groupBy(Serialized...).aggregate()
>>>> and stream.groupByKey(Serialized...).aggregate()
>>>> -> we don't need new top level overloads
>>>>
>>>>
>>>> There are different trade-offs for those alternatives and maybe there
>>>> are other ways to change the API. It's just to push the discussion further.
>>>>
>>>>
>>>> -Matthias
>>>>
>>>> On 11/12/17 1:22 PM, Jan Filipiak wrote:
>>>>> Hi Gouzhang,
>>>>>
>>>>> this felt like these questions are supposed to be answered by me.
>>>>> I do not understand the first one. I don't understand why the user
>>>>> shouldn't be able to specify a suffix for the topic name.
>>>>>
>>>>>  For the third question I am not 100% familiar if the Produced class
>>>>> came to existence
>>>>> at all. I remember proposing it somewhere in our redo DSL discussion that
>>>>> I dropped out of later. Finally any call that does:
>>>>>
>>>>> 1. create the internal topic
>>>>> 2. register sink
>>>>> 3. register source
>>>>>
>>>>> will always get the work done. If we have a Produced like class. putting
>>>>> all the parameters
>>>>> in there make sense. (Partitioner, serde, PartitionHint, internal, name
>>>>> ... )
>>>>>
>>>>> Hope this helps?
>>>>>
>>>>>
>>>>> On 10.11.2017 07:54, Guozhang Wang wrote:
>>>>>> A few clarification questions on the proposal details.
>>>>>>
>>>>>> 1. API: although the repartition only happens at the final stateful
>>>>>> operations like agg / join, the repartition flag info was actually
>>>> passed
>>>>>> from an earlier operator like map / groupBy. So what should be the new
>>>>>> API
>>>>>> look like? For example, if we do
>>>>>>
>>>>>> stream.groupBy().through("topic-name", Produced..).aggregate
>>>>>>
>>>>>> This would be add a bunch of APIs to GroupedKStream/KTable
>>>>>>
>>>>>> 2. Semantics: as Matthias mentioned, today any topics defined in
>>>>>> "through()" call is considered a user topic, and hence users are
>>>>>> responsible for managing them, including the topic name. For this KIP's
>>>>>> purpose, though, users would not care about the topic name. I.e. as a
>>>>>> user
>>>>>> I still want to make it be an internal topic so that I do not need to
>>>>>> worry
>>>>>> about it at all, but only specify num.partitions.
>>>>>>
>>>>>> 3. Details: in Produced we do not have specs for specifying the
>>>>>> num.partitions or should we repartition or not. So it is still not
>>>>>> clear to
>>>>>> me how we would make use of that to achieve what's in the old
>>>>>> proposal's RepartitionHint class.
>>>>>>
>>>>>>
>>>>>>
>>>>>> Guozhang
>>>>>>
>>>>>>
>>>>>> On Mon, Nov 6, 2017 at 1:21 PM, Ted Yu <yu...@gmail.com> wrote:
>>>>>>
>>>>>>> bq. enlarge the score of through()
>>>>>>>
>>>>>>> I guess you meant scope.
>>>>>>>
>>>>>>> On Mon, Nov 6, 2017 at 1:15 PM, Jeyhun Karimov <je...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> Sorry for the late reply. I am convinced that we should enlarge the
>>>>>>>> score
>>>>>>>> of through() (add more overloads) instead of introducing a separate
>>>> set
>>>>>>> of
>>>>>>>> overloads to other methods.
>>>>>>>> I will update the KIP soon based on the discussion and inform.
>>>>>>>>
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Jeyhun
>>>>>>>>
>>>>>>>> On Mon, Nov 6, 2017 at 9:18 PM Jan Filipiak <Jan.Filipiak@trivago.com
>>>>>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Sorry for not beeing 100% up to date.
>>>>>>>>> Back then we had the discussion that when an operation puts a >Sink<
>>>>>>>>> into the topology, a >Produced<
>>>>>>>>> parameter is added. This produced parameter could have internal or
>>>>>>>>> external. If internal I think the name would still make
>>>>>>>>> a great suffix for the topic name
>>>>>>>>>
>>>>>>>>> Is this plan still around? Otherwise having the name as suffix is
>>>>>>>>> probably always good it can help the user quicker to identify hot
>>>>>>> topics
>>>>>>>>> that need more
>>>>>>>>> partitions if he has many of these internal repartitions
>>>>>>>>>
>>>>>>>>> Best Jan
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On 06.11.2017 20:13, Matthias J. Sax wrote:
>>>>>>>>>> I absolute agree with what you say. It's not a requirement to
>>>>>>> specify a
>>>>>>>>>> topic name -- and this was the idea -- if user does specify a name,
>>>>>>> we
>>>>>>>>>> treat as is -- if users does not specify a name, Streams create an
>>>>>>>>>> internal topic.
>>>>>>>>>>
>>>>>>>>>> The goal of the Jira is to allow a simplified way to control
>>>>>>>>>> repartitioning (atm, user needs to manually create a topic and use
>>>>>>> via
>>>>>>>>>> through()).
>>>>>>>>>>
>>>>>>>>>> Thus, the idea is to make the topic name parameter of through
>>>>>>> optional.
>>>>>>>>>> It's of course just an idea. Happy do have a other API design. The
>>>>>>> goal
>>>>>>>>>> was, to avoid to many new overloads.
>>>>>>>>>>
>>>>>>>>>>>> Could you clarify exactly what you mean by keeping the current
>>>>>>>>> distinction?
>>>>>>>>>> Current distinction is: user topics are created manually and user
>>>>>>>>>> specifies the name -- internal topics are created by Kafka Streams
>>>>>>> and
>>>>>>>>>> an name is generated automatically.
>>>>>>>>>>
>>>>>>>>>> -> through("user-topic")
>>>>>>>>>> -> through(TopicConfig.withNumberOfPartitions(5)) // Streams creates
>>>>>>>> an
>>>>>>>>>> internal topic
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> -Matthias
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On 11/6/17 6:56 PM, Thomas Becker wrote:
>>>>>>>>>>> Could you clarify exactly what you mean by keeping the current
>>>>>>>>> distinction?
>>>>>>>>>>> Actually, re-reading the KIP and JIRA, it's not clear that being
>>>>>>> able
>>>>>>>>> to specify a custom name is actually a requirement. If the goal is to
>>>>>>>>> control repartitioning and tune parallelism, maybe we can just
>>>>>>>>> sidestep
>>>>>>>>> this issue altogether by removing the ability to set a different
>>>> name.
>>>>>>>>>>> On Mon, 2017-11-06 at 16:51 +0100, Matthias J. Sax wrote:
>>>>>>>>>>>
>>>>>>>>>>> That's a good point. In current design, we strictly distinguish
>>>>>>> both.
>>>>>>>>>>> For example, the reset tools deletes internal topics (starting with
>>>>>>>>>>> prefix `<application.id>-` and ending with either `-repartition`
>>>> or
>>>>>>>>>>> `-changelog`.
>>>>>>>>>>>
>>>>>>>>>>> Thus, from my point of view, it would make sense to keep the
>>>> current
>>>>>>>>>>> distinction.
>>>>>>>>>>>
>>>>>>>>>>> -Matthias
>>>>>>>>>>>
>>>>>>>>>>> On 11/6/17 4:45 PM, Thomas Becker wrote:
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> I think this sounds good as well. It's worth clarifying whether
>>>>>>> topics
>>>>>>>>> that are named by the user but created by streams are considered
>>>>>>>> "internal"
>>>>>>>>> topics also.
>>>>>>>>>>> On Sun, 2017-11-05 at 23:02 +0100, Matthias J. Sax wrote:
>>>>>>>>>>>
>>>>>>>>>>> My idea was, to relax the requirement for through() that a topic
>>>>>>> must
>>>>>>>> be
>>>>>>>>>>> created manually before startup.
>>>>>>>>>>>
>>>>>>>>>>> Thus, if no through() call is made, a (internal) topic is created
>>>>>>> the
>>>>>>>>>>> same way we do it currently.
>>>>>>>>>>>
>>>>>>>>>>> If one uses `through(String topicName)` we keep the current
>>>> behavior
>>>>>>>> and
>>>>>>>>>>> require users to create the topic manually.
>>>>>>>>>>>
>>>>>>>>>>> The reasoning is as follows: if a user creates a topic manually, a
>>>>>>>> user
>>>>>>>>>>> can just use it for repartitioning. As the topic is already there,
>>>>>>>> there
>>>>>>>>>>> is no need to specify any topic configs.
>>>>>>>>>>>
>>>>>>>>>>> We add a new `through()` overload (details TBD) that allows to
>>>>>>> specify
>>>>>>>>>>> topic configs and Streams create the topic with those configs.
>>>>>>>>>>>
>>>>>>>>>>> Reasoning: user don't want to manage topic manually, thus, it's
>>>>>>> still
>>>>>>>> an
>>>>>>>>>>> internal topic and Streams create the topic name automatically as
>>>>>>> for
>>>>>>>>>>> all other internal topics. However, users gets some more control
>>>>>>> about
>>>>>>>>>>> topic parameters like number of partitions (we should discuss what
>>>>>>>> other
>>>>>>>>>>> configs would be useful).
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Does this make sense?
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> -Matthias
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On 11/5/17 1:21 AM, Jan Filipiak wrote:
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Hi.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Im not 100 % up to date what version 1.0 DSL looks like ATM.
>>>>>>>>>>> I just would argue that repartitioning should be an own API call
>>>>>>> like
>>>>>>>>>>> through or something.
>>>>>>>>>>> One can use through or to already to get this. I would argue one
>>>>>>>> should
>>>>>>>>>>> look there instead of overloads
>>>>>>>>>>>
>>>>>>>>>>> Best Jan
>>>>>>>>>>>
>>>>>>>>>>> On 04.11.2017 16:01, Jeyhun Karimov wrote:
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Dear community,
>>>>>>>>>>>
>>>>>>>>>>> I would like to initiate discussion on KIP-221 [1] based on issue
>>>>>>> [2].
>>>>>>>>>>> Please feel free to comment.
>>>>>>>>>>>
>>>>>>>>>>> [1]
>>>>>>>>>>>
>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>>>>>>>> 221%3A+Repartition+Topic+Hints+in+Streams
>>>>>>>>>>> [2] https://issues.apache.org/jira/browse/KAFKA-6037
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Cheers,
>>>>>>>>>>> Jeyhun
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> ________________________________
>>>>>>>>>>>
>>>>>>>>>>> This email and any attachments may contain confidential and
>>>>>>> privileged
>>>>>>>>> material for the sole use of the intended recipient. Any review,
>>>>>>> copying,
>>>>>>>>> or distribution of this email (or any attachments) by others is
>>>>>>>> prohibited.
>>>>>>>>> If you are not the intended recipient, please contact the sender
>>>>>>>>> immediately and permanently delete this email and any attachments. No
>>>>>>>>> employee or agent of TiVo Inc. is authorized to conclude any binding
>>>>>>>>> agreement on behalf of TiVo Inc. by email. Binding agreements with
>>>>>>>>> TiVo
>>>>>>>>> Inc. may only be made by a signed written agreement.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> ________________________________
>>>>>>>>>>>
>>>>>>>>>>> This email and any attachments may contain confidential and
>>>>>>> privileged
>>>>>>>>> material for the sole use of the intended recipient. Any review,
>>>>>>> copying,
>>>>>>>>> or distribution of this email (or any attachments) by others is
>>>>>>>> prohibited.
>>>>>>>>> If you are not the intended recipient, please contact the sender
>>>>>>>>> immediately and permanently delete this email and any attachments. No
>>>>>>>>> employee or agent of TiVo Inc. is authorized to conclude any binding
>>>>>>>>> agreement on behalf of TiVo Inc. by email. Binding agreements with
>>>>>>>>> TiVo
>>>>>>>>> Inc. may only be made by a signed written agreement.
>>>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>>
>>>
>>
> 


Re: [DISCUSS] KIP-221: Repartition Topic Hints in Streams

Posted by Jeyhun Karimov <je...@gmail.com>.
Hi Lei,

Please feel free to take over the KIP.

Cheers,
Jeyhun

On Fri, Sep 21, 2018, 22:27 Lei Chen <le...@gmail.com> wrote:

> Hi,
>
> Just want to know is anyone actively working on this and also KAFKA-4835
> <https://issues.apache.org/jira/browse/KAFKA-4835>? Seems like the JIRA
> has been inactive for couple months. We want this feature and would like to
> move it forward if no one else is working on it.
>
>
> Lei
>
> On Wed, Jun 20, 2018 at 7:27 PM Matthias J. Sax <ma...@confluent.io>
> wrote:
>
>> No worries. It's just good to know. It seems that some other people are
>> interested to drive this further. So we will just "reassign" it to them.
>>
>> Thanks for letting us know.
>>
>>
>> -Matthias
>>
>> On 6/20/18 2:51 PM, Jeyhun Karimov wrote:
>> > Hi Matthias, all,
>> >
>> > Currently, I am not able to complete this KIP. Please accept my
>> > apologies for that.
>> >
>> >
>> > Cheers,
>> > Jeyhun
>> >
>> > On Mon, Jun 11, 2018 at 2:25 AM Matthias J. Sax <matthias@confluent.io
>> > <ma...@confluent.io>> wrote:
>> >
>> >     What is the status of this KIP?
>> >
>> >     -Matthias
>> >
>> >
>> >     On 2/13/18 1:43 PM, Matthias J. Sax wrote:
>> >     > Is there any update for this KIP?
>> >     >
>> >     >
>> >     > -Matthias
>> >     >
>> >     > On 12/4/17 2:08 PM, Matthias J. Sax wrote:
>> >     >> Jeyhun,
>> >     >>
>> >     >> thanks for updating the KIP.
>> >     >>
>> >     >> I am wondering if you intend to add a new class `Produced`?
>> There is
>> >     >> already `org.apache.kafka.streams.kstream.Produced`. So if we
>> want to
>> >     >> add a new class, it must have a different name -- or we might be
>> >     able to
>> >     >> merge both into one?
>> >     >>
>> >     >> Also, for the KStream overlaods of `through()` and `to()`, can
>> >     you add
>> >     >> the different behavior using different overloads? It's not clear
>> from
>> >     >> the KIP what the semantics are.
>> >     >>
>> >     >>
>> >     >> -Matthias
>> >     >>
>> >     >> On 11/17/17 3:27 PM, Jeyhun Karimov wrote:
>> >     >>> Hi,
>> >     >>>
>> >     >>> Thanks for your comments. I agree with Matthias partially.
>> >     >>> I think we should relax some requirements related with to() and
>> >     through()
>> >     >>> methods.
>> >     >>> IMHO, Produced class can cover (existing/to be created) topic
>> >     information,
>> >     >>> and which will ease our effort:
>> >     >>>
>> >     >>> KStream.to(Produced topicInfo)
>> >     >>> KStream.through(Produced topicInfo)
>> >     >>>
>> >     >>> This will decrease the number of overloads but we will need to
>> >     deprecate
>> >     >>> the existing to() and through() methods, perhaps.
>> >     >>> I updated the KIP accordingly.
>> >     >>>
>> >     >>>
>> >     >>> Cheers,
>> >     >>> Jeyhun
>> >     >>>
>> >     >>> On Thu, Nov 16, 2017 at 10:21 PM Matthias J. Sax
>> >     <matthias@confluent.io <ma...@confluent.io>>
>> >     >>> wrote:
>> >     >>>
>> >     >>>> @Jan:
>> >     >>>>
>> >     >>>> The `Produced` class was introduced in 1.0 to specify key and
>> valud
>> >     >>>> Serdes (and partitioner) if data is written into a topic.
>> >     >>>>
>> >     >>>> Old API:
>> >     >>>>
>> >     >>>> KStream#to("topic", keySerde, valueSerde);
>> >     >>>>
>> >     >>>> New API:
>> >     >>>>
>> >     >>>> KStream#to("topic", Produced.with(keySerde, valueSerde));
>> >     >>>>
>> >     >>>>
>> >     >>>> This allows to reduce the number of overloads for `to()` (and
>> >     >>>> `through()` that follows the same pattern) -- the second
>> >     parameter is
>> >     >>>> used to cover all different variations of option parameters
>> >     users can
>> >     >>>> specify, while we only have 2 overload for `to()` itself.
>> >     >>>>
>> >     >>>> What is still unclear to me it, what you mean by this topic
>> prefix
>> >     >>>> thing? Either a user cares about the topic name and thus, must
>> >     create
>> >     >>>> and manage it manually. Or the user does not care, and Streams
>> >     create
>> >     >>>> it. How would this prefix idea fit in here?
>> >     >>>>
>> >     >>>>
>> >     >>>>
>> >     >>>> @Guozhang:
>> >     >>>>
>> >     >>>> My idea was to extend `Produced` with the hint we want to give
>> for
>> >     >>>> creating internal topic and pass a optional `Produced`
>> >     parameter. There
>> >     >>>> are multiple things we can do here:
>> >     >>>>
>> >     >>>> 1) stream.through(null, Produced...).groupBy().aggregate()
>> >     >>>> -> just allow for `null` topic name indicating that Streams
>> should
>> >     >>>> create an internal topic
>> >     >>>>
>> >     >>>> 2) stream.through(Produced...).groupBy().aggregate()
>> >     >>>> -> add one overload taking an mandatory `Produced`
>> >     >>>>
>> >     >>>> We use `Serialized` to picky back the information
>> >     >>>>
>> >     >>>> 3) stream.groupBy(Serialized...).aggregate()
>> >     >>>> and stream.groupByKey(Serialized...).aggregate()
>> >     >>>> -> we don't need new top level overloads
>> >     >>>>
>> >     >>>>
>> >     >>>> There are different trade-offs for those alternatives and maybe
>> >     there
>> >     >>>> are other ways to change the API. It's just to push the
>> >     discussion further.
>> >     >>>>
>> >     >>>>
>> >     >>>> -Matthias
>> >     >>>>
>> >     >>>> On 11/12/17 1:22 PM, Jan Filipiak wrote:
>> >     >>>>> Hi Gouzhang,
>> >     >>>>>
>> >     >>>>> this felt like these questions are supposed to be answered by
>> me.
>> >     >>>>> I do not understand the first one. I don't understand why the
>> user
>> >     >>>>> shouldn't be able to specify a suffix for the topic name.
>> >     >>>>>
>> >     >>>>>  For the third question I am not 100% familiar if the Produced
>> >     class
>> >     >>>>> came to existence
>> >     >>>>> at all. I remember proposing it somewhere in our redo DSL
>> >     discussion that
>> >     >>>>> I dropped out of later. Finally any call that does:
>> >     >>>>>
>> >     >>>>> 1. create the internal topic
>> >     >>>>> 2. register sink
>> >     >>>>> 3. register source
>> >     >>>>>
>> >     >>>>> will always get the work done. If we have a Produced like
>> >     class. putting
>> >     >>>>> all the parameters
>> >     >>>>> in there make sense. (Partitioner, serde, PartitionHint,
>> >     internal, name
>> >     >>>>> ... )
>> >     >>>>>
>> >     >>>>> Hope this helps?
>> >     >>>>>
>> >     >>>>>
>> >     >>>>> On 10.11.2017 07:54, Guozhang Wang wrote:
>> >     >>>>>> A few clarification questions on the proposal details.
>> >     >>>>>>
>> >     >>>>>> 1. API: although the repartition only happens at the final
>> >     stateful
>> >     >>>>>> operations like agg / join, the repartition flag info was
>> >     actually
>> >     >>>> passed
>> >     >>>>>> from an earlier operator like map / groupBy. So what should
>> >     be the new
>> >     >>>>>> API
>> >     >>>>>> look like? For example, if we do
>> >     >>>>>>
>> >     >>>>>> stream.groupBy().through("topic-name", Produced..).aggregate
>> >     >>>>>>
>> >     >>>>>> This would be add a bunch of APIs to GroupedKStream/KTable
>> >     >>>>>>
>> >     >>>>>> 2. Semantics: as Matthias mentioned, today any topics
>> defined in
>> >     >>>>>> "through()" call is considered a user topic, and hence users
>> are
>> >     >>>>>> responsible for managing them, including the topic name. For
>> >     this KIP's
>> >     >>>>>> purpose, though, users would not care about the topic name.
>> >     I.e. as a
>> >     >>>>>> user
>> >     >>>>>> I still want to make it be an internal topic so that I do not
>> >     need to
>> >     >>>>>> worry
>> >     >>>>>> about it at all, but only specify num.partitions.
>> >     >>>>>>
>> >     >>>>>> 3. Details: in Produced we do not have specs for specifying
>> the
>> >     >>>>>> num.partitions or should we repartition or not. So it is
>> >     still not
>> >     >>>>>> clear to
>> >     >>>>>> me how we would make use of that to achieve what's in the old
>> >     >>>>>> proposal's RepartitionHint class.
>> >     >>>>>>
>> >     >>>>>>
>> >     >>>>>>
>> >     >>>>>> Guozhang
>> >     >>>>>>
>> >     >>>>>>
>> >     >>>>>> On Mon, Nov 6, 2017 at 1:21 PM, Ted Yu <yuzhihong@gmail.com
>> >     <ma...@gmail.com>> wrote:
>> >     >>>>>>
>> >     >>>>>>> bq. enlarge the score of through()
>> >     >>>>>>>
>> >     >>>>>>> I guess you meant scope.
>> >     >>>>>>>
>> >     >>>>>>> On Mon, Nov 6, 2017 at 1:15 PM, Jeyhun Karimov
>> >     <je.karimov@gmail.com <ma...@gmail.com>>
>> >     >>>>>>> wrote:
>> >     >>>>>>>
>> >     >>>>>>>> Hi,
>> >     >>>>>>>>
>> >     >>>>>>>> Sorry for the late reply. I am convinced that we should
>> >     enlarge the
>> >     >>>>>>>> score
>> >     >>>>>>>> of through() (add more overloads) instead of introducing a
>> >     separate
>> >     >>>> set
>> >     >>>>>>> of
>> >     >>>>>>>> overloads to other methods.
>> >     >>>>>>>> I will update the KIP soon based on the discussion and
>> inform.
>> >     >>>>>>>>
>> >     >>>>>>>>
>> >     >>>>>>>> Cheers,
>> >     >>>>>>>> Jeyhun
>> >     >>>>>>>>
>> >     >>>>>>>> On Mon, Nov 6, 2017 at 9:18 PM Jan Filipiak
>> >     <Jan.Filipiak@trivago.com <ma...@trivago.com>
>> >     >>>>>
>> >     >>>>>>>> wrote:
>> >     >>>>>>>>
>> >     >>>>>>>>> Sorry for not beeing 100% up to date.
>> >     >>>>>>>>> Back then we had the discussion that when an operation
>> >     puts a >Sink<
>> >     >>>>>>>>> into the topology, a >Produced<
>> >     >>>>>>>>> parameter is added. This produced parameter could have
>> >     internal or
>> >     >>>>>>>>> external. If internal I think the name would still make
>> >     >>>>>>>>> a great suffix for the topic name
>> >     >>>>>>>>>
>> >     >>>>>>>>> Is this plan still around? Otherwise having the name as
>> >     suffix is
>> >     >>>>>>>>> probably always good it can help the user quicker to
>> >     identify hot
>> >     >>>>>>> topics
>> >     >>>>>>>>> that need more
>> >     >>>>>>>>> partitions if he has many of these internal repartitions
>> >     >>>>>>>>>
>> >     >>>>>>>>> Best Jan
>> >     >>>>>>>>>
>> >     >>>>>>>>>
>> >     >>>>>>>>> On 06.11.2017 20:13, Matthias J. Sax wrote:
>> >     >>>>>>>>>> I absolute agree with what you say. It's not a
>> requirement to
>> >     >>>>>>> specify a
>> >     >>>>>>>>>> topic name -- and this was the idea -- if user does
>> >     specify a name,
>> >     >>>>>>> we
>> >     >>>>>>>>>> treat as is -- if users does not specify a name, Streams
>> >     create an
>> >     >>>>>>>>>> internal topic.
>> >     >>>>>>>>>>
>> >     >>>>>>>>>> The goal of the Jira is to allow a simplified way to
>> control
>> >     >>>>>>>>>> repartitioning (atm, user needs to manually create a
>> >     topic and use
>> >     >>>>>>> via
>> >     >>>>>>>>>> through()).
>> >     >>>>>>>>>>
>> >     >>>>>>>>>> Thus, the idea is to make the topic name parameter of
>> through
>> >     >>>>>>> optional.
>> >     >>>>>>>>>> It's of course just an idea. Happy do have a other API
>> >     design. The
>> >     >>>>>>> goal
>> >     >>>>>>>>>> was, to avoid to many new overloads.
>> >     >>>>>>>>>>
>> >     >>>>>>>>>>>> Could you clarify exactly what you mean by keeping the
>> >     current
>> >     >>>>>>>>> distinction?
>> >     >>>>>>>>>> Current distinction is: user topics are created manually
>> >     and user
>> >     >>>>>>>>>> specifies the name -- internal topics are created by
>> >     Kafka Streams
>> >     >>>>>>> and
>> >     >>>>>>>>>> an name is generated automatically.
>> >     >>>>>>>>>>
>> >     >>>>>>>>>> -> through("user-topic")
>> >     >>>>>>>>>> -> through(TopicConfig.withNumberOfPartitions(5)) //
>> >     Streams creates
>> >     >>>>>>>> an
>> >     >>>>>>>>>> internal topic
>> >     >>>>>>>>>>
>> >     >>>>>>>>>>
>> >     >>>>>>>>>> -Matthias
>> >     >>>>>>>>>>
>> >     >>>>>>>>>>
>> >     >>>>>>>>>> On 11/6/17 6:56 PM, Thomas Becker wrote:
>> >     >>>>>>>>>>> Could you clarify exactly what you mean by keeping the
>> >     current
>> >     >>>>>>>>> distinction?
>> >     >>>>>>>>>>> Actually, re-reading the KIP and JIRA, it's not clear
>> >     that being
>> >     >>>>>>> able
>> >     >>>>>>>>> to specify a custom name is actually a requirement. If the
>> >     goal is to
>> >     >>>>>>>>> control repartitioning and tune parallelism, maybe we can
>> just
>> >     >>>>>>>>> sidestep
>> >     >>>>>>>>> this issue altogether by removing the ability to set a
>> >     different
>> >     >>>> name.
>> >     >>>>>>>>>>> On Mon, 2017-11-06 at 16:51 +0100, Matthias J. Sax
>> wrote:
>> >     >>>>>>>>>>>
>> >     >>>>>>>>>>> That's a good point. In current design, we strictly
>> >     distinguish
>> >     >>>>>>> both.
>> >     >>>>>>>>>>> For example, the reset tools deletes internal topics
>> >     (starting with
>> >     >>>>>>>>>>> prefix `<application.id <http://application.id>>-` and
>> >     ending with either `-repartition`
>> >     >>>> or
>> >     >>>>>>>>>>> `-changelog`.
>> >     >>>>>>>>>>>
>> >     >>>>>>>>>>> Thus, from my point of view, it would make sense to
>> keep the
>> >     >>>> current
>> >     >>>>>>>>>>> distinction.
>> >     >>>>>>>>>>>
>> >     >>>>>>>>>>> -Matthias
>> >     >>>>>>>>>>>
>> >     >>>>>>>>>>> On 11/6/17 4:45 PM, Thomas Becker wrote:
>> >     >>>>>>>>>>>
>> >     >>>>>>>>>>>
>> >     >>>>>>>>>>> I think this sounds good as well. It's worth clarifying
>> >     whether
>> >     >>>>>>> topics
>> >     >>>>>>>>> that are named by the user but created by streams are
>> >     considered
>> >     >>>>>>>> "internal"
>> >     >>>>>>>>> topics also.
>> >     >>>>>>>>>>> On Sun, 2017-11-05 at 23:02 +0100, Matthias J. Sax
>> wrote:
>> >     >>>>>>>>>>>
>> >     >>>>>>>>>>> My idea was, to relax the requirement for through() that
>> >     a topic
>> >     >>>>>>> must
>> >     >>>>>>>> be
>> >     >>>>>>>>>>> created manually before startup.
>> >     >>>>>>>>>>>
>> >     >>>>>>>>>>> Thus, if no through() call is made, a (internal) topic
>> >     is created
>> >     >>>>>>> the
>> >     >>>>>>>>>>> same way we do it currently.
>> >     >>>>>>>>>>>
>> >     >>>>>>>>>>> If one uses `through(String topicName)` we keep the
>> current
>> >     >>>> behavior
>> >     >>>>>>>> and
>> >     >>>>>>>>>>> require users to create the topic manually.
>> >     >>>>>>>>>>>
>> >     >>>>>>>>>>> The reasoning is as follows: if a user creates a topic
>> >     manually, a
>> >     >>>>>>>> user
>> >     >>>>>>>>>>> can just use it for repartitioning. As the topic is
>> >     already there,
>> >     >>>>>>>> there
>> >     >>>>>>>>>>> is no need to specify any topic configs.
>> >     >>>>>>>>>>>
>> >     >>>>>>>>>>> We add a new `through()` overload (details TBD) that
>> >     allows to
>> >     >>>>>>> specify
>> >     >>>>>>>>>>> topic configs and Streams create the topic with those
>> >     configs.
>> >     >>>>>>>>>>>
>> >     >>>>>>>>>>> Reasoning: user don't want to manage topic manually,
>> >     thus, it's
>> >     >>>>>>> still
>> >     >>>>>>>> an
>> >     >>>>>>>>>>> internal topic and Streams create the topic name
>> >     automatically as
>> >     >>>>>>> for
>> >     >>>>>>>>>>> all other internal topics. However, users gets some more
>> >     control
>> >     >>>>>>> about
>> >     >>>>>>>>>>> topic parameters like number of partitions (we should
>> >     discuss what
>> >     >>>>>>>> other
>> >     >>>>>>>>>>> configs would be useful).
>> >     >>>>>>>>>>>
>> >     >>>>>>>>>>>
>> >     >>>>>>>>>>> Does this make sense?
>> >     >>>>>>>>>>>
>> >     >>>>>>>>>>>
>> >     >>>>>>>>>>> -Matthias
>> >     >>>>>>>>>>>
>> >     >>>>>>>>>>>
>> >     >>>>>>>>>>> On 11/5/17 1:21 AM, Jan Filipiak wrote:
>> >     >>>>>>>>>>>
>> >     >>>>>>>>>>>
>> >     >>>>>>>>>>> Hi.
>> >     >>>>>>>>>>>
>> >     >>>>>>>>>>>
>> >     >>>>>>>>>>> Im not 100 % up to date what version 1.0 DSL looks like
>> ATM.
>> >     >>>>>>>>>>> I just would argue that repartitioning should be an own
>> >     API call
>> >     >>>>>>> like
>> >     >>>>>>>>>>> through or something.
>> >     >>>>>>>>>>> One can use through or to already to get this. I would
>> >     argue one
>> >     >>>>>>>> should
>> >     >>>>>>>>>>> look there instead of overloads
>> >     >>>>>>>>>>>
>> >     >>>>>>>>>>> Best Jan
>> >     >>>>>>>>>>>
>> >     >>>>>>>>>>> On 04.11.2017 16:01, Jeyhun Karimov wrote:
>> >     >>>>>>>>>>>
>> >     >>>>>>>>>>>
>> >     >>>>>>>>>>> Dear community,
>> >     >>>>>>>>>>>
>> >     >>>>>>>>>>> I would like to initiate discussion on KIP-221 [1] based
>> >     on issue
>> >     >>>>>>> [2].
>> >     >>>>>>>>>>> Please feel free to comment.
>> >     >>>>>>>>>>>
>> >     >>>>>>>>>>> [1]
>> >     >>>>>>>>>>>
>> >     >>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>> >     >>>>>>>> 221%3A+Repartition+Topic+Hints+in+Streams
>> >     >>>>>>>>>>> [2] https://issues.apache.org/jira/browse/KAFKA-6037
>> >     >>>>>>>>>>>
>> >     >>>>>>>>>>>
>> >     >>>>>>>>>>>
>> >     >>>>>>>>>>> Cheers,
>> >     >>>>>>>>>>> Jeyhun
>> >     >>>>>>>>>>>
>> >     >>>>>>>>>>>
>> >     >>>>>>>>>>>
>> >     >>>>>>>>>>>
>> >     >>>>>>>>>>>
>> >     >>>>>>>>>>>
>> >     >>>>>>>>>>>
>> >     >>>>>>>>>>>
>> >     >>>>>>>>>>>
>> >     >>>>>>>>>>> ________________________________
>> >     >>>>>>>>>>>
>> >     >>>>>>>>>>> This email and any attachments may contain confidential
>> and
>> >     >>>>>>> privileged
>> >     >>>>>>>>> material for the sole use of the intended recipient. Any
>> >     review,
>> >     >>>>>>> copying,
>> >     >>>>>>>>> or distribution of this email (or any attachments) by
>> >     others is
>> >     >>>>>>>> prohibited.
>> >     >>>>>>>>> If you are not the intended recipient, please contact the
>> >     sender
>> >     >>>>>>>>> immediately and permanently delete this email and any
>> >     attachments. No
>> >     >>>>>>>>> employee or agent of TiVo Inc. is authorized to conclude
>> >     any binding
>> >     >>>>>>>>> agreement on behalf of TiVo Inc. by email. Binding
>> >     agreements with
>> >     >>>>>>>>> TiVo
>> >     >>>>>>>>> Inc. may only be made by a signed written agreement.
>> >     >>>>>>>>>>>
>> >     >>>>>>>>>>>
>> >     >>>>>>>>>>>
>> >     >>>>>>>>>>>
>> >     >>>>>>>>>>>
>> >     >>>>>>>>>>> ________________________________
>> >     >>>>>>>>>>>
>> >     >>>>>>>>>>> This email and any attachments may contain confidential
>> and
>> >     >>>>>>> privileged
>> >     >>>>>>>>> material for the sole use of the intended recipient. Any
>> >     review,
>> >     >>>>>>> copying,
>> >     >>>>>>>>> or distribution of this email (or any attachments) by
>> >     others is
>> >     >>>>>>>> prohibited.
>> >     >>>>>>>>> If you are not the intended recipient, please contact the
>> >     sender
>> >     >>>>>>>>> immediately and permanently delete this email and any
>> >     attachments. No
>> >     >>>>>>>>> employee or agent of TiVo Inc. is authorized to conclude
>> >     any binding
>> >     >>>>>>>>> agreement on behalf of TiVo Inc. by email. Binding
>> >     agreements with
>> >     >>>>>>>>> TiVo
>> >     >>>>>>>>> Inc. may only be made by a signed written agreement.
>> >     >>>>>>>>>
>> >     >>>>>>
>> >     >>>>>>
>> >     >>>>>
>> >     >>>>
>> >     >>>>
>> >     >>>
>> >     >>
>> >     >
>> >
>>
>>

Re: [DISCUSS] KIP-221: Repartition Topic Hints in Streams

Posted by "Matthias J. Sax" <ma...@confluent.io>.
It seems Jeyhun (cc'ed) is not working on the KIP any longer. If there
is no response within a week from Jeyhun, feel free to take over the KIP.

One more side comment: we recently accepted KIP-372, that overlaps with
this KIP. Thus, if you resume KIP-221, please consider the changes of
KIP-372.


Thanks a lot!


-Matthias

On 9/21/18 11:27 AM, Lei Chen wrote:
> Hi,
> 
> Just want to know is anyone actively working on this and also KAFKA-4835
> <https://issues.apache.org/jira/browse/KAFKA-4835>? Seems like the JIRA has
> been inactive for couple months. We want this feature and would like to
> move it forward if no one else is working on it.
> 
> Lei
> 
> On Wed, Jun 20, 2018 at 7:27 PM Matthias J. Sax <ma...@confluent.io>
> wrote:
> 
>> No worries. It's just good to know. It seems that some other people are
>> interested to drive this further. So we will just "reassign" it to them.
>>
>> Thanks for letting us know.
>>
>>
>> -Matthias
>>
>> On 6/20/18 2:51 PM, Jeyhun Karimov wrote:
>>> Hi Matthias, all,
>>>
>>> Currently, I am not able to complete this KIP. Please accept my
>>> apologies for that.
>>>
>>>
>>> Cheers,
>>> Jeyhun
>>>
>>> On Mon, Jun 11, 2018 at 2:25 AM Matthias J. Sax <matthias@confluent.io
>>> <ma...@confluent.io>> wrote:
>>>
>>>     What is the status of this KIP?
>>>
>>>     -Matthias
>>>
>>>
>>>     On 2/13/18 1:43 PM, Matthias J. Sax wrote:
>>>     > Is there any update for this KIP?
>>>     >
>>>     >
>>>     > -Matthias
>>>     >
>>>     > On 12/4/17 2:08 PM, Matthias J. Sax wrote:
>>>     >> Jeyhun,
>>>     >>
>>>     >> thanks for updating the KIP.
>>>     >>
>>>     >> I am wondering if you intend to add a new class `Produced`? There
>> is
>>>     >> already `org.apache.kafka.streams.kstream.Produced`. So if we
>> want to
>>>     >> add a new class, it must have a different name -- or we might be
>>>     able to
>>>     >> merge both into one?
>>>     >>
>>>     >> Also, for the KStream overlaods of `through()` and `to()`, can
>>>     you add
>>>     >> the different behavior using different overloads? It's not clear
>> from
>>>     >> the KIP what the semantics are.
>>>     >>
>>>     >>
>>>     >> -Matthias
>>>     >>
>>>     >> On 11/17/17 3:27 PM, Jeyhun Karimov wrote:
>>>     >>> Hi,
>>>     >>>
>>>     >>> Thanks for your comments. I agree with Matthias partially.
>>>     >>> I think we should relax some requirements related with to() and
>>>     through()
>>>     >>> methods.
>>>     >>> IMHO, Produced class can cover (existing/to be created) topic
>>>     information,
>>>     >>> and which will ease our effort:
>>>     >>>
>>>     >>> KStream.to(Produced topicInfo)
>>>     >>> KStream.through(Produced topicInfo)
>>>     >>>
>>>     >>> This will decrease the number of overloads but we will need to
>>>     deprecate
>>>     >>> the existing to() and through() methods, perhaps.
>>>     >>> I updated the KIP accordingly.
>>>     >>>
>>>     >>>
>>>     >>> Cheers,
>>>     >>> Jeyhun
>>>     >>>
>>>     >>> On Thu, Nov 16, 2017 at 10:21 PM Matthias J. Sax
>>>     <matthias@confluent.io <ma...@confluent.io>>
>>>     >>> wrote:
>>>     >>>
>>>     >>>> @Jan:
>>>     >>>>
>>>     >>>> The `Produced` class was introduced in 1.0 to specify key and
>> valud
>>>     >>>> Serdes (and partitioner) if data is written into a topic.
>>>     >>>>
>>>     >>>> Old API:
>>>     >>>>
>>>     >>>> KStream#to("topic", keySerde, valueSerde);
>>>     >>>>
>>>     >>>> New API:
>>>     >>>>
>>>     >>>> KStream#to("topic", Produced.with(keySerde, valueSerde));
>>>     >>>>
>>>     >>>>
>>>     >>>> This allows to reduce the number of overloads for `to()` (and
>>>     >>>> `through()` that follows the same pattern) -- the second
>>>     parameter is
>>>     >>>> used to cover all different variations of option parameters
>>>     users can
>>>     >>>> specify, while we only have 2 overload for `to()` itself.
>>>     >>>>
>>>     >>>> What is still unclear to me it, what you mean by this topic
>> prefix
>>>     >>>> thing? Either a user cares about the topic name and thus, must
>>>     create
>>>     >>>> and manage it manually. Or the user does not care, and Streams
>>>     create
>>>     >>>> it. How would this prefix idea fit in here?
>>>     >>>>
>>>     >>>>
>>>     >>>>
>>>     >>>> @Guozhang:
>>>     >>>>
>>>     >>>> My idea was to extend `Produced` with the hint we want to give
>> for
>>>     >>>> creating internal topic and pass a optional `Produced`
>>>     parameter. There
>>>     >>>> are multiple things we can do here:
>>>     >>>>
>>>     >>>> 1) stream.through(null, Produced...).groupBy().aggregate()
>>>     >>>> -> just allow for `null` topic name indicating that Streams
>> should
>>>     >>>> create an internal topic
>>>     >>>>
>>>     >>>> 2) stream.through(Produced...).groupBy().aggregate()
>>>     >>>> -> add one overload taking an mandatory `Produced`
>>>     >>>>
>>>     >>>> We use `Serialized` to picky back the information
>>>     >>>>
>>>     >>>> 3) stream.groupBy(Serialized...).aggregate()
>>>     >>>> and stream.groupByKey(Serialized...).aggregate()
>>>     >>>> -> we don't need new top level overloads
>>>     >>>>
>>>     >>>>
>>>     >>>> There are different trade-offs for those alternatives and maybe
>>>     there
>>>     >>>> are other ways to change the API. It's just to push the
>>>     discussion further.
>>>     >>>>
>>>     >>>>
>>>     >>>> -Matthias
>>>     >>>>
>>>     >>>> On 11/12/17 1:22 PM, Jan Filipiak wrote:
>>>     >>>>> Hi Gouzhang,
>>>     >>>>>
>>>     >>>>> this felt like these questions are supposed to be answered by
>> me.
>>>     >>>>> I do not understand the first one. I don't understand why the
>> user
>>>     >>>>> shouldn't be able to specify a suffix for the topic name.
>>>     >>>>>
>>>     >>>>>  For the third question I am not 100% familiar if the Produced
>>>     class
>>>     >>>>> came to existence
>>>     >>>>> at all. I remember proposing it somewhere in our redo DSL
>>>     discussion that
>>>     >>>>> I dropped out of later. Finally any call that does:
>>>     >>>>>
>>>     >>>>> 1. create the internal topic
>>>     >>>>> 2. register sink
>>>     >>>>> 3. register source
>>>     >>>>>
>>>     >>>>> will always get the work done. If we have a Produced like
>>>     class. putting
>>>     >>>>> all the parameters
>>>     >>>>> in there make sense. (Partitioner, serde, PartitionHint,
>>>     internal, name
>>>     >>>>> ... )
>>>     >>>>>
>>>     >>>>> Hope this helps?
>>>     >>>>>
>>>     >>>>>
>>>     >>>>> On 10.11.2017 07:54, Guozhang Wang wrote:
>>>     >>>>>> A few clarification questions on the proposal details.
>>>     >>>>>>
>>>     >>>>>> 1. API: although the repartition only happens at the final
>>>     stateful
>>>     >>>>>> operations like agg / join, the repartition flag info was
>>>     actually
>>>     >>>> passed
>>>     >>>>>> from an earlier operator like map / groupBy. So what should
>>>     be the new
>>>     >>>>>> API
>>>     >>>>>> look like? For example, if we do
>>>     >>>>>>
>>>     >>>>>> stream.groupBy().through("topic-name", Produced..).aggregate
>>>     >>>>>>
>>>     >>>>>> This would be add a bunch of APIs to GroupedKStream/KTable
>>>     >>>>>>
>>>     >>>>>> 2. Semantics: as Matthias mentioned, today any topics defined
>> in
>>>     >>>>>> "through()" call is considered a user topic, and hence users
>> are
>>>     >>>>>> responsible for managing them, including the topic name. For
>>>     this KIP's
>>>     >>>>>> purpose, though, users would not care about the topic name.
>>>     I.e. as a
>>>     >>>>>> user
>>>     >>>>>> I still want to make it be an internal topic so that I do not
>>>     need to
>>>     >>>>>> worry
>>>     >>>>>> about it at all, but only specify num.partitions.
>>>     >>>>>>
>>>     >>>>>> 3. Details: in Produced we do not have specs for specifying
>> the
>>>     >>>>>> num.partitions or should we repartition or not. So it is
>>>     still not
>>>     >>>>>> clear to
>>>     >>>>>> me how we would make use of that to achieve what's in the old
>>>     >>>>>> proposal's RepartitionHint class.
>>>     >>>>>>
>>>     >>>>>>
>>>     >>>>>>
>>>     >>>>>> Guozhang
>>>     >>>>>>
>>>     >>>>>>
>>>     >>>>>> On Mon, Nov 6, 2017 at 1:21 PM, Ted Yu <yuzhihong@gmail.com
>>>     <ma...@gmail.com>> wrote:
>>>     >>>>>>
>>>     >>>>>>> bq. enlarge the score of through()
>>>     >>>>>>>
>>>     >>>>>>> I guess you meant scope.
>>>     >>>>>>>
>>>     >>>>>>> On Mon, Nov 6, 2017 at 1:15 PM, Jeyhun Karimov
>>>     <je.karimov@gmail.com <ma...@gmail.com>>
>>>     >>>>>>> wrote:
>>>     >>>>>>>
>>>     >>>>>>>> Hi,
>>>     >>>>>>>>
>>>     >>>>>>>> Sorry for the late reply. I am convinced that we should
>>>     enlarge the
>>>     >>>>>>>> score
>>>     >>>>>>>> of through() (add more overloads) instead of introducing a
>>>     separate
>>>     >>>> set
>>>     >>>>>>> of
>>>     >>>>>>>> overloads to other methods.
>>>     >>>>>>>> I will update the KIP soon based on the discussion and
>> inform.
>>>     >>>>>>>>
>>>     >>>>>>>>
>>>     >>>>>>>> Cheers,
>>>     >>>>>>>> Jeyhun
>>>     >>>>>>>>
>>>     >>>>>>>> On Mon, Nov 6, 2017 at 9:18 PM Jan Filipiak
>>>     <Jan.Filipiak@trivago.com <ma...@trivago.com>
>>>     >>>>>
>>>     >>>>>>>> wrote:
>>>     >>>>>>>>
>>>     >>>>>>>>> Sorry for not beeing 100% up to date.
>>>     >>>>>>>>> Back then we had the discussion that when an operation
>>>     puts a >Sink<
>>>     >>>>>>>>> into the topology, a >Produced<
>>>     >>>>>>>>> parameter is added. This produced parameter could have
>>>     internal or
>>>     >>>>>>>>> external. If internal I think the name would still make
>>>     >>>>>>>>> a great suffix for the topic name
>>>     >>>>>>>>>
>>>     >>>>>>>>> Is this plan still around? Otherwise having the name as
>>>     suffix is
>>>     >>>>>>>>> probably always good it can help the user quicker to
>>>     identify hot
>>>     >>>>>>> topics
>>>     >>>>>>>>> that need more
>>>     >>>>>>>>> partitions if he has many of these internal repartitions
>>>     >>>>>>>>>
>>>     >>>>>>>>> Best Jan
>>>     >>>>>>>>>
>>>     >>>>>>>>>
>>>     >>>>>>>>> On 06.11.2017 20:13, Matthias J. Sax wrote:
>>>     >>>>>>>>>> I absolute agree with what you say. It's not a
>> requirement to
>>>     >>>>>>> specify a
>>>     >>>>>>>>>> topic name -- and this was the idea -- if user does
>>>     specify a name,
>>>     >>>>>>> we
>>>     >>>>>>>>>> treat as is -- if users does not specify a name, Streams
>>>     create an
>>>     >>>>>>>>>> internal topic.
>>>     >>>>>>>>>>
>>>     >>>>>>>>>> The goal of the Jira is to allow a simplified way to
>> control
>>>     >>>>>>>>>> repartitioning (atm, user needs to manually create a
>>>     topic and use
>>>     >>>>>>> via
>>>     >>>>>>>>>> through()).
>>>     >>>>>>>>>>
>>>     >>>>>>>>>> Thus, the idea is to make the topic name parameter of
>> through
>>>     >>>>>>> optional.
>>>     >>>>>>>>>> It's of course just an idea. Happy do have a other API
>>>     design. The
>>>     >>>>>>> goal
>>>     >>>>>>>>>> was, to avoid to many new overloads.
>>>     >>>>>>>>>>
>>>     >>>>>>>>>>>> Could you clarify exactly what you mean by keeping the
>>>     current
>>>     >>>>>>>>> distinction?
>>>     >>>>>>>>>> Current distinction is: user topics are created manually
>>>     and user
>>>     >>>>>>>>>> specifies the name -- internal topics are created by
>>>     Kafka Streams
>>>     >>>>>>> and
>>>     >>>>>>>>>> an name is generated automatically.
>>>     >>>>>>>>>>
>>>     >>>>>>>>>> -> through("user-topic")
>>>     >>>>>>>>>> -> through(TopicConfig.withNumberOfPartitions(5)) //
>>>     Streams creates
>>>     >>>>>>>> an
>>>     >>>>>>>>>> internal topic
>>>     >>>>>>>>>>
>>>     >>>>>>>>>>
>>>     >>>>>>>>>> -Matthias
>>>     >>>>>>>>>>
>>>     >>>>>>>>>>
>>>     >>>>>>>>>> On 11/6/17 6:56 PM, Thomas Becker wrote:
>>>     >>>>>>>>>>> Could you clarify exactly what you mean by keeping the
>>>     current
>>>     >>>>>>>>> distinction?
>>>     >>>>>>>>>>> Actually, re-reading the KIP and JIRA, it's not clear
>>>     that being
>>>     >>>>>>> able
>>>     >>>>>>>>> to specify a custom name is actually a requirement. If the
>>>     goal is to
>>>     >>>>>>>>> control repartitioning and tune parallelism, maybe we can
>> just
>>>     >>>>>>>>> sidestep
>>>     >>>>>>>>> this issue altogether by removing the ability to set a
>>>     different
>>>     >>>> name.
>>>     >>>>>>>>>>> On Mon, 2017-11-06 at 16:51 +0100, Matthias J. Sax wrote:
>>>     >>>>>>>>>>>
>>>     >>>>>>>>>>> That's a good point. In current design, we strictly
>>>     distinguish
>>>     >>>>>>> both.
>>>     >>>>>>>>>>> For example, the reset tools deletes internal topics
>>>     (starting with
>>>     >>>>>>>>>>> prefix `<application.id <http://application.id>>-` and
>>>     ending with either `-repartition`
>>>     >>>> or
>>>     >>>>>>>>>>> `-changelog`.
>>>     >>>>>>>>>>>
>>>     >>>>>>>>>>> Thus, from my point of view, it would make sense to keep
>> the
>>>     >>>> current
>>>     >>>>>>>>>>> distinction.
>>>     >>>>>>>>>>>
>>>     >>>>>>>>>>> -Matthias
>>>     >>>>>>>>>>>
>>>     >>>>>>>>>>> On 11/6/17 4:45 PM, Thomas Becker wrote:
>>>     >>>>>>>>>>>
>>>     >>>>>>>>>>>
>>>     >>>>>>>>>>> I think this sounds good as well. It's worth clarifying
>>>     whether
>>>     >>>>>>> topics
>>>     >>>>>>>>> that are named by the user but created by streams are
>>>     considered
>>>     >>>>>>>> "internal"
>>>     >>>>>>>>> topics also.
>>>     >>>>>>>>>>> On Sun, 2017-11-05 at 23:02 +0100, Matthias J. Sax wrote:
>>>     >>>>>>>>>>>
>>>     >>>>>>>>>>> My idea was, to relax the requirement for through() that
>>>     a topic
>>>     >>>>>>> must
>>>     >>>>>>>> be
>>>     >>>>>>>>>>> created manually before startup.
>>>     >>>>>>>>>>>
>>>     >>>>>>>>>>> Thus, if no through() call is made, a (internal) topic
>>>     is created
>>>     >>>>>>> the
>>>     >>>>>>>>>>> same way we do it currently.
>>>     >>>>>>>>>>>
>>>     >>>>>>>>>>> If one uses `through(String topicName)` we keep the
>> current
>>>     >>>> behavior
>>>     >>>>>>>> and
>>>     >>>>>>>>>>> require users to create the topic manually.
>>>     >>>>>>>>>>>
>>>     >>>>>>>>>>> The reasoning is as follows: if a user creates a topic
>>>     manually, a
>>>     >>>>>>>> user
>>>     >>>>>>>>>>> can just use it for repartitioning. As the topic is
>>>     already there,
>>>     >>>>>>>> there
>>>     >>>>>>>>>>> is no need to specify any topic configs.
>>>     >>>>>>>>>>>
>>>     >>>>>>>>>>> We add a new `through()` overload (details TBD) that
>>>     allows to
>>>     >>>>>>> specify
>>>     >>>>>>>>>>> topic configs and Streams create the topic with those
>>>     configs.
>>>     >>>>>>>>>>>
>>>     >>>>>>>>>>> Reasoning: user don't want to manage topic manually,
>>>     thus, it's
>>>     >>>>>>> still
>>>     >>>>>>>> an
>>>     >>>>>>>>>>> internal topic and Streams create the topic name
>>>     automatically as
>>>     >>>>>>> for
>>>     >>>>>>>>>>> all other internal topics. However, users gets some more
>>>     control
>>>     >>>>>>> about
>>>     >>>>>>>>>>> topic parameters like number of partitions (we should
>>>     discuss what
>>>     >>>>>>>> other
>>>     >>>>>>>>>>> configs would be useful).
>>>     >>>>>>>>>>>
>>>     >>>>>>>>>>>
>>>     >>>>>>>>>>> Does this make sense?
>>>     >>>>>>>>>>>
>>>     >>>>>>>>>>>
>>>     >>>>>>>>>>> -Matthias
>>>     >>>>>>>>>>>
>>>     >>>>>>>>>>>
>>>     >>>>>>>>>>> On 11/5/17 1:21 AM, Jan Filipiak wrote:
>>>     >>>>>>>>>>>
>>>     >>>>>>>>>>>
>>>     >>>>>>>>>>> Hi.
>>>     >>>>>>>>>>>
>>>     >>>>>>>>>>>
>>>     >>>>>>>>>>> Im not 100 % up to date what version 1.0 DSL looks like
>> ATM.
>>>     >>>>>>>>>>> I just would argue that repartitioning should be an own
>>>     API call
>>>     >>>>>>> like
>>>     >>>>>>>>>>> through or something.
>>>     >>>>>>>>>>> One can use through or to already to get this. I would
>>>     argue one
>>>     >>>>>>>> should
>>>     >>>>>>>>>>> look there instead of overloads
>>>     >>>>>>>>>>>
>>>     >>>>>>>>>>> Best Jan
>>>     >>>>>>>>>>>
>>>     >>>>>>>>>>> On 04.11.2017 16:01, Jeyhun Karimov wrote:
>>>     >>>>>>>>>>>
>>>     >>>>>>>>>>>
>>>     >>>>>>>>>>> Dear community,
>>>     >>>>>>>>>>>
>>>     >>>>>>>>>>> I would like to initiate discussion on KIP-221 [1] based
>>>     on issue
>>>     >>>>>>> [2].
>>>     >>>>>>>>>>> Please feel free to comment.
>>>     >>>>>>>>>>>
>>>     >>>>>>>>>>> [1]
>>>     >>>>>>>>>>>
>>>     >>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>>>     >>>>>>>> 221%3A+Repartition+Topic+Hints+in+Streams
>>>     >>>>>>>>>>> [2] https://issues.apache.org/jira/browse/KAFKA-6037
>>>     >>>>>>>>>>>
>>>     >>>>>>>>>>>
>>>     >>>>>>>>>>>
>>>     >>>>>>>>>>> Cheers,
>>>     >>>>>>>>>>> Jeyhun
>>>     >>>>>>>>>>>
>>>     >>>>>>>>>>>
>>>     >>>>>>>>>>>
>>>     >>>>>>>>>>>
>>>     >>>>>>>>>>>
>>>     >>>>>>>>>>>
>>>     >>>>>>>>>>>
>>>     >>>>>>>>>>>
>>>     >>>>>>>>>>>
>>>     >>>>>>>>>>> ________________________________
>>>     >>>>>>>>>>>
>>>     >>>>>>>>>>> This email and any attachments may contain confidential
>> and
>>>     >>>>>>> privileged
>>>     >>>>>>>>> material for the sole use of the intended recipient. Any
>>>     review,
>>>     >>>>>>> copying,
>>>     >>>>>>>>> or distribution of this email (or any attachments) by
>>>     others is
>>>     >>>>>>>> prohibited.
>>>     >>>>>>>>> If you are not the intended recipient, please contact the
>>>     sender
>>>     >>>>>>>>> immediately and permanently delete this email and any
>>>     attachments. No
>>>     >>>>>>>>> employee or agent of TiVo Inc. is authorized to conclude
>>>     any binding
>>>     >>>>>>>>> agreement on behalf of TiVo Inc. by email. Binding
>>>     agreements with
>>>     >>>>>>>>> TiVo
>>>     >>>>>>>>> Inc. may only be made by a signed written agreement.
>>>     >>>>>>>>>>>
>>>     >>>>>>>>>>>
>>>     >>>>>>>>>>>
>>>     >>>>>>>>>>>
>>>     >>>>>>>>>>>
>>>     >>>>>>>>>>> ________________________________
>>>     >>>>>>>>>>>
>>>     >>>>>>>>>>> This email and any attachments may contain confidential
>> and
>>>     >>>>>>> privileged
>>>     >>>>>>>>> material for the sole use of the intended recipient. Any
>>>     review,
>>>     >>>>>>> copying,
>>>     >>>>>>>>> or distribution of this email (or any attachments) by
>>>     others is
>>>     >>>>>>>> prohibited.
>>>     >>>>>>>>> If you are not the intended recipient, please contact the
>>>     sender
>>>     >>>>>>>>> immediately and permanently delete this email and any
>>>     attachments. No
>>>     >>>>>>>>> employee or agent of TiVo Inc. is authorized to conclude
>>>     any binding
>>>     >>>>>>>>> agreement on behalf of TiVo Inc. by email. Binding
>>>     agreements with
>>>     >>>>>>>>> TiVo
>>>     >>>>>>>>> Inc. may only be made by a signed written agreement.
>>>     >>>>>>>>>
>>>     >>>>>>
>>>     >>>>>>
>>>     >>>>>
>>>     >>>>
>>>     >>>>
>>>     >>>
>>>     >>
>>>     >
>>>
>>
>>
> 


Re: [DISCUSS] KIP-221: Repartition Topic Hints in Streams

Posted by Lei Chen <le...@gmail.com>.
Hi,

Just want to know is anyone actively working on this and also KAFKA-4835
<https://issues.apache.org/jira/browse/KAFKA-4835>? Seems like the JIRA has
been inactive for couple months. We want this feature and would like to
move it forward if no one else is working on it.

Lei

On Wed, Jun 20, 2018 at 7:27 PM Matthias J. Sax <ma...@confluent.io>
wrote:

> No worries. It's just good to know. It seems that some other people are
> interested to drive this further. So we will just "reassign" it to them.
>
> Thanks for letting us know.
>
>
> -Matthias
>
> On 6/20/18 2:51 PM, Jeyhun Karimov wrote:
> > Hi Matthias, all,
> >
> > Currently, I am not able to complete this KIP. Please accept my
> > apologies for that.
> >
> >
> > Cheers,
> > Jeyhun
> >
> > On Mon, Jun 11, 2018 at 2:25 AM Matthias J. Sax <matthias@confluent.io
> > <ma...@confluent.io>> wrote:
> >
> >     What is the status of this KIP?
> >
> >     -Matthias
> >
> >
> >     On 2/13/18 1:43 PM, Matthias J. Sax wrote:
> >     > Is there any update for this KIP?
> >     >
> >     >
> >     > -Matthias
> >     >
> >     > On 12/4/17 2:08 PM, Matthias J. Sax wrote:
> >     >> Jeyhun,
> >     >>
> >     >> thanks for updating the KIP.
> >     >>
> >     >> I am wondering if you intend to add a new class `Produced`? There
> is
> >     >> already `org.apache.kafka.streams.kstream.Produced`. So if we
> want to
> >     >> add a new class, it must have a different name -- or we might be
> >     able to
> >     >> merge both into one?
> >     >>
> >     >> Also, for the KStream overlaods of `through()` and `to()`, can
> >     you add
> >     >> the different behavior using different overloads? It's not clear
> from
> >     >> the KIP what the semantics are.
> >     >>
> >     >>
> >     >> -Matthias
> >     >>
> >     >> On 11/17/17 3:27 PM, Jeyhun Karimov wrote:
> >     >>> Hi,
> >     >>>
> >     >>> Thanks for your comments. I agree with Matthias partially.
> >     >>> I think we should relax some requirements related with to() and
> >     through()
> >     >>> methods.
> >     >>> IMHO, Produced class can cover (existing/to be created) topic
> >     information,
> >     >>> and which will ease our effort:
> >     >>>
> >     >>> KStream.to(Produced topicInfo)
> >     >>> KStream.through(Produced topicInfo)
> >     >>>
> >     >>> This will decrease the number of overloads but we will need to
> >     deprecate
> >     >>> the existing to() and through() methods, perhaps.
> >     >>> I updated the KIP accordingly.
> >     >>>
> >     >>>
> >     >>> Cheers,
> >     >>> Jeyhun
> >     >>>
> >     >>> On Thu, Nov 16, 2017 at 10:21 PM Matthias J. Sax
> >     <matthias@confluent.io <ma...@confluent.io>>
> >     >>> wrote:
> >     >>>
> >     >>>> @Jan:
> >     >>>>
> >     >>>> The `Produced` class was introduced in 1.0 to specify key and
> valud
> >     >>>> Serdes (and partitioner) if data is written into a topic.
> >     >>>>
> >     >>>> Old API:
> >     >>>>
> >     >>>> KStream#to("topic", keySerde, valueSerde);
> >     >>>>
> >     >>>> New API:
> >     >>>>
> >     >>>> KStream#to("topic", Produced.with(keySerde, valueSerde));
> >     >>>>
> >     >>>>
> >     >>>> This allows to reduce the number of overloads for `to()` (and
> >     >>>> `through()` that follows the same pattern) -- the second
> >     parameter is
> >     >>>> used to cover all different variations of option parameters
> >     users can
> >     >>>> specify, while we only have 2 overload for `to()` itself.
> >     >>>>
> >     >>>> What is still unclear to me it, what you mean by this topic
> prefix
> >     >>>> thing? Either a user cares about the topic name and thus, must
> >     create
> >     >>>> and manage it manually. Or the user does not care, and Streams
> >     create
> >     >>>> it. How would this prefix idea fit in here?
> >     >>>>
> >     >>>>
> >     >>>>
> >     >>>> @Guozhang:
> >     >>>>
> >     >>>> My idea was to extend `Produced` with the hint we want to give
> for
> >     >>>> creating internal topic and pass a optional `Produced`
> >     parameter. There
> >     >>>> are multiple things we can do here:
> >     >>>>
> >     >>>> 1) stream.through(null, Produced...).groupBy().aggregate()
> >     >>>> -> just allow for `null` topic name indicating that Streams
> should
> >     >>>> create an internal topic
> >     >>>>
> >     >>>> 2) stream.through(Produced...).groupBy().aggregate()
> >     >>>> -> add one overload taking an mandatory `Produced`
> >     >>>>
> >     >>>> We use `Serialized` to picky back the information
> >     >>>>
> >     >>>> 3) stream.groupBy(Serialized...).aggregate()
> >     >>>> and stream.groupByKey(Serialized...).aggregate()
> >     >>>> -> we don't need new top level overloads
> >     >>>>
> >     >>>>
> >     >>>> There are different trade-offs for those alternatives and maybe
> >     there
> >     >>>> are other ways to change the API. It's just to push the
> >     discussion further.
> >     >>>>
> >     >>>>
> >     >>>> -Matthias
> >     >>>>
> >     >>>> On 11/12/17 1:22 PM, Jan Filipiak wrote:
> >     >>>>> Hi Gouzhang,
> >     >>>>>
> >     >>>>> this felt like these questions are supposed to be answered by
> me.
> >     >>>>> I do not understand the first one. I don't understand why the
> user
> >     >>>>> shouldn't be able to specify a suffix for the topic name.
> >     >>>>>
> >     >>>>>  For the third question I am not 100% familiar if the Produced
> >     class
> >     >>>>> came to existence
> >     >>>>> at all. I remember proposing it somewhere in our redo DSL
> >     discussion that
> >     >>>>> I dropped out of later. Finally any call that does:
> >     >>>>>
> >     >>>>> 1. create the internal topic
> >     >>>>> 2. register sink
> >     >>>>> 3. register source
> >     >>>>>
> >     >>>>> will always get the work done. If we have a Produced like
> >     class. putting
> >     >>>>> all the parameters
> >     >>>>> in there make sense. (Partitioner, serde, PartitionHint,
> >     internal, name
> >     >>>>> ... )
> >     >>>>>
> >     >>>>> Hope this helps?
> >     >>>>>
> >     >>>>>
> >     >>>>> On 10.11.2017 07:54, Guozhang Wang wrote:
> >     >>>>>> A few clarification questions on the proposal details.
> >     >>>>>>
> >     >>>>>> 1. API: although the repartition only happens at the final
> >     stateful
> >     >>>>>> operations like agg / join, the repartition flag info was
> >     actually
> >     >>>> passed
> >     >>>>>> from an earlier operator like map / groupBy. So what should
> >     be the new
> >     >>>>>> API
> >     >>>>>> look like? For example, if we do
> >     >>>>>>
> >     >>>>>> stream.groupBy().through("topic-name", Produced..).aggregate
> >     >>>>>>
> >     >>>>>> This would be add a bunch of APIs to GroupedKStream/KTable
> >     >>>>>>
> >     >>>>>> 2. Semantics: as Matthias mentioned, today any topics defined
> in
> >     >>>>>> "through()" call is considered a user topic, and hence users
> are
> >     >>>>>> responsible for managing them, including the topic name. For
> >     this KIP's
> >     >>>>>> purpose, though, users would not care about the topic name.
> >     I.e. as a
> >     >>>>>> user
> >     >>>>>> I still want to make it be an internal topic so that I do not
> >     need to
> >     >>>>>> worry
> >     >>>>>> about it at all, but only specify num.partitions.
> >     >>>>>>
> >     >>>>>> 3. Details: in Produced we do not have specs for specifying
> the
> >     >>>>>> num.partitions or should we repartition or not. So it is
> >     still not
> >     >>>>>> clear to
> >     >>>>>> me how we would make use of that to achieve what's in the old
> >     >>>>>> proposal's RepartitionHint class.
> >     >>>>>>
> >     >>>>>>
> >     >>>>>>
> >     >>>>>> Guozhang
> >     >>>>>>
> >     >>>>>>
> >     >>>>>> On Mon, Nov 6, 2017 at 1:21 PM, Ted Yu <yuzhihong@gmail.com
> >     <ma...@gmail.com>> wrote:
> >     >>>>>>
> >     >>>>>>> bq. enlarge the score of through()
> >     >>>>>>>
> >     >>>>>>> I guess you meant scope.
> >     >>>>>>>
> >     >>>>>>> On Mon, Nov 6, 2017 at 1:15 PM, Jeyhun Karimov
> >     <je.karimov@gmail.com <ma...@gmail.com>>
> >     >>>>>>> wrote:
> >     >>>>>>>
> >     >>>>>>>> Hi,
> >     >>>>>>>>
> >     >>>>>>>> Sorry for the late reply. I am convinced that we should
> >     enlarge the
> >     >>>>>>>> score
> >     >>>>>>>> of through() (add more overloads) instead of introducing a
> >     separate
> >     >>>> set
> >     >>>>>>> of
> >     >>>>>>>> overloads to other methods.
> >     >>>>>>>> I will update the KIP soon based on the discussion and
> inform.
> >     >>>>>>>>
> >     >>>>>>>>
> >     >>>>>>>> Cheers,
> >     >>>>>>>> Jeyhun
> >     >>>>>>>>
> >     >>>>>>>> On Mon, Nov 6, 2017 at 9:18 PM Jan Filipiak
> >     <Jan.Filipiak@trivago.com <ma...@trivago.com>
> >     >>>>>
> >     >>>>>>>> wrote:
> >     >>>>>>>>
> >     >>>>>>>>> Sorry for not beeing 100% up to date.
> >     >>>>>>>>> Back then we had the discussion that when an operation
> >     puts a >Sink<
> >     >>>>>>>>> into the topology, a >Produced<
> >     >>>>>>>>> parameter is added. This produced parameter could have
> >     internal or
> >     >>>>>>>>> external. If internal I think the name would still make
> >     >>>>>>>>> a great suffix for the topic name
> >     >>>>>>>>>
> >     >>>>>>>>> Is this plan still around? Otherwise having the name as
> >     suffix is
> >     >>>>>>>>> probably always good it can help the user quicker to
> >     identify hot
> >     >>>>>>> topics
> >     >>>>>>>>> that need more
> >     >>>>>>>>> partitions if he has many of these internal repartitions
> >     >>>>>>>>>
> >     >>>>>>>>> Best Jan
> >     >>>>>>>>>
> >     >>>>>>>>>
> >     >>>>>>>>> On 06.11.2017 20:13, Matthias J. Sax wrote:
> >     >>>>>>>>>> I absolute agree with what you say. It's not a
> requirement to
> >     >>>>>>> specify a
> >     >>>>>>>>>> topic name -- and this was the idea -- if user does
> >     specify a name,
> >     >>>>>>> we
> >     >>>>>>>>>> treat as is -- if users does not specify a name, Streams
> >     create an
> >     >>>>>>>>>> internal topic.
> >     >>>>>>>>>>
> >     >>>>>>>>>> The goal of the Jira is to allow a simplified way to
> control
> >     >>>>>>>>>> repartitioning (atm, user needs to manually create a
> >     topic and use
> >     >>>>>>> via
> >     >>>>>>>>>> through()).
> >     >>>>>>>>>>
> >     >>>>>>>>>> Thus, the idea is to make the topic name parameter of
> through
> >     >>>>>>> optional.
> >     >>>>>>>>>> It's of course just an idea. Happy do have a other API
> >     design. The
> >     >>>>>>> goal
> >     >>>>>>>>>> was, to avoid to many new overloads.
> >     >>>>>>>>>>
> >     >>>>>>>>>>>> Could you clarify exactly what you mean by keeping the
> >     current
> >     >>>>>>>>> distinction?
> >     >>>>>>>>>> Current distinction is: user topics are created manually
> >     and user
> >     >>>>>>>>>> specifies the name -- internal topics are created by
> >     Kafka Streams
> >     >>>>>>> and
> >     >>>>>>>>>> an name is generated automatically.
> >     >>>>>>>>>>
> >     >>>>>>>>>> -> through("user-topic")
> >     >>>>>>>>>> -> through(TopicConfig.withNumberOfPartitions(5)) //
> >     Streams creates
> >     >>>>>>>> an
> >     >>>>>>>>>> internal topic
> >     >>>>>>>>>>
> >     >>>>>>>>>>
> >     >>>>>>>>>> -Matthias
> >     >>>>>>>>>>
> >     >>>>>>>>>>
> >     >>>>>>>>>> On 11/6/17 6:56 PM, Thomas Becker wrote:
> >     >>>>>>>>>>> Could you clarify exactly what you mean by keeping the
> >     current
> >     >>>>>>>>> distinction?
> >     >>>>>>>>>>> Actually, re-reading the KIP and JIRA, it's not clear
> >     that being
> >     >>>>>>> able
> >     >>>>>>>>> to specify a custom name is actually a requirement. If the
> >     goal is to
> >     >>>>>>>>> control repartitioning and tune parallelism, maybe we can
> just
> >     >>>>>>>>> sidestep
> >     >>>>>>>>> this issue altogether by removing the ability to set a
> >     different
> >     >>>> name.
> >     >>>>>>>>>>> On Mon, 2017-11-06 at 16:51 +0100, Matthias J. Sax wrote:
> >     >>>>>>>>>>>
> >     >>>>>>>>>>> That's a good point. In current design, we strictly
> >     distinguish
> >     >>>>>>> both.
> >     >>>>>>>>>>> For example, the reset tools deletes internal topics
> >     (starting with
> >     >>>>>>>>>>> prefix `<application.id <http://application.id>>-` and
> >     ending with either `-repartition`
> >     >>>> or
> >     >>>>>>>>>>> `-changelog`.
> >     >>>>>>>>>>>
> >     >>>>>>>>>>> Thus, from my point of view, it would make sense to keep
> the
> >     >>>> current
> >     >>>>>>>>>>> distinction.
> >     >>>>>>>>>>>
> >     >>>>>>>>>>> -Matthias
> >     >>>>>>>>>>>
> >     >>>>>>>>>>> On 11/6/17 4:45 PM, Thomas Becker wrote:
> >     >>>>>>>>>>>
> >     >>>>>>>>>>>
> >     >>>>>>>>>>> I think this sounds good as well. It's worth clarifying
> >     whether
> >     >>>>>>> topics
> >     >>>>>>>>> that are named by the user but created by streams are
> >     considered
> >     >>>>>>>> "internal"
> >     >>>>>>>>> topics also.
> >     >>>>>>>>>>> On Sun, 2017-11-05 at 23:02 +0100, Matthias J. Sax wrote:
> >     >>>>>>>>>>>
> >     >>>>>>>>>>> My idea was, to relax the requirement for through() that
> >     a topic
> >     >>>>>>> must
> >     >>>>>>>> be
> >     >>>>>>>>>>> created manually before startup.
> >     >>>>>>>>>>>
> >     >>>>>>>>>>> Thus, if no through() call is made, a (internal) topic
> >     is created
> >     >>>>>>> the
> >     >>>>>>>>>>> same way we do it currently.
> >     >>>>>>>>>>>
> >     >>>>>>>>>>> If one uses `through(String topicName)` we keep the
> current
> >     >>>> behavior
> >     >>>>>>>> and
> >     >>>>>>>>>>> require users to create the topic manually.
> >     >>>>>>>>>>>
> >     >>>>>>>>>>> The reasoning is as follows: if a user creates a topic
> >     manually, a
> >     >>>>>>>> user
> >     >>>>>>>>>>> can just use it for repartitioning. As the topic is
> >     already there,
> >     >>>>>>>> there
> >     >>>>>>>>>>> is no need to specify any topic configs.
> >     >>>>>>>>>>>
> >     >>>>>>>>>>> We add a new `through()` overload (details TBD) that
> >     allows to
> >     >>>>>>> specify
> >     >>>>>>>>>>> topic configs and Streams create the topic with those
> >     configs.
> >     >>>>>>>>>>>
> >     >>>>>>>>>>> Reasoning: user don't want to manage topic manually,
> >     thus, it's
> >     >>>>>>> still
> >     >>>>>>>> an
> >     >>>>>>>>>>> internal topic and Streams create the topic name
> >     automatically as
> >     >>>>>>> for
> >     >>>>>>>>>>> all other internal topics. However, users gets some more
> >     control
> >     >>>>>>> about
> >     >>>>>>>>>>> topic parameters like number of partitions (we should
> >     discuss what
> >     >>>>>>>> other
> >     >>>>>>>>>>> configs would be useful).
> >     >>>>>>>>>>>
> >     >>>>>>>>>>>
> >     >>>>>>>>>>> Does this make sense?
> >     >>>>>>>>>>>
> >     >>>>>>>>>>>
> >     >>>>>>>>>>> -Matthias
> >     >>>>>>>>>>>
> >     >>>>>>>>>>>
> >     >>>>>>>>>>> On 11/5/17 1:21 AM, Jan Filipiak wrote:
> >     >>>>>>>>>>>
> >     >>>>>>>>>>>
> >     >>>>>>>>>>> Hi.
> >     >>>>>>>>>>>
> >     >>>>>>>>>>>
> >     >>>>>>>>>>> Im not 100 % up to date what version 1.0 DSL looks like
> ATM.
> >     >>>>>>>>>>> I just would argue that repartitioning should be an own
> >     API call
> >     >>>>>>> like
> >     >>>>>>>>>>> through or something.
> >     >>>>>>>>>>> One can use through or to already to get this. I would
> >     argue one
> >     >>>>>>>> should
> >     >>>>>>>>>>> look there instead of overloads
> >     >>>>>>>>>>>
> >     >>>>>>>>>>> Best Jan
> >     >>>>>>>>>>>
> >     >>>>>>>>>>> On 04.11.2017 16:01, Jeyhun Karimov wrote:
> >     >>>>>>>>>>>
> >     >>>>>>>>>>>
> >     >>>>>>>>>>> Dear community,
> >     >>>>>>>>>>>
> >     >>>>>>>>>>> I would like to initiate discussion on KIP-221 [1] based
> >     on issue
> >     >>>>>>> [2].
> >     >>>>>>>>>>> Please feel free to comment.
> >     >>>>>>>>>>>
> >     >>>>>>>>>>> [1]
> >     >>>>>>>>>>>
> >     >>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> >     >>>>>>>> 221%3A+Repartition+Topic+Hints+in+Streams
> >     >>>>>>>>>>> [2] https://issues.apache.org/jira/browse/KAFKA-6037
> >     >>>>>>>>>>>
> >     >>>>>>>>>>>
> >     >>>>>>>>>>>
> >     >>>>>>>>>>> Cheers,
> >     >>>>>>>>>>> Jeyhun
> >     >>>>>>>>>>>
> >     >>>>>>>>>>>
> >     >>>>>>>>>>>
> >     >>>>>>>>>>>
> >     >>>>>>>>>>>
> >     >>>>>>>>>>>
> >     >>>>>>>>>>>
> >     >>>>>>>>>>>
> >     >>>>>>>>>>>
> >     >>>>>>>>>>> ________________________________
> >     >>>>>>>>>>>
> >     >>>>>>>>>>> This email and any attachments may contain confidential
> and
> >     >>>>>>> privileged
> >     >>>>>>>>> material for the sole use of the intended recipient. Any
> >     review,
> >     >>>>>>> copying,
> >     >>>>>>>>> or distribution of this email (or any attachments) by
> >     others is
> >     >>>>>>>> prohibited.
> >     >>>>>>>>> If you are not the intended recipient, please contact the
> >     sender
> >     >>>>>>>>> immediately and permanently delete this email and any
> >     attachments. No
> >     >>>>>>>>> employee or agent of TiVo Inc. is authorized to conclude
> >     any binding
> >     >>>>>>>>> agreement on behalf of TiVo Inc. by email. Binding
> >     agreements with
> >     >>>>>>>>> TiVo
> >     >>>>>>>>> Inc. may only be made by a signed written agreement.
> >     >>>>>>>>>>>
> >     >>>>>>>>>>>
> >     >>>>>>>>>>>
> >     >>>>>>>>>>>
> >     >>>>>>>>>>>
> >     >>>>>>>>>>> ________________________________
> >     >>>>>>>>>>>
> >     >>>>>>>>>>> This email and any attachments may contain confidential
> and
> >     >>>>>>> privileged
> >     >>>>>>>>> material for the sole use of the intended recipient. Any
> >     review,
> >     >>>>>>> copying,
> >     >>>>>>>>> or distribution of this email (or any attachments) by
> >     others is
> >     >>>>>>>> prohibited.
> >     >>>>>>>>> If you are not the intended recipient, please contact the
> >     sender
> >     >>>>>>>>> immediately and permanently delete this email and any
> >     attachments. No
> >     >>>>>>>>> employee or agent of TiVo Inc. is authorized to conclude
> >     any binding
> >     >>>>>>>>> agreement on behalf of TiVo Inc. by email. Binding
> >     agreements with
> >     >>>>>>>>> TiVo
> >     >>>>>>>>> Inc. may only be made by a signed written agreement.
> >     >>>>>>>>>
> >     >>>>>>
> >     >>>>>>
> >     >>>>>
> >     >>>>
> >     >>>>
> >     >>>
> >     >>
> >     >
> >
>
>

Re: [DISCUSS] KIP-221: Repartition Topic Hints in Streams

Posted by "Matthias J. Sax" <ma...@confluent.io>.
No worries. It's just good to know. It seems that some other people are
interested to drive this further. So we will just "reassign" it to them.

Thanks for letting us know.


-Matthias

On 6/20/18 2:51 PM, Jeyhun Karimov wrote:
> Hi Matthias, all,
> 
> Currently, I am not able to complete this KIP. Please accept my
> apologies for that. 
> 
> 
> Cheers,
> Jeyhun
> 
> On Mon, Jun 11, 2018 at 2:25 AM Matthias J. Sax <matthias@confluent.io
> <ma...@confluent.io>> wrote:
> 
>     What is the status of this KIP?
> 
>     -Matthias
> 
> 
>     On 2/13/18 1:43 PM, Matthias J. Sax wrote:
>     > Is there any update for this KIP?
>     >
>     >
>     > -Matthias
>     >
>     > On 12/4/17 2:08 PM, Matthias J. Sax wrote:
>     >> Jeyhun,
>     >>
>     >> thanks for updating the KIP.
>     >>
>     >> I am wondering if you intend to add a new class `Produced`? There is
>     >> already `org.apache.kafka.streams.kstream.Produced`. So if we want to
>     >> add a new class, it must have a different name -- or we might be
>     able to
>     >> merge both into one?
>     >>
>     >> Also, for the KStream overlaods of `through()` and `to()`, can
>     you add
>     >> the different behavior using different overloads? It's not clear from
>     >> the KIP what the semantics are.
>     >>
>     >>
>     >> -Matthias
>     >>
>     >> On 11/17/17 3:27 PM, Jeyhun Karimov wrote:
>     >>> Hi,
>     >>>
>     >>> Thanks for your comments. I agree with Matthias partially.
>     >>> I think we should relax some requirements related with to() and
>     through()
>     >>> methods.
>     >>> IMHO, Produced class can cover (existing/to be created) topic
>     information,
>     >>> and which will ease our effort:
>     >>>
>     >>> KStream.to(Produced topicInfo)
>     >>> KStream.through(Produced topicInfo)
>     >>>
>     >>> This will decrease the number of overloads but we will need to
>     deprecate
>     >>> the existing to() and through() methods, perhaps.
>     >>> I updated the KIP accordingly.
>     >>>
>     >>>
>     >>> Cheers,
>     >>> Jeyhun
>     >>>
>     >>> On Thu, Nov 16, 2017 at 10:21 PM Matthias J. Sax
>     <matthias@confluent.io <ma...@confluent.io>>
>     >>> wrote:
>     >>>
>     >>>> @Jan:
>     >>>>
>     >>>> The `Produced` class was introduced in 1.0 to specify key and valud
>     >>>> Serdes (and partitioner) if data is written into a topic.
>     >>>>
>     >>>> Old API:
>     >>>>
>     >>>> KStream#to("topic", keySerde, valueSerde);
>     >>>>
>     >>>> New API:
>     >>>>
>     >>>> KStream#to("topic", Produced.with(keySerde, valueSerde));
>     >>>>
>     >>>>
>     >>>> This allows to reduce the number of overloads for `to()` (and
>     >>>> `through()` that follows the same pattern) -- the second
>     parameter is
>     >>>> used to cover all different variations of option parameters
>     users can
>     >>>> specify, while we only have 2 overload for `to()` itself.
>     >>>>
>     >>>> What is still unclear to me it, what you mean by this topic prefix
>     >>>> thing? Either a user cares about the topic name and thus, must
>     create
>     >>>> and manage it manually. Or the user does not care, and Streams
>     create
>     >>>> it. How would this prefix idea fit in here?
>     >>>>
>     >>>>
>     >>>>
>     >>>> @Guozhang:
>     >>>>
>     >>>> My idea was to extend `Produced` with the hint we want to give for
>     >>>> creating internal topic and pass a optional `Produced`
>     parameter. There
>     >>>> are multiple things we can do here:
>     >>>>
>     >>>> 1) stream.through(null, Produced...).groupBy().aggregate()
>     >>>> -> just allow for `null` topic name indicating that Streams should
>     >>>> create an internal topic
>     >>>>
>     >>>> 2) stream.through(Produced...).groupBy().aggregate()
>     >>>> -> add one overload taking an mandatory `Produced`
>     >>>>
>     >>>> We use `Serialized` to picky back the information
>     >>>>
>     >>>> 3) stream.groupBy(Serialized...).aggregate()
>     >>>> and stream.groupByKey(Serialized...).aggregate()
>     >>>> -> we don't need new top level overloads
>     >>>>
>     >>>>
>     >>>> There are different trade-offs for those alternatives and maybe
>     there
>     >>>> are other ways to change the API. It's just to push the
>     discussion further.
>     >>>>
>     >>>>
>     >>>> -Matthias
>     >>>>
>     >>>> On 11/12/17 1:22 PM, Jan Filipiak wrote:
>     >>>>> Hi Gouzhang,
>     >>>>>
>     >>>>> this felt like these questions are supposed to be answered by me.
>     >>>>> I do not understand the first one. I don't understand why the user
>     >>>>> shouldn't be able to specify a suffix for the topic name.
>     >>>>>
>     >>>>>  For the third question I am not 100% familiar if the Produced
>     class
>     >>>>> came to existence
>     >>>>> at all. I remember proposing it somewhere in our redo DSL
>     discussion that
>     >>>>> I dropped out of later. Finally any call that does:
>     >>>>>
>     >>>>> 1. create the internal topic
>     >>>>> 2. register sink
>     >>>>> 3. register source
>     >>>>>
>     >>>>> will always get the work done. If we have a Produced like
>     class. putting
>     >>>>> all the parameters
>     >>>>> in there make sense. (Partitioner, serde, PartitionHint,
>     internal, name
>     >>>>> ... )
>     >>>>>
>     >>>>> Hope this helps?
>     >>>>>
>     >>>>>
>     >>>>> On 10.11.2017 07:54, Guozhang Wang wrote:
>     >>>>>> A few clarification questions on the proposal details.
>     >>>>>>
>     >>>>>> 1. API: although the repartition only happens at the final
>     stateful
>     >>>>>> operations like agg / join, the repartition flag info was
>     actually
>     >>>> passed
>     >>>>>> from an earlier operator like map / groupBy. So what should
>     be the new
>     >>>>>> API
>     >>>>>> look like? For example, if we do
>     >>>>>>
>     >>>>>> stream.groupBy().through("topic-name", Produced..).aggregate
>     >>>>>>
>     >>>>>> This would be add a bunch of APIs to GroupedKStream/KTable
>     >>>>>>
>     >>>>>> 2. Semantics: as Matthias mentioned, today any topics defined in
>     >>>>>> "through()" call is considered a user topic, and hence users are
>     >>>>>> responsible for managing them, including the topic name. For
>     this KIP's
>     >>>>>> purpose, though, users would not care about the topic name.
>     I.e. as a
>     >>>>>> user
>     >>>>>> I still want to make it be an internal topic so that I do not
>     need to
>     >>>>>> worry
>     >>>>>> about it at all, but only specify num.partitions.
>     >>>>>>
>     >>>>>> 3. Details: in Produced we do not have specs for specifying the
>     >>>>>> num.partitions or should we repartition or not. So it is
>     still not
>     >>>>>> clear to
>     >>>>>> me how we would make use of that to achieve what's in the old
>     >>>>>> proposal's RepartitionHint class.
>     >>>>>>
>     >>>>>>
>     >>>>>>
>     >>>>>> Guozhang
>     >>>>>>
>     >>>>>>
>     >>>>>> On Mon, Nov 6, 2017 at 1:21 PM, Ted Yu <yuzhihong@gmail.com
>     <ma...@gmail.com>> wrote:
>     >>>>>>
>     >>>>>>> bq. enlarge the score of through()
>     >>>>>>>
>     >>>>>>> I guess you meant scope.
>     >>>>>>>
>     >>>>>>> On Mon, Nov 6, 2017 at 1:15 PM, Jeyhun Karimov
>     <je.karimov@gmail.com <ma...@gmail.com>>
>     >>>>>>> wrote:
>     >>>>>>>
>     >>>>>>>> Hi,
>     >>>>>>>>
>     >>>>>>>> Sorry for the late reply. I am convinced that we should
>     enlarge the
>     >>>>>>>> score
>     >>>>>>>> of through() (add more overloads) instead of introducing a
>     separate
>     >>>> set
>     >>>>>>> of
>     >>>>>>>> overloads to other methods.
>     >>>>>>>> I will update the KIP soon based on the discussion and inform.
>     >>>>>>>>
>     >>>>>>>>
>     >>>>>>>> Cheers,
>     >>>>>>>> Jeyhun
>     >>>>>>>>
>     >>>>>>>> On Mon, Nov 6, 2017 at 9:18 PM Jan Filipiak
>     <Jan.Filipiak@trivago.com <ma...@trivago.com>
>     >>>>>
>     >>>>>>>> wrote:
>     >>>>>>>>
>     >>>>>>>>> Sorry for not beeing 100% up to date.
>     >>>>>>>>> Back then we had the discussion that when an operation
>     puts a >Sink<
>     >>>>>>>>> into the topology, a >Produced<
>     >>>>>>>>> parameter is added. This produced parameter could have
>     internal or
>     >>>>>>>>> external. If internal I think the name would still make
>     >>>>>>>>> a great suffix for the topic name
>     >>>>>>>>>
>     >>>>>>>>> Is this plan still around? Otherwise having the name as
>     suffix is
>     >>>>>>>>> probably always good it can help the user quicker to
>     identify hot
>     >>>>>>> topics
>     >>>>>>>>> that need more
>     >>>>>>>>> partitions if he has many of these internal repartitions
>     >>>>>>>>>
>     >>>>>>>>> Best Jan
>     >>>>>>>>>
>     >>>>>>>>>
>     >>>>>>>>> On 06.11.2017 20:13, Matthias J. Sax wrote:
>     >>>>>>>>>> I absolute agree with what you say. It's not a requirement to
>     >>>>>>> specify a
>     >>>>>>>>>> topic name -- and this was the idea -- if user does
>     specify a name,
>     >>>>>>> we
>     >>>>>>>>>> treat as is -- if users does not specify a name, Streams
>     create an
>     >>>>>>>>>> internal topic.
>     >>>>>>>>>>
>     >>>>>>>>>> The goal of the Jira is to allow a simplified way to control
>     >>>>>>>>>> repartitioning (atm, user needs to manually create a
>     topic and use
>     >>>>>>> via
>     >>>>>>>>>> through()).
>     >>>>>>>>>>
>     >>>>>>>>>> Thus, the idea is to make the topic name parameter of through
>     >>>>>>> optional.
>     >>>>>>>>>> It's of course just an idea. Happy do have a other API
>     design. The
>     >>>>>>> goal
>     >>>>>>>>>> was, to avoid to many new overloads.
>     >>>>>>>>>>
>     >>>>>>>>>>>> Could you clarify exactly what you mean by keeping the
>     current
>     >>>>>>>>> distinction?
>     >>>>>>>>>> Current distinction is: user topics are created manually
>     and user
>     >>>>>>>>>> specifies the name -- internal topics are created by
>     Kafka Streams
>     >>>>>>> and
>     >>>>>>>>>> an name is generated automatically.
>     >>>>>>>>>>
>     >>>>>>>>>> -> through("user-topic")
>     >>>>>>>>>> -> through(TopicConfig.withNumberOfPartitions(5)) //
>     Streams creates
>     >>>>>>>> an
>     >>>>>>>>>> internal topic
>     >>>>>>>>>>
>     >>>>>>>>>>
>     >>>>>>>>>> -Matthias
>     >>>>>>>>>>
>     >>>>>>>>>>
>     >>>>>>>>>> On 11/6/17 6:56 PM, Thomas Becker wrote:
>     >>>>>>>>>>> Could you clarify exactly what you mean by keeping the
>     current
>     >>>>>>>>> distinction?
>     >>>>>>>>>>> Actually, re-reading the KIP and JIRA, it's not clear
>     that being
>     >>>>>>> able
>     >>>>>>>>> to specify a custom name is actually a requirement. If the
>     goal is to
>     >>>>>>>>> control repartitioning and tune parallelism, maybe we can just
>     >>>>>>>>> sidestep
>     >>>>>>>>> this issue altogether by removing the ability to set a
>     different
>     >>>> name.
>     >>>>>>>>>>> On Mon, 2017-11-06 at 16:51 +0100, Matthias J. Sax wrote:
>     >>>>>>>>>>>
>     >>>>>>>>>>> That's a good point. In current design, we strictly
>     distinguish
>     >>>>>>> both.
>     >>>>>>>>>>> For example, the reset tools deletes internal topics
>     (starting with
>     >>>>>>>>>>> prefix `<application.id <http://application.id>>-` and
>     ending with either `-repartition`
>     >>>> or
>     >>>>>>>>>>> `-changelog`.
>     >>>>>>>>>>>
>     >>>>>>>>>>> Thus, from my point of view, it would make sense to keep the
>     >>>> current
>     >>>>>>>>>>> distinction.
>     >>>>>>>>>>>
>     >>>>>>>>>>> -Matthias
>     >>>>>>>>>>>
>     >>>>>>>>>>> On 11/6/17 4:45 PM, Thomas Becker wrote:
>     >>>>>>>>>>>
>     >>>>>>>>>>>
>     >>>>>>>>>>> I think this sounds good as well. It's worth clarifying
>     whether
>     >>>>>>> topics
>     >>>>>>>>> that are named by the user but created by streams are
>     considered
>     >>>>>>>> "internal"
>     >>>>>>>>> topics also.
>     >>>>>>>>>>> On Sun, 2017-11-05 at 23:02 +0100, Matthias J. Sax wrote:
>     >>>>>>>>>>>
>     >>>>>>>>>>> My idea was, to relax the requirement for through() that
>     a topic
>     >>>>>>> must
>     >>>>>>>> be
>     >>>>>>>>>>> created manually before startup.
>     >>>>>>>>>>>
>     >>>>>>>>>>> Thus, if no through() call is made, a (internal) topic
>     is created
>     >>>>>>> the
>     >>>>>>>>>>> same way we do it currently.
>     >>>>>>>>>>>
>     >>>>>>>>>>> If one uses `through(String topicName)` we keep the current
>     >>>> behavior
>     >>>>>>>> and
>     >>>>>>>>>>> require users to create the topic manually.
>     >>>>>>>>>>>
>     >>>>>>>>>>> The reasoning is as follows: if a user creates a topic
>     manually, a
>     >>>>>>>> user
>     >>>>>>>>>>> can just use it for repartitioning. As the topic is
>     already there,
>     >>>>>>>> there
>     >>>>>>>>>>> is no need to specify any topic configs.
>     >>>>>>>>>>>
>     >>>>>>>>>>> We add a new `through()` overload (details TBD) that
>     allows to
>     >>>>>>> specify
>     >>>>>>>>>>> topic configs and Streams create the topic with those
>     configs.
>     >>>>>>>>>>>
>     >>>>>>>>>>> Reasoning: user don't want to manage topic manually,
>     thus, it's
>     >>>>>>> still
>     >>>>>>>> an
>     >>>>>>>>>>> internal topic and Streams create the topic name
>     automatically as
>     >>>>>>> for
>     >>>>>>>>>>> all other internal topics. However, users gets some more
>     control
>     >>>>>>> about
>     >>>>>>>>>>> topic parameters like number of partitions (we should
>     discuss what
>     >>>>>>>> other
>     >>>>>>>>>>> configs would be useful).
>     >>>>>>>>>>>
>     >>>>>>>>>>>
>     >>>>>>>>>>> Does this make sense?
>     >>>>>>>>>>>
>     >>>>>>>>>>>
>     >>>>>>>>>>> -Matthias
>     >>>>>>>>>>>
>     >>>>>>>>>>>
>     >>>>>>>>>>> On 11/5/17 1:21 AM, Jan Filipiak wrote:
>     >>>>>>>>>>>
>     >>>>>>>>>>>
>     >>>>>>>>>>> Hi.
>     >>>>>>>>>>>
>     >>>>>>>>>>>
>     >>>>>>>>>>> Im not 100 % up to date what version 1.0 DSL looks like ATM.
>     >>>>>>>>>>> I just would argue that repartitioning should be an own
>     API call
>     >>>>>>> like
>     >>>>>>>>>>> through or something.
>     >>>>>>>>>>> One can use through or to already to get this. I would
>     argue one
>     >>>>>>>> should
>     >>>>>>>>>>> look there instead of overloads
>     >>>>>>>>>>>
>     >>>>>>>>>>> Best Jan
>     >>>>>>>>>>>
>     >>>>>>>>>>> On 04.11.2017 16:01, Jeyhun Karimov wrote:
>     >>>>>>>>>>>
>     >>>>>>>>>>>
>     >>>>>>>>>>> Dear community,
>     >>>>>>>>>>>
>     >>>>>>>>>>> I would like to initiate discussion on KIP-221 [1] based
>     on issue
>     >>>>>>> [2].
>     >>>>>>>>>>> Please feel free to comment.
>     >>>>>>>>>>>
>     >>>>>>>>>>> [1]
>     >>>>>>>>>>>
>     >>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>     >>>>>>>> 221%3A+Repartition+Topic+Hints+in+Streams
>     >>>>>>>>>>> [2] https://issues.apache.org/jira/browse/KAFKA-6037
>     >>>>>>>>>>>
>     >>>>>>>>>>>
>     >>>>>>>>>>>
>     >>>>>>>>>>> Cheers,
>     >>>>>>>>>>> Jeyhun
>     >>>>>>>>>>>
>     >>>>>>>>>>>
>     >>>>>>>>>>>
>     >>>>>>>>>>>
>     >>>>>>>>>>>
>     >>>>>>>>>>>
>     >>>>>>>>>>>
>     >>>>>>>>>>>
>     >>>>>>>>>>>
>     >>>>>>>>>>> ________________________________
>     >>>>>>>>>>>
>     >>>>>>>>>>> This email and any attachments may contain confidential and
>     >>>>>>> privileged
>     >>>>>>>>> material for the sole use of the intended recipient. Any
>     review,
>     >>>>>>> copying,
>     >>>>>>>>> or distribution of this email (or any attachments) by
>     others is
>     >>>>>>>> prohibited.
>     >>>>>>>>> If you are not the intended recipient, please contact the
>     sender
>     >>>>>>>>> immediately and permanently delete this email and any
>     attachments. No
>     >>>>>>>>> employee or agent of TiVo Inc. is authorized to conclude
>     any binding
>     >>>>>>>>> agreement on behalf of TiVo Inc. by email. Binding
>     agreements with
>     >>>>>>>>> TiVo
>     >>>>>>>>> Inc. may only be made by a signed written agreement.
>     >>>>>>>>>>>
>     >>>>>>>>>>>
>     >>>>>>>>>>>
>     >>>>>>>>>>>
>     >>>>>>>>>>>
>     >>>>>>>>>>> ________________________________
>     >>>>>>>>>>>
>     >>>>>>>>>>> This email and any attachments may contain confidential and
>     >>>>>>> privileged
>     >>>>>>>>> material for the sole use of the intended recipient. Any
>     review,
>     >>>>>>> copying,
>     >>>>>>>>> or distribution of this email (or any attachments) by
>     others is
>     >>>>>>>> prohibited.
>     >>>>>>>>> If you are not the intended recipient, please contact the
>     sender
>     >>>>>>>>> immediately and permanently delete this email and any
>     attachments. No
>     >>>>>>>>> employee or agent of TiVo Inc. is authorized to conclude
>     any binding
>     >>>>>>>>> agreement on behalf of TiVo Inc. by email. Binding
>     agreements with
>     >>>>>>>>> TiVo
>     >>>>>>>>> Inc. may only be made by a signed written agreement.
>     >>>>>>>>>
>     >>>>>>
>     >>>>>>
>     >>>>>
>     >>>>
>     >>>>
>     >>>
>     >>
>     >
> 


Re: [DISCUSS] KIP-221: Repartition Topic Hints in Streams

Posted by Jeyhun Karimov <je...@gmail.com>.
Hi Matthias, all,

Currently, I am not able to complete this KIP. Please accept my apologies
for that.


Cheers,
Jeyhun

On Mon, Jun 11, 2018 at 2:25 AM Matthias J. Sax <ma...@confluent.io>
wrote:

> What is the status of this KIP?
>
> -Matthias
>
>
> On 2/13/18 1:43 PM, Matthias J. Sax wrote:
> > Is there any update for this KIP?
> >
> >
> > -Matthias
> >
> > On 12/4/17 2:08 PM, Matthias J. Sax wrote:
> >> Jeyhun,
> >>
> >> thanks for updating the KIP.
> >>
> >> I am wondering if you intend to add a new class `Produced`? There is
> >> already `org.apache.kafka.streams.kstream.Produced`. So if we want to
> >> add a new class, it must have a different name -- or we might be able to
> >> merge both into one?
> >>
> >> Also, for the KStream overlaods of `through()` and `to()`, can you add
> >> the different behavior using different overloads? It's not clear from
> >> the KIP what the semantics are.
> >>
> >>
> >> -Matthias
> >>
> >> On 11/17/17 3:27 PM, Jeyhun Karimov wrote:
> >>> Hi,
> >>>
> >>> Thanks for your comments. I agree with Matthias partially.
> >>> I think we should relax some requirements related with to() and
> through()
> >>> methods.
> >>> IMHO, Produced class can cover (existing/to be created) topic
> information,
> >>> and which will ease our effort:
> >>>
> >>> KStream.to(Produced topicInfo)
> >>> KStream.through(Produced topicInfo)
> >>>
> >>> This will decrease the number of overloads but we will need to
> deprecate
> >>> the existing to() and through() methods, perhaps.
> >>> I updated the KIP accordingly.
> >>>
> >>>
> >>> Cheers,
> >>> Jeyhun
> >>>
> >>> On Thu, Nov 16, 2017 at 10:21 PM Matthias J. Sax <
> matthias@confluent.io>
> >>> wrote:
> >>>
> >>>> @Jan:
> >>>>
> >>>> The `Produced` class was introduced in 1.0 to specify key and valud
> >>>> Serdes (and partitioner) if data is written into a topic.
> >>>>
> >>>> Old API:
> >>>>
> >>>> KStream#to("topic", keySerde, valueSerde);
> >>>>
> >>>> New API:
> >>>>
> >>>> KStream#to("topic", Produced.with(keySerde, valueSerde));
> >>>>
> >>>>
> >>>> This allows to reduce the number of overloads for `to()` (and
> >>>> `through()` that follows the same pattern) -- the second parameter is
> >>>> used to cover all different variations of option parameters users can
> >>>> specify, while we only have 2 overload for `to()` itself.
> >>>>
> >>>> What is still unclear to me it, what you mean by this topic prefix
> >>>> thing? Either a user cares about the topic name and thus, must create
> >>>> and manage it manually. Or the user does not care, and Streams create
> >>>> it. How would this prefix idea fit in here?
> >>>>
> >>>>
> >>>>
> >>>> @Guozhang:
> >>>>
> >>>> My idea was to extend `Produced` with the hint we want to give for
> >>>> creating internal topic and pass a optional `Produced` parameter.
> There
> >>>> are multiple things we can do here:
> >>>>
> >>>> 1) stream.through(null, Produced...).groupBy().aggregate()
> >>>> -> just allow for `null` topic name indicating that Streams should
> >>>> create an internal topic
> >>>>
> >>>> 2) stream.through(Produced...).groupBy().aggregate()
> >>>> -> add one overload taking an mandatory `Produced`
> >>>>
> >>>> We use `Serialized` to picky back the information
> >>>>
> >>>> 3) stream.groupBy(Serialized...).aggregate()
> >>>> and stream.groupByKey(Serialized...).aggregate()
> >>>> -> we don't need new top level overloads
> >>>>
> >>>>
> >>>> There are different trade-offs for those alternatives and maybe there
> >>>> are other ways to change the API. It's just to push the discussion
> further.
> >>>>
> >>>>
> >>>> -Matthias
> >>>>
> >>>> On 11/12/17 1:22 PM, Jan Filipiak wrote:
> >>>>> Hi Gouzhang,
> >>>>>
> >>>>> this felt like these questions are supposed to be answered by me.
> >>>>> I do not understand the first one. I don't understand why the user
> >>>>> shouldn't be able to specify a suffix for the topic name.
> >>>>>
> >>>>>  For the third question I am not 100% familiar if the Produced class
> >>>>> came to existence
> >>>>> at all. I remember proposing it somewhere in our redo DSL discussion
> that
> >>>>> I dropped out of later. Finally any call that does:
> >>>>>
> >>>>> 1. create the internal topic
> >>>>> 2. register sink
> >>>>> 3. register source
> >>>>>
> >>>>> will always get the work done. If we have a Produced like class.
> putting
> >>>>> all the parameters
> >>>>> in there make sense. (Partitioner, serde, PartitionHint, internal,
> name
> >>>>> ... )
> >>>>>
> >>>>> Hope this helps?
> >>>>>
> >>>>>
> >>>>> On 10.11.2017 07:54, Guozhang Wang wrote:
> >>>>>> A few clarification questions on the proposal details.
> >>>>>>
> >>>>>> 1. API: although the repartition only happens at the final stateful
> >>>>>> operations like agg / join, the repartition flag info was actually
> >>>> passed
> >>>>>> from an earlier operator like map / groupBy. So what should be the
> new
> >>>>>> API
> >>>>>> look like? For example, if we do
> >>>>>>
> >>>>>> stream.groupBy().through("topic-name", Produced..).aggregate
> >>>>>>
> >>>>>> This would be add a bunch of APIs to GroupedKStream/KTable
> >>>>>>
> >>>>>> 2. Semantics: as Matthias mentioned, today any topics defined in
> >>>>>> "through()" call is considered a user topic, and hence users are
> >>>>>> responsible for managing them, including the topic name. For this
> KIP's
> >>>>>> purpose, though, users would not care about the topic name. I.e. as
> a
> >>>>>> user
> >>>>>> I still want to make it be an internal topic so that I do not need
> to
> >>>>>> worry
> >>>>>> about it at all, but only specify num.partitions.
> >>>>>>
> >>>>>> 3. Details: in Produced we do not have specs for specifying the
> >>>>>> num.partitions or should we repartition or not. So it is still not
> >>>>>> clear to
> >>>>>> me how we would make use of that to achieve what's in the old
> >>>>>> proposal's RepartitionHint class.
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> Guozhang
> >>>>>>
> >>>>>>
> >>>>>> On Mon, Nov 6, 2017 at 1:21 PM, Ted Yu <yu...@gmail.com> wrote:
> >>>>>>
> >>>>>>> bq. enlarge the score of through()
> >>>>>>>
> >>>>>>> I guess you meant scope.
> >>>>>>>
> >>>>>>> On Mon, Nov 6, 2017 at 1:15 PM, Jeyhun Karimov <
> je.karimov@gmail.com>
> >>>>>>> wrote:
> >>>>>>>
> >>>>>>>> Hi,
> >>>>>>>>
> >>>>>>>> Sorry for the late reply. I am convinced that we should enlarge
> the
> >>>>>>>> score
> >>>>>>>> of through() (add more overloads) instead of introducing a
> separate
> >>>> set
> >>>>>>> of
> >>>>>>>> overloads to other methods.
> >>>>>>>> I will update the KIP soon based on the discussion and inform.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> Cheers,
> >>>>>>>> Jeyhun
> >>>>>>>>
> >>>>>>>> On Mon, Nov 6, 2017 at 9:18 PM Jan Filipiak <
> Jan.Filipiak@trivago.com
> >>>>>
> >>>>>>>> wrote:
> >>>>>>>>
> >>>>>>>>> Sorry for not beeing 100% up to date.
> >>>>>>>>> Back then we had the discussion that when an operation puts a
> >Sink<
> >>>>>>>>> into the topology, a >Produced<
> >>>>>>>>> parameter is added. This produced parameter could have internal
> or
> >>>>>>>>> external. If internal I think the name would still make
> >>>>>>>>> a great suffix for the topic name
> >>>>>>>>>
> >>>>>>>>> Is this plan still around? Otherwise having the name as suffix is
> >>>>>>>>> probably always good it can help the user quicker to identify hot
> >>>>>>> topics
> >>>>>>>>> that need more
> >>>>>>>>> partitions if he has many of these internal repartitions
> >>>>>>>>>
> >>>>>>>>> Best Jan
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On 06.11.2017 20:13, Matthias J. Sax wrote:
> >>>>>>>>>> I absolute agree with what you say. It's not a requirement to
> >>>>>>> specify a
> >>>>>>>>>> topic name -- and this was the idea -- if user does specify a
> name,
> >>>>>>> we
> >>>>>>>>>> treat as is -- if users does not specify a name, Streams create
> an
> >>>>>>>>>> internal topic.
> >>>>>>>>>>
> >>>>>>>>>> The goal of the Jira is to allow a simplified way to control
> >>>>>>>>>> repartitioning (atm, user needs to manually create a topic and
> use
> >>>>>>> via
> >>>>>>>>>> through()).
> >>>>>>>>>>
> >>>>>>>>>> Thus, the idea is to make the topic name parameter of through
> >>>>>>> optional.
> >>>>>>>>>> It's of course just an idea. Happy do have a other API design.
> The
> >>>>>>> goal
> >>>>>>>>>> was, to avoid to many new overloads.
> >>>>>>>>>>
> >>>>>>>>>>>> Could you clarify exactly what you mean by keeping the current
> >>>>>>>>> distinction?
> >>>>>>>>>> Current distinction is: user topics are created manually and
> user
> >>>>>>>>>> specifies the name -- internal topics are created by Kafka
> Streams
> >>>>>>> and
> >>>>>>>>>> an name is generated automatically.
> >>>>>>>>>>
> >>>>>>>>>> -> through("user-topic")
> >>>>>>>>>> -> through(TopicConfig.withNumberOfPartitions(5)) // Streams
> creates
> >>>>>>>> an
> >>>>>>>>>> internal topic
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> -Matthias
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> On 11/6/17 6:56 PM, Thomas Becker wrote:
> >>>>>>>>>>> Could you clarify exactly what you mean by keeping the current
> >>>>>>>>> distinction?
> >>>>>>>>>>> Actually, re-reading the KIP and JIRA, it's not clear that
> being
> >>>>>>> able
> >>>>>>>>> to specify a custom name is actually a requirement. If the goal
> is to
> >>>>>>>>> control repartitioning and tune parallelism, maybe we can just
> >>>>>>>>> sidestep
> >>>>>>>>> this issue altogether by removing the ability to set a different
> >>>> name.
> >>>>>>>>>>> On Mon, 2017-11-06 at 16:51 +0100, Matthias J. Sax wrote:
> >>>>>>>>>>>
> >>>>>>>>>>> That's a good point. In current design, we strictly distinguish
> >>>>>>> both.
> >>>>>>>>>>> For example, the reset tools deletes internal topics (starting
> with
> >>>>>>>>>>> prefix `<application.id>-` and ending with either
> `-repartition`
> >>>> or
> >>>>>>>>>>> `-changelog`.
> >>>>>>>>>>>
> >>>>>>>>>>> Thus, from my point of view, it would make sense to keep the
> >>>> current
> >>>>>>>>>>> distinction.
> >>>>>>>>>>>
> >>>>>>>>>>> -Matthias
> >>>>>>>>>>>
> >>>>>>>>>>> On 11/6/17 4:45 PM, Thomas Becker wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> I think this sounds good as well. It's worth clarifying whether
> >>>>>>> topics
> >>>>>>>>> that are named by the user but created by streams are considered
> >>>>>>>> "internal"
> >>>>>>>>> topics also.
> >>>>>>>>>>> On Sun, 2017-11-05 at 23:02 +0100, Matthias J. Sax wrote:
> >>>>>>>>>>>
> >>>>>>>>>>> My idea was, to relax the requirement for through() that a
> topic
> >>>>>>> must
> >>>>>>>> be
> >>>>>>>>>>> created manually before startup.
> >>>>>>>>>>>
> >>>>>>>>>>> Thus, if no through() call is made, a (internal) topic is
> created
> >>>>>>> the
> >>>>>>>>>>> same way we do it currently.
> >>>>>>>>>>>
> >>>>>>>>>>> If one uses `through(String topicName)` we keep the current
> >>>> behavior
> >>>>>>>> and
> >>>>>>>>>>> require users to create the topic manually.
> >>>>>>>>>>>
> >>>>>>>>>>> The reasoning is as follows: if a user creates a topic
> manually, a
> >>>>>>>> user
> >>>>>>>>>>> can just use it for repartitioning. As the topic is already
> there,
> >>>>>>>> there
> >>>>>>>>>>> is no need to specify any topic configs.
> >>>>>>>>>>>
> >>>>>>>>>>> We add a new `through()` overload (details TBD) that allows to
> >>>>>>> specify
> >>>>>>>>>>> topic configs and Streams create the topic with those configs.
> >>>>>>>>>>>
> >>>>>>>>>>> Reasoning: user don't want to manage topic manually, thus, it's
> >>>>>>> still
> >>>>>>>> an
> >>>>>>>>>>> internal topic and Streams create the topic name automatically
> as
> >>>>>>> for
> >>>>>>>>>>> all other internal topics. However, users gets some more
> control
> >>>>>>> about
> >>>>>>>>>>> topic parameters like number of partitions (we should discuss
> what
> >>>>>>>> other
> >>>>>>>>>>> configs would be useful).
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> Does this make sense?
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> -Matthias
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> On 11/5/17 1:21 AM, Jan Filipiak wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> Hi.
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> Im not 100 % up to date what version 1.0 DSL looks like ATM.
> >>>>>>>>>>> I just would argue that repartitioning should be an own API
> call
> >>>>>>> like
> >>>>>>>>>>> through or something.
> >>>>>>>>>>> One can use through or to already to get this. I would argue
> one
> >>>>>>>> should
> >>>>>>>>>>> look there instead of overloads
> >>>>>>>>>>>
> >>>>>>>>>>> Best Jan
> >>>>>>>>>>>
> >>>>>>>>>>> On 04.11.2017 16:01, Jeyhun Karimov wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> Dear community,
> >>>>>>>>>>>
> >>>>>>>>>>> I would like to initiate discussion on KIP-221 [1] based on
> issue
> >>>>>>> [2].
> >>>>>>>>>>> Please feel free to comment.
> >>>>>>>>>>>
> >>>>>>>>>>> [1]
> >>>>>>>>>>>
> >>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> >>>>>>>> 221%3A+Repartition+Topic+Hints+in+Streams
> >>>>>>>>>>> [2] https://issues.apache.org/jira/browse/KAFKA-6037
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> Cheers,
> >>>>>>>>>>> Jeyhun
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> ________________________________
> >>>>>>>>>>>
> >>>>>>>>>>> This email and any attachments may contain confidential and
> >>>>>>> privileged
> >>>>>>>>> material for the sole use of the intended recipient. Any review,
> >>>>>>> copying,
> >>>>>>>>> or distribution of this email (or any attachments) by others is
> >>>>>>>> prohibited.
> >>>>>>>>> If you are not the intended recipient, please contact the sender
> >>>>>>>>> immediately and permanently delete this email and any
> attachments. No
> >>>>>>>>> employee or agent of TiVo Inc. is authorized to conclude any
> binding
> >>>>>>>>> agreement on behalf of TiVo Inc. by email. Binding agreements
> with
> >>>>>>>>> TiVo
> >>>>>>>>> Inc. may only be made by a signed written agreement.
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> ________________________________
> >>>>>>>>>>>
> >>>>>>>>>>> This email and any attachments may contain confidential and
> >>>>>>> privileged
> >>>>>>>>> material for the sole use of the intended recipient. Any review,
> >>>>>>> copying,
> >>>>>>>>> or distribution of this email (or any attachments) by others is
> >>>>>>>> prohibited.
> >>>>>>>>> If you are not the intended recipient, please contact the sender
> >>>>>>>>> immediately and permanently delete this email and any
> attachments. No
> >>>>>>>>> employee or agent of TiVo Inc. is authorized to conclude any
> binding
> >>>>>>>>> agreement on behalf of TiVo Inc. by email. Binding agreements
> with
> >>>>>>>>> TiVo
> >>>>>>>>> Inc. may only be made by a signed written agreement.
> >>>>>>>>>
> >>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>>
> >>>
> >>
> >
>
>