You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by Richard Yu <yo...@gmail.com> on 2017/09/17 04:36:45 UTC

[Discuss] KIP-202 Move merge() from StreamsBuilder to KStream

Hi,
Please take a look at:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-
202+Move+merge%28%29+from+StreamsBuilder+to+KStream

Thanks

Re: [Discuss] KIP-202 Move merge() from StreamsBuilder to KStream

Posted by Richard Yu <yo...@gmail.com>.
The PR should be ready. I have removed the old merge() method for 1.0.0.


On Tue, Sep 19, 2017 at 4:22 PM, Guozhang Wang <wa...@gmail.com> wrote:

> I'd like to make an exception for this KIP if it's PR can get in before the
> the code freeze date, as it's a low risk small KIP that is unlikely to
> introduce regression.
>
>
> Guozhang
>
> On Wed, Sep 20, 2017 at 2:01 AM, Matthias J. Sax <ma...@confluent.io>
> wrote:
>
> > @Damian, this KIP goes into 1.1 but not 1.0, so we need to go the
> > deprecation way...
> >
> > I would be happy to get it into 1.0 and avoid the deprecation. But
> > strictly speaking, the KIP vote deadline passed already... Not sure if
> > there is any exception from this.
> >
> >
> > -Matthias
> >
> > On 9/19/17 12:17 AM, Damian Guy wrote:
> > > Hi Richard,
> > >
> > > Thanks for the KIP. Looks good, just one thing: we don't need to
> > deprecate
> > > StreamBuilder#merge as it has been added during this release cycle. It
> > can
> > > just be removed.
> > >
> > > Thanks,
> > > Damian
> > >
> > > On Mon, 18 Sep 2017 at 23:22 Richard Yu <yo...@gmail.com>
> > wrote:
> > >
> > >> The discussion should not stay idle. Since this issue is so small, we
> > >> should move it into the voting phase.
> > >>
> > >> On Sun, Sep 17, 2017 at 1:39 PM, Matthias J. Sax <
> matthias@confluent.io
> > >
> > >> wrote:
> > >>
> > >>> Thanks for updating the KIP.
> > >>>
> > >>> You are of course right, that we internally need access to
> > >>> InternalStreamBuilder, but that should not be too hard and
> effectively
> > >>> be an internal implementation detail.
> > >>>
> > >>>
> > >>> Two more comments:
> > >>>
> > >>> the new method should be
> > >>>
> > >>>> KStream<K,V> merge(KStream<K,V> stream);
> > >>>
> > >>> and not
> > >>>
> > >>>> <K,V> KStream<K,V> merge(KStream<K,V> streams);
> > >>>
> > >>> as in the KIP? The prefix `<K,V>` is not required for non-static
> > methods
> > >>> and it should be singular (not plural) as parameter name?
> > >>>
> > >>> Can you also add an explicit sentence, that the new method does not
> use
> > >>> varargs anymore but a single KStream parameter (in contrast to the
> old
> > >>> method). And mention that this is no limitation as calls to new
> merge()
> > >>> can be chained.
> > >>>
> > >>>
> > >>>
> > >>> Thanks a lot!
> > >>>
> > >>> -Matthias
> > >>>
> > >>>
> > >>>
> > >>> On 9/17/17 10:32 AM, Richard Yu wrote:
> > >>>> Correction: When the current merge() method is called with multiple
> > >>>> streams, a warning will be printed (or logged), but this should not
> > >>> hinder
> > >>>> ability to read the log.
> > >>>> There is a missing unchecked warning suppression for the old method.
> > >>>> However, it is not high priority due to deprecation of the old
> merge()
> > >>>> method.
> > >>>>
> > >>>>
> > >>>> On Sun, Sep 17, 2017 at 9:37 AM, Richard Yu <
> > >> yohan.richard.yu@gmail.com>
> > >>>> wrote:
> > >>>>
> > >>>>> With regards to Xavier's comment, this practice I do no think
> applies
> > >> to
> > >>>>> this PR. There is not much potential here for warnings to be
> thrown.
> > >>> Note
> > >>>>> that in StreamsBuilder's merge, their is no
> > >>> @SuppressWarnings("unchecked")--indicating
> > >>>>> that warnings is sparse, if not nonexistent.
> > >>>>>
> > >>>>>
> > >>>>> On Sun, Sep 17, 2017 at 9:10 AM, Richard Yu <
> > >> yohan.richard.yu@gmail.com
> > >>>>
> > >>>>> wrote:
> > >>>>>
> > >>>>>> KIP-202 has been changed according to the conditions of your
> > >>> suggestion.
> > >>>>>>
> > >>>>>> On Sun, Sep 17, 2017 at 8:51 AM, Richard Yu <
> > >>> yohan.richard.yu@gmail.com>
> > >>>>>> wrote:
> > >>>>>>
> > >>>>>>> I added StreamsBuilder under the assumption that
> > >> InternalStreamBuilder
> > >>>>>>> would be required to merge
> > >>>>>>> two streams. However, if that is not the case, then I would still
> > >>> need a
> > >>>>>>> couple of things:
> > >>>>>>>
> > >>>>>>> 1) An InternalStreamBuilder instance to instantiate a new KStream
> > >>>>>>>
> > >>>>>>> 2) The merge_name that the merged streams will be given
> > >>>>>>>
> > >>>>>>> 3) Need access to the corresponding InternalStreamBuilder's
> > >>>>>>> InternalTopologyBuilder to add a processor (for the new KStreams)
> > >>>>>>>
> > >>>>>>> All these parameters are associated with InternalStreamsBuilder,
> > >> thus
> > >>> it
> > >>>>>>> is essential towards merging the streams.
> > >>>>>>> We are left with three options (taking into account the
> restriction
> > >>> that
> > >>>>>>> InternalStreamsBuilder's reference scope is mostly limited to
> > within
> > >>> the
> > >>>>>>> org.apache.kafka.streams.kstream.internals package):
> > >>>>>>>
> > >>>>>>> a) Find a way to pass InternalStreamsBuilder indirectly into the
> > >>> class.
> > >>>>>>> (using StreamsBuilder)
> > >>>>>>>
> > >>>>>>> b) Find the matching InternalStreamBuilder within the method that
> > >>>>>>> corresponds to the streams about to be merged.
> > >>>>>>>
> > >>>>>>> or c) Use the local InternalStreamsBuilder inherited from
> > >>>>>>> AbstractStream, assuming that it is the correct builder
> > >>>>>>>
> > >>>>>>> From your suggestion, that would mean using the c option I
> > mentioned
> > >>>>>>> earlier. This choice of implementation works, but it could also
> > >>> include the
> > >>>>>>> risk that the local InternalStreamsBuilder might not be the
> correct
> > >>> one
> > >>>>>>> (just something one might want to keep in mind, since I will
> change
> > >>> it)
> > >>>>>>>
> > >>>>>>> On Sun, Sep 17, 2017 at 12:06 AM, Matthias J. Sax <
> > >>> matthias@confluent.io
> > >>>>>>>> wrote:
> > >>>>>>>
> > >>>>>>>> Hi Richard,
> > >>>>>>>>
> > >>>>>>>> Thanks a lot for the KIP!
> > >>>>>>>>
> > >>>>>>>> I have three question:
> > >>>>>>>>  - why is the new merge() method static?
> > >>>>>>>>  - why does the new merge() method take StreamsBuilder as a
> > >>> parameter?
> > >>>>>>>>  - did you think about Xavier's comment (see the JIRA in case
> you
> > >> did
> > >>>>>>>> not notice it yet) about varargs vs adding some overloads to
> merge
> > >>>>>>>> stream?
> > >>>>>>>>
> > >>>>>>>> My personal take is that merge() should not be static and not
> take
> > >>>>>>>> StreamsBuilder. The idea of the JIRA was to get a more natural
> > API:
> > >>>>>>>>
> > >>>>>>>> // old
> > >>>>>>>> KStream merged = StreamsBuilder.merge(stream1, stream2);
> > >>>>>>>> // new
> > >>>>>>>> KStream merge = stream1.merge(stream2);
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>> Having pointed out the second pattern, it should actually be
> fine
> > >> to
> > >>> get
> > >>>>>>>> rid of varargs in merger() at all, as users could chain multiple
> > >>> calls
> > >>>>>>>> to merge() after each other:
> > >>>>>>>>
> > >>>>>>>> KStream multiMerged = stream1.merge(s2).merge(s3).merge(s4);
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>> -Matthias
> > >>>>>>>>
> > >>>>>>>> On 9/16/17 9:36 PM, Richard Yu wrote:
> > >>>>>>>>> Hi,
> > >>>>>>>>> Please take a look at:
> > >>>>>>>>>
> > >>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > >>>>>>>>> 202+Move+merge%28%29+from+StreamsBuilder+to+KStream
> > >>>>>>>>>
> > >>>>>>>>> Thanks
> > >>>>>>>>>
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>
> > >>>>>>
> > >>>>>
> > >>>>
> > >>>
> > >>>
> > >>
> > >
> >
> >
>
>
> --
> -- Guozhang
>

Re: [Discuss] KIP-202 Move merge() from StreamsBuilder to KStream

Posted by Guozhang Wang <wa...@gmail.com>.
I'd like to make an exception for this KIP if it's PR can get in before the
the code freeze date, as it's a low risk small KIP that is unlikely to
introduce regression.


Guozhang

On Wed, Sep 20, 2017 at 2:01 AM, Matthias J. Sax <ma...@confluent.io>
wrote:

> @Damian, this KIP goes into 1.1 but not 1.0, so we need to go the
> deprecation way...
>
> I would be happy to get it into 1.0 and avoid the deprecation. But
> strictly speaking, the KIP vote deadline passed already... Not sure if
> there is any exception from this.
>
>
> -Matthias
>
> On 9/19/17 12:17 AM, Damian Guy wrote:
> > Hi Richard,
> >
> > Thanks for the KIP. Looks good, just one thing: we don't need to
> deprecate
> > StreamBuilder#merge as it has been added during this release cycle. It
> can
> > just be removed.
> >
> > Thanks,
> > Damian
> >
> > On Mon, 18 Sep 2017 at 23:22 Richard Yu <yo...@gmail.com>
> wrote:
> >
> >> The discussion should not stay idle. Since this issue is so small, we
> >> should move it into the voting phase.
> >>
> >> On Sun, Sep 17, 2017 at 1:39 PM, Matthias J. Sax <matthias@confluent.io
> >
> >> wrote:
> >>
> >>> Thanks for updating the KIP.
> >>>
> >>> You are of course right, that we internally need access to
> >>> InternalStreamBuilder, but that should not be too hard and effectively
> >>> be an internal implementation detail.
> >>>
> >>>
> >>> Two more comments:
> >>>
> >>> the new method should be
> >>>
> >>>> KStream<K,V> merge(KStream<K,V> stream);
> >>>
> >>> and not
> >>>
> >>>> <K,V> KStream<K,V> merge(KStream<K,V> streams);
> >>>
> >>> as in the KIP? The prefix `<K,V>` is not required for non-static
> methods
> >>> and it should be singular (not plural) as parameter name?
> >>>
> >>> Can you also add an explicit sentence, that the new method does not use
> >>> varargs anymore but a single KStream parameter (in contrast to the old
> >>> method). And mention that this is no limitation as calls to new merge()
> >>> can be chained.
> >>>
> >>>
> >>>
> >>> Thanks a lot!
> >>>
> >>> -Matthias
> >>>
> >>>
> >>>
> >>> On 9/17/17 10:32 AM, Richard Yu wrote:
> >>>> Correction: When the current merge() method is called with multiple
> >>>> streams, a warning will be printed (or logged), but this should not
> >>> hinder
> >>>> ability to read the log.
> >>>> There is a missing unchecked warning suppression for the old method.
> >>>> However, it is not high priority due to deprecation of the old merge()
> >>>> method.
> >>>>
> >>>>
> >>>> On Sun, Sep 17, 2017 at 9:37 AM, Richard Yu <
> >> yohan.richard.yu@gmail.com>
> >>>> wrote:
> >>>>
> >>>>> With regards to Xavier's comment, this practice I do no think applies
> >> to
> >>>>> this PR. There is not much potential here for warnings to be thrown.
> >>> Note
> >>>>> that in StreamsBuilder's merge, their is no
> >>> @SuppressWarnings("unchecked")--indicating
> >>>>> that warnings is sparse, if not nonexistent.
> >>>>>
> >>>>>
> >>>>> On Sun, Sep 17, 2017 at 9:10 AM, Richard Yu <
> >> yohan.richard.yu@gmail.com
> >>>>
> >>>>> wrote:
> >>>>>
> >>>>>> KIP-202 has been changed according to the conditions of your
> >>> suggestion.
> >>>>>>
> >>>>>> On Sun, Sep 17, 2017 at 8:51 AM, Richard Yu <
> >>> yohan.richard.yu@gmail.com>
> >>>>>> wrote:
> >>>>>>
> >>>>>>> I added StreamsBuilder under the assumption that
> >> InternalStreamBuilder
> >>>>>>> would be required to merge
> >>>>>>> two streams. However, if that is not the case, then I would still
> >>> need a
> >>>>>>> couple of things:
> >>>>>>>
> >>>>>>> 1) An InternalStreamBuilder instance to instantiate a new KStream
> >>>>>>>
> >>>>>>> 2) The merge_name that the merged streams will be given
> >>>>>>>
> >>>>>>> 3) Need access to the corresponding InternalStreamBuilder's
> >>>>>>> InternalTopologyBuilder to add a processor (for the new KStreams)
> >>>>>>>
> >>>>>>> All these parameters are associated with InternalStreamsBuilder,
> >> thus
> >>> it
> >>>>>>> is essential towards merging the streams.
> >>>>>>> We are left with three options (taking into account the restriction
> >>> that
> >>>>>>> InternalStreamsBuilder's reference scope is mostly limited to
> within
> >>> the
> >>>>>>> org.apache.kafka.streams.kstream.internals package):
> >>>>>>>
> >>>>>>> a) Find a way to pass InternalStreamsBuilder indirectly into the
> >>> class.
> >>>>>>> (using StreamsBuilder)
> >>>>>>>
> >>>>>>> b) Find the matching InternalStreamBuilder within the method that
> >>>>>>> corresponds to the streams about to be merged.
> >>>>>>>
> >>>>>>> or c) Use the local InternalStreamsBuilder inherited from
> >>>>>>> AbstractStream, assuming that it is the correct builder
> >>>>>>>
> >>>>>>> From your suggestion, that would mean using the c option I
> mentioned
> >>>>>>> earlier. This choice of implementation works, but it could also
> >>> include the
> >>>>>>> risk that the local InternalStreamsBuilder might not be the correct
> >>> one
> >>>>>>> (just something one might want to keep in mind, since I will change
> >>> it)
> >>>>>>>
> >>>>>>> On Sun, Sep 17, 2017 at 12:06 AM, Matthias J. Sax <
> >>> matthias@confluent.io
> >>>>>>>> wrote:
> >>>>>>>
> >>>>>>>> Hi Richard,
> >>>>>>>>
> >>>>>>>> Thanks a lot for the KIP!
> >>>>>>>>
> >>>>>>>> I have three question:
> >>>>>>>>  - why is the new merge() method static?
> >>>>>>>>  - why does the new merge() method take StreamsBuilder as a
> >>> parameter?
> >>>>>>>>  - did you think about Xavier's comment (see the JIRA in case you
> >> did
> >>>>>>>> not notice it yet) about varargs vs adding some overloads to merge
> >>>>>>>> stream?
> >>>>>>>>
> >>>>>>>> My personal take is that merge() should not be static and not take
> >>>>>>>> StreamsBuilder. The idea of the JIRA was to get a more natural
> API:
> >>>>>>>>
> >>>>>>>> // old
> >>>>>>>> KStream merged = StreamsBuilder.merge(stream1, stream2);
> >>>>>>>> // new
> >>>>>>>> KStream merge = stream1.merge(stream2);
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> Having pointed out the second pattern, it should actually be fine
> >> to
> >>> get
> >>>>>>>> rid of varargs in merger() at all, as users could chain multiple
> >>> calls
> >>>>>>>> to merge() after each other:
> >>>>>>>>
> >>>>>>>> KStream multiMerged = stream1.merge(s2).merge(s3).merge(s4);
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> -Matthias
> >>>>>>>>
> >>>>>>>> On 9/16/17 9:36 PM, Richard Yu wrote:
> >>>>>>>>> Hi,
> >>>>>>>>> Please take a look at:
> >>>>>>>>>
> >>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> >>>>>>>>> 202+Move+merge%28%29+from+StreamsBuilder+to+KStream
> >>>>>>>>>
> >>>>>>>>> Thanks
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>
> >>>
> >>
> >
>
>


-- 
-- Guozhang

Re: [Discuss] KIP-202 Move merge() from StreamsBuilder to KStream

Posted by "Matthias J. Sax" <ma...@confluent.io>.
@Damian, this KIP goes into 1.1 but not 1.0, so we need to go the
deprecation way...

I would be happy to get it into 1.0 and avoid the deprecation. But
strictly speaking, the KIP vote deadline passed already... Not sure if
there is any exception from this.


-Matthias

On 9/19/17 12:17 AM, Damian Guy wrote:
> Hi Richard,
> 
> Thanks for the KIP. Looks good, just one thing: we don't need to deprecate
> StreamBuilder#merge as it has been added during this release cycle. It can
> just be removed.
> 
> Thanks,
> Damian
> 
> On Mon, 18 Sep 2017 at 23:22 Richard Yu <yo...@gmail.com> wrote:
> 
>> The discussion should not stay idle. Since this issue is so small, we
>> should move it into the voting phase.
>>
>> On Sun, Sep 17, 2017 at 1:39 PM, Matthias J. Sax <ma...@confluent.io>
>> wrote:
>>
>>> Thanks for updating the KIP.
>>>
>>> You are of course right, that we internally need access to
>>> InternalStreamBuilder, but that should not be too hard and effectively
>>> be an internal implementation detail.
>>>
>>>
>>> Two more comments:
>>>
>>> the new method should be
>>>
>>>> KStream<K,V> merge(KStream<K,V> stream);
>>>
>>> and not
>>>
>>>> <K,V> KStream<K,V> merge(KStream<K,V> streams);
>>>
>>> as in the KIP? The prefix `<K,V>` is not required for non-static methods
>>> and it should be singular (not plural) as parameter name?
>>>
>>> Can you also add an explicit sentence, that the new method does not use
>>> varargs anymore but a single KStream parameter (in contrast to the old
>>> method). And mention that this is no limitation as calls to new merge()
>>> can be chained.
>>>
>>>
>>>
>>> Thanks a lot!
>>>
>>> -Matthias
>>>
>>>
>>>
>>> On 9/17/17 10:32 AM, Richard Yu wrote:
>>>> Correction: When the current merge() method is called with multiple
>>>> streams, a warning will be printed (or logged), but this should not
>>> hinder
>>>> ability to read the log.
>>>> There is a missing unchecked warning suppression for the old method.
>>>> However, it is not high priority due to deprecation of the old merge()
>>>> method.
>>>>
>>>>
>>>> On Sun, Sep 17, 2017 at 9:37 AM, Richard Yu <
>> yohan.richard.yu@gmail.com>
>>>> wrote:
>>>>
>>>>> With regards to Xavier's comment, this practice I do no think applies
>> to
>>>>> this PR. There is not much potential here for warnings to be thrown.
>>> Note
>>>>> that in StreamsBuilder's merge, their is no
>>> @SuppressWarnings("unchecked")--indicating
>>>>> that warnings is sparse, if not nonexistent.
>>>>>
>>>>>
>>>>> On Sun, Sep 17, 2017 at 9:10 AM, Richard Yu <
>> yohan.richard.yu@gmail.com
>>>>
>>>>> wrote:
>>>>>
>>>>>> KIP-202 has been changed according to the conditions of your
>>> suggestion.
>>>>>>
>>>>>> On Sun, Sep 17, 2017 at 8:51 AM, Richard Yu <
>>> yohan.richard.yu@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> I added StreamsBuilder under the assumption that
>> InternalStreamBuilder
>>>>>>> would be required to merge
>>>>>>> two streams. However, if that is not the case, then I would still
>>> need a
>>>>>>> couple of things:
>>>>>>>
>>>>>>> 1) An InternalStreamBuilder instance to instantiate a new KStream
>>>>>>>
>>>>>>> 2) The merge_name that the merged streams will be given
>>>>>>>
>>>>>>> 3) Need access to the corresponding InternalStreamBuilder's
>>>>>>> InternalTopologyBuilder to add a processor (for the new KStreams)
>>>>>>>
>>>>>>> All these parameters are associated with InternalStreamsBuilder,
>> thus
>>> it
>>>>>>> is essential towards merging the streams.
>>>>>>> We are left with three options (taking into account the restriction
>>> that
>>>>>>> InternalStreamsBuilder's reference scope is mostly limited to within
>>> the
>>>>>>> org.apache.kafka.streams.kstream.internals package):
>>>>>>>
>>>>>>> a) Find a way to pass InternalStreamsBuilder indirectly into the
>>> class.
>>>>>>> (using StreamsBuilder)
>>>>>>>
>>>>>>> b) Find the matching InternalStreamBuilder within the method that
>>>>>>> corresponds to the streams about to be merged.
>>>>>>>
>>>>>>> or c) Use the local InternalStreamsBuilder inherited from
>>>>>>> AbstractStream, assuming that it is the correct builder
>>>>>>>
>>>>>>> From your suggestion, that would mean using the c option I mentioned
>>>>>>> earlier. This choice of implementation works, but it could also
>>> include the
>>>>>>> risk that the local InternalStreamsBuilder might not be the correct
>>> one
>>>>>>> (just something one might want to keep in mind, since I will change
>>> it)
>>>>>>>
>>>>>>> On Sun, Sep 17, 2017 at 12:06 AM, Matthias J. Sax <
>>> matthias@confluent.io
>>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Richard,
>>>>>>>>
>>>>>>>> Thanks a lot for the KIP!
>>>>>>>>
>>>>>>>> I have three question:
>>>>>>>>  - why is the new merge() method static?
>>>>>>>>  - why does the new merge() method take StreamsBuilder as a
>>> parameter?
>>>>>>>>  - did you think about Xavier's comment (see the JIRA in case you
>> did
>>>>>>>> not notice it yet) about varargs vs adding some overloads to merge
>>>>>>>> stream?
>>>>>>>>
>>>>>>>> My personal take is that merge() should not be static and not take
>>>>>>>> StreamsBuilder. The idea of the JIRA was to get a more natural API:
>>>>>>>>
>>>>>>>> // old
>>>>>>>> KStream merged = StreamsBuilder.merge(stream1, stream2);
>>>>>>>> // new
>>>>>>>> KStream merge = stream1.merge(stream2);
>>>>>>>>
>>>>>>>>
>>>>>>>> Having pointed out the second pattern, it should actually be fine
>> to
>>> get
>>>>>>>> rid of varargs in merger() at all, as users could chain multiple
>>> calls
>>>>>>>> to merge() after each other:
>>>>>>>>
>>>>>>>> KStream multiMerged = stream1.merge(s2).merge(s3).merge(s4);
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> -Matthias
>>>>>>>>
>>>>>>>> On 9/16/17 9:36 PM, Richard Yu wrote:
>>>>>>>>> Hi,
>>>>>>>>> Please take a look at:
>>>>>>>>>
>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>>>>>>>>> 202+Move+merge%28%29+from+StreamsBuilder+to+KStream
>>>>>>>>>
>>>>>>>>> Thanks
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>>
>>
> 


Re: [Discuss] KIP-202 Move merge() from StreamsBuilder to KStream

Posted by Damian Guy <da...@gmail.com>.
Hi Richard,

Thanks for the KIP. Looks good, just one thing: we don't need to deprecate
StreamBuilder#merge as it has been added during this release cycle. It can
just be removed.

Thanks,
Damian

On Mon, 18 Sep 2017 at 23:22 Richard Yu <yo...@gmail.com> wrote:

> The discussion should not stay idle. Since this issue is so small, we
> should move it into the voting phase.
>
> On Sun, Sep 17, 2017 at 1:39 PM, Matthias J. Sax <ma...@confluent.io>
> wrote:
>
> > Thanks for updating the KIP.
> >
> > You are of course right, that we internally need access to
> > InternalStreamBuilder, but that should not be too hard and effectively
> > be an internal implementation detail.
> >
> >
> > Two more comments:
> >
> > the new method should be
> >
> > > KStream<K,V> merge(KStream<K,V> stream);
> >
> > and not
> >
> > > <K,V> KStream<K,V> merge(KStream<K,V> streams);
> >
> > as in the KIP? The prefix `<K,V>` is not required for non-static methods
> > and it should be singular (not plural) as parameter name?
> >
> > Can you also add an explicit sentence, that the new method does not use
> > varargs anymore but a single KStream parameter (in contrast to the old
> > method). And mention that this is no limitation as calls to new merge()
> > can be chained.
> >
> >
> >
> > Thanks a lot!
> >
> > -Matthias
> >
> >
> >
> > On 9/17/17 10:32 AM, Richard Yu wrote:
> > > Correction: When the current merge() method is called with multiple
> > > streams, a warning will be printed (or logged), but this should not
> > hinder
> > > ability to read the log.
> > > There is a missing unchecked warning suppression for the old method.
> > > However, it is not high priority due to deprecation of the old merge()
> > > method.
> > >
> > >
> > > On Sun, Sep 17, 2017 at 9:37 AM, Richard Yu <
> yohan.richard.yu@gmail.com>
> > > wrote:
> > >
> > >> With regards to Xavier's comment, this practice I do no think applies
> to
> > >> this PR. There is not much potential here for warnings to be thrown.
> > Note
> > >> that in StreamsBuilder's merge, their is no
> > @SuppressWarnings("unchecked")--indicating
> > >> that warnings is sparse, if not nonexistent.
> > >>
> > >>
> > >> On Sun, Sep 17, 2017 at 9:10 AM, Richard Yu <
> yohan.richard.yu@gmail.com
> > >
> > >> wrote:
> > >>
> > >>> KIP-202 has been changed according to the conditions of your
> > suggestion.
> > >>>
> > >>> On Sun, Sep 17, 2017 at 8:51 AM, Richard Yu <
> > yohan.richard.yu@gmail.com>
> > >>> wrote:
> > >>>
> > >>>> I added StreamsBuilder under the assumption that
> InternalStreamBuilder
> > >>>> would be required to merge
> > >>>> two streams. However, if that is not the case, then I would still
> > need a
> > >>>> couple of things:
> > >>>>
> > >>>> 1) An InternalStreamBuilder instance to instantiate a new KStream
> > >>>>
> > >>>> 2) The merge_name that the merged streams will be given
> > >>>>
> > >>>> 3) Need access to the corresponding InternalStreamBuilder's
> > >>>> InternalTopologyBuilder to add a processor (for the new KStreams)
> > >>>>
> > >>>> All these parameters are associated with InternalStreamsBuilder,
> thus
> > it
> > >>>> is essential towards merging the streams.
> > >>>> We are left with three options (taking into account the restriction
> > that
> > >>>> InternalStreamsBuilder's reference scope is mostly limited to within
> > the
> > >>>> org.apache.kafka.streams.kstream.internals package):
> > >>>>
> > >>>> a) Find a way to pass InternalStreamsBuilder indirectly into the
> > class.
> > >>>> (using StreamsBuilder)
> > >>>>
> > >>>> b) Find the matching InternalStreamBuilder within the method that
> > >>>> corresponds to the streams about to be merged.
> > >>>>
> > >>>> or c) Use the local InternalStreamsBuilder inherited from
> > >>>> AbstractStream, assuming that it is the correct builder
> > >>>>
> > >>>> From your suggestion, that would mean using the c option I mentioned
> > >>>> earlier. This choice of implementation works, but it could also
> > include the
> > >>>> risk that the local InternalStreamsBuilder might not be the correct
> > one
> > >>>> (just something one might want to keep in mind, since I will change
> > it)
> > >>>>
> > >>>> On Sun, Sep 17, 2017 at 12:06 AM, Matthias J. Sax <
> > matthias@confluent.io
> > >>>>> wrote:
> > >>>>
> > >>>>> Hi Richard,
> > >>>>>
> > >>>>> Thanks a lot for the KIP!
> > >>>>>
> > >>>>> I have three question:
> > >>>>>  - why is the new merge() method static?
> > >>>>>  - why does the new merge() method take StreamsBuilder as a
> > parameter?
> > >>>>>  - did you think about Xavier's comment (see the JIRA in case you
> did
> > >>>>> not notice it yet) about varargs vs adding some overloads to merge
> > >>>>> stream?
> > >>>>>
> > >>>>> My personal take is that merge() should not be static and not take
> > >>>>> StreamsBuilder. The idea of the JIRA was to get a more natural API:
> > >>>>>
> > >>>>> // old
> > >>>>> KStream merged = StreamsBuilder.merge(stream1, stream2);
> > >>>>> // new
> > >>>>> KStream merge = stream1.merge(stream2);
> > >>>>>
> > >>>>>
> > >>>>> Having pointed out the second pattern, it should actually be fine
> to
> > get
> > >>>>> rid of varargs in merger() at all, as users could chain multiple
> > calls
> > >>>>> to merge() after each other:
> > >>>>>
> > >>>>> KStream multiMerged = stream1.merge(s2).merge(s3).merge(s4);
> > >>>>>
> > >>>>>
> > >>>>>
> > >>>>>
> > >>>>> -Matthias
> > >>>>>
> > >>>>> On 9/16/17 9:36 PM, Richard Yu wrote:
> > >>>>>> Hi,
> > >>>>>> Please take a look at:
> > >>>>>>
> > >>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > >>>>>> 202+Move+merge%28%29+from+StreamsBuilder+to+KStream
> > >>>>>>
> > >>>>>> Thanks
> > >>>>>>
> > >>>>>
> > >>>>>
> > >>>>
> > >>>
> > >>
> > >
> >
> >
>

Re: [Discuss] KIP-202 Move merge() from StreamsBuilder to KStream

Posted by Richard Yu <yo...@gmail.com>.
The discussion should not stay idle. Since this issue is so small, we
should move it into the voting phase.

On Sun, Sep 17, 2017 at 1:39 PM, Matthias J. Sax <ma...@confluent.io>
wrote:

> Thanks for updating the KIP.
>
> You are of course right, that we internally need access to
> InternalStreamBuilder, but that should not be too hard and effectively
> be an internal implementation detail.
>
>
> Two more comments:
>
> the new method should be
>
> > KStream<K,V> merge(KStream<K,V> stream);
>
> and not
>
> > <K,V> KStream<K,V> merge(KStream<K,V> streams);
>
> as in the KIP? The prefix `<K,V>` is not required for non-static methods
> and it should be singular (not plural) as parameter name?
>
> Can you also add an explicit sentence, that the new method does not use
> varargs anymore but a single KStream parameter (in contrast to the old
> method). And mention that this is no limitation as calls to new merge()
> can be chained.
>
>
>
> Thanks a lot!
>
> -Matthias
>
>
>
> On 9/17/17 10:32 AM, Richard Yu wrote:
> > Correction: When the current merge() method is called with multiple
> > streams, a warning will be printed (or logged), but this should not
> hinder
> > ability to read the log.
> > There is a missing unchecked warning suppression for the old method.
> > However, it is not high priority due to deprecation of the old merge()
> > method.
> >
> >
> > On Sun, Sep 17, 2017 at 9:37 AM, Richard Yu <yo...@gmail.com>
> > wrote:
> >
> >> With regards to Xavier's comment, this practice I do no think applies to
> >> this PR. There is not much potential here for warnings to be thrown.
> Note
> >> that in StreamsBuilder's merge, their is no
> @SuppressWarnings("unchecked")--indicating
> >> that warnings is sparse, if not nonexistent.
> >>
> >>
> >> On Sun, Sep 17, 2017 at 9:10 AM, Richard Yu <yohan.richard.yu@gmail.com
> >
> >> wrote:
> >>
> >>> KIP-202 has been changed according to the conditions of your
> suggestion.
> >>>
> >>> On Sun, Sep 17, 2017 at 8:51 AM, Richard Yu <
> yohan.richard.yu@gmail.com>
> >>> wrote:
> >>>
> >>>> I added StreamsBuilder under the assumption that InternalStreamBuilder
> >>>> would be required to merge
> >>>> two streams. However, if that is not the case, then I would still
> need a
> >>>> couple of things:
> >>>>
> >>>> 1) An InternalStreamBuilder instance to instantiate a new KStream
> >>>>
> >>>> 2) The merge_name that the merged streams will be given
> >>>>
> >>>> 3) Need access to the corresponding InternalStreamBuilder's
> >>>> InternalTopologyBuilder to add a processor (for the new KStreams)
> >>>>
> >>>> All these parameters are associated with InternalStreamsBuilder, thus
> it
> >>>> is essential towards merging the streams.
> >>>> We are left with three options (taking into account the restriction
> that
> >>>> InternalStreamsBuilder's reference scope is mostly limited to within
> the
> >>>> org.apache.kafka.streams.kstream.internals package):
> >>>>
> >>>> a) Find a way to pass InternalStreamsBuilder indirectly into the
> class.
> >>>> (using StreamsBuilder)
> >>>>
> >>>> b) Find the matching InternalStreamBuilder within the method that
> >>>> corresponds to the streams about to be merged.
> >>>>
> >>>> or c) Use the local InternalStreamsBuilder inherited from
> >>>> AbstractStream, assuming that it is the correct builder
> >>>>
> >>>> From your suggestion, that would mean using the c option I mentioned
> >>>> earlier. This choice of implementation works, but it could also
> include the
> >>>> risk that the local InternalStreamsBuilder might not be the correct
> one
> >>>> (just something one might want to keep in mind, since I will change
> it)
> >>>>
> >>>> On Sun, Sep 17, 2017 at 12:06 AM, Matthias J. Sax <
> matthias@confluent.io
> >>>>> wrote:
> >>>>
> >>>>> Hi Richard,
> >>>>>
> >>>>> Thanks a lot for the KIP!
> >>>>>
> >>>>> I have three question:
> >>>>>  - why is the new merge() method static?
> >>>>>  - why does the new merge() method take StreamsBuilder as a
> parameter?
> >>>>>  - did you think about Xavier's comment (see the JIRA in case you did
> >>>>> not notice it yet) about varargs vs adding some overloads to merge
> >>>>> stream?
> >>>>>
> >>>>> My personal take is that merge() should not be static and not take
> >>>>> StreamsBuilder. The idea of the JIRA was to get a more natural API:
> >>>>>
> >>>>> // old
> >>>>> KStream merged = StreamsBuilder.merge(stream1, stream2);
> >>>>> // new
> >>>>> KStream merge = stream1.merge(stream2);
> >>>>>
> >>>>>
> >>>>> Having pointed out the second pattern, it should actually be fine to
> get
> >>>>> rid of varargs in merger() at all, as users could chain multiple
> calls
> >>>>> to merge() after each other:
> >>>>>
> >>>>> KStream multiMerged = stream1.merge(s2).merge(s3).merge(s4);
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>> -Matthias
> >>>>>
> >>>>> On 9/16/17 9:36 PM, Richard Yu wrote:
> >>>>>> Hi,
> >>>>>> Please take a look at:
> >>>>>>
> >>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> >>>>>> 202+Move+merge%28%29+from+StreamsBuilder+to+KStream
> >>>>>>
> >>>>>> Thanks
> >>>>>>
> >>>>>
> >>>>>
> >>>>
> >>>
> >>
> >
>
>

Re: [Discuss] KIP-202 Move merge() from StreamsBuilder to KStream

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Thanks for updating the KIP.

You are of course right, that we internally need access to
InternalStreamBuilder, but that should not be too hard and effectively
be an internal implementation detail.


Two more comments:

the new method should be

> KStream<K,V> merge(KStream<K,V> stream);

and not

> <K,V> KStream<K,V> merge(KStream<K,V> streams);

as in the KIP? The prefix `<K,V>` is not required for non-static methods
and it should be singular (not plural) as parameter name?

Can you also add an explicit sentence, that the new method does not use
varargs anymore but a single KStream parameter (in contrast to the old
method). And mention that this is no limitation as calls to new merge()
can be chained.



Thanks a lot!

-Matthias



On 9/17/17 10:32 AM, Richard Yu wrote:
> Correction: When the current merge() method is called with multiple
> streams, a warning will be printed (or logged), but this should not hinder
> ability to read the log.
> There is a missing unchecked warning suppression for the old method.
> However, it is not high priority due to deprecation of the old merge()
> method.
> 
> 
> On Sun, Sep 17, 2017 at 9:37 AM, Richard Yu <yo...@gmail.com>
> wrote:
> 
>> With regards to Xavier's comment, this practice I do no think applies to
>> this PR. There is not much potential here for warnings to be thrown. Note
>> that in StreamsBuilder's merge, their is no @SuppressWarnings("unchecked")--indicating
>> that warnings is sparse, if not nonexistent.
>>
>>
>> On Sun, Sep 17, 2017 at 9:10 AM, Richard Yu <yo...@gmail.com>
>> wrote:
>>
>>> KIP-202 has been changed according to the conditions of your suggestion.
>>>
>>> On Sun, Sep 17, 2017 at 8:51 AM, Richard Yu <yo...@gmail.com>
>>> wrote:
>>>
>>>> I added StreamsBuilder under the assumption that InternalStreamBuilder
>>>> would be required to merge
>>>> two streams. However, if that is not the case, then I would still need a
>>>> couple of things:
>>>>
>>>> 1) An InternalStreamBuilder instance to instantiate a new KStream
>>>>
>>>> 2) The merge_name that the merged streams will be given
>>>>
>>>> 3) Need access to the corresponding InternalStreamBuilder's
>>>> InternalTopologyBuilder to add a processor (for the new KStreams)
>>>>
>>>> All these parameters are associated with InternalStreamsBuilder, thus it
>>>> is essential towards merging the streams.
>>>> We are left with three options (taking into account the restriction that
>>>> InternalStreamsBuilder's reference scope is mostly limited to within the
>>>> org.apache.kafka.streams.kstream.internals package):
>>>>
>>>> a) Find a way to pass InternalStreamsBuilder indirectly into the class.
>>>> (using StreamsBuilder)
>>>>
>>>> b) Find the matching InternalStreamBuilder within the method that
>>>> corresponds to the streams about to be merged.
>>>>
>>>> or c) Use the local InternalStreamsBuilder inherited from
>>>> AbstractStream, assuming that it is the correct builder
>>>>
>>>> From your suggestion, that would mean using the c option I mentioned
>>>> earlier. This choice of implementation works, but it could also include the
>>>> risk that the local InternalStreamsBuilder might not be the correct one
>>>> (just something one might want to keep in mind, since I will change it)
>>>>
>>>> On Sun, Sep 17, 2017 at 12:06 AM, Matthias J. Sax <matthias@confluent.io
>>>>> wrote:
>>>>
>>>>> Hi Richard,
>>>>>
>>>>> Thanks a lot for the KIP!
>>>>>
>>>>> I have three question:
>>>>>  - why is the new merge() method static?
>>>>>  - why does the new merge() method take StreamsBuilder as a parameter?
>>>>>  - did you think about Xavier's comment (see the JIRA in case you did
>>>>> not notice it yet) about varargs vs adding some overloads to merge
>>>>> stream?
>>>>>
>>>>> My personal take is that merge() should not be static and not take
>>>>> StreamsBuilder. The idea of the JIRA was to get a more natural API:
>>>>>
>>>>> // old
>>>>> KStream merged = StreamsBuilder.merge(stream1, stream2);
>>>>> // new
>>>>> KStream merge = stream1.merge(stream2);
>>>>>
>>>>>
>>>>> Having pointed out the second pattern, it should actually be fine to get
>>>>> rid of varargs in merger() at all, as users could chain multiple calls
>>>>> to merge() after each other:
>>>>>
>>>>> KStream multiMerged = stream1.merge(s2).merge(s3).merge(s4);
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> -Matthias
>>>>>
>>>>> On 9/16/17 9:36 PM, Richard Yu wrote:
>>>>>> Hi,
>>>>>> Please take a look at:
>>>>>>
>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>>>>>> 202+Move+merge%28%29+from+StreamsBuilder+to+KStream
>>>>>>
>>>>>> Thanks
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
> 


Re: [Discuss] KIP-202 Move merge() from StreamsBuilder to KStream

Posted by Richard Yu <yo...@gmail.com>.
Correction: When the current merge() method is called with multiple
streams, a warning will be printed (or logged), but this should not hinder
ability to read the log.
There is a missing unchecked warning suppression for the old method.
However, it is not high priority due to deprecation of the old merge()
method.


On Sun, Sep 17, 2017 at 9:37 AM, Richard Yu <yo...@gmail.com>
wrote:

> With regards to Xavier's comment, this practice I do no think applies to
> this PR. There is not much potential here for warnings to be thrown. Note
> that in StreamsBuilder's merge, their is no @SuppressWarnings("unchecked")--indicating
> that warnings is sparse, if not nonexistent.
>
>
> On Sun, Sep 17, 2017 at 9:10 AM, Richard Yu <yo...@gmail.com>
> wrote:
>
>> KIP-202 has been changed according to the conditions of your suggestion.
>>
>> On Sun, Sep 17, 2017 at 8:51 AM, Richard Yu <yo...@gmail.com>
>> wrote:
>>
>>> I added StreamsBuilder under the assumption that InternalStreamBuilder
>>> would be required to merge
>>> two streams. However, if that is not the case, then I would still need a
>>> couple of things:
>>>
>>> 1) An InternalStreamBuilder instance to instantiate a new KStream
>>>
>>> 2) The merge_name that the merged streams will be given
>>>
>>> 3) Need access to the corresponding InternalStreamBuilder's
>>> InternalTopologyBuilder to add a processor (for the new KStreams)
>>>
>>> All these parameters are associated with InternalStreamsBuilder, thus it
>>> is essential towards merging the streams.
>>> We are left with three options (taking into account the restriction that
>>> InternalStreamsBuilder's reference scope is mostly limited to within the
>>> org.apache.kafka.streams.kstream.internals package):
>>>
>>> a) Find a way to pass InternalStreamsBuilder indirectly into the class.
>>> (using StreamsBuilder)
>>>
>>> b) Find the matching InternalStreamBuilder within the method that
>>> corresponds to the streams about to be merged.
>>>
>>> or c) Use the local InternalStreamsBuilder inherited from
>>> AbstractStream, assuming that it is the correct builder
>>>
>>> From your suggestion, that would mean using the c option I mentioned
>>> earlier. This choice of implementation works, but it could also include the
>>> risk that the local InternalStreamsBuilder might not be the correct one
>>> (just something one might want to keep in mind, since I will change it)
>>>
>>> On Sun, Sep 17, 2017 at 12:06 AM, Matthias J. Sax <matthias@confluent.io
>>> > wrote:
>>>
>>>> Hi Richard,
>>>>
>>>> Thanks a lot for the KIP!
>>>>
>>>> I have three question:
>>>>  - why is the new merge() method static?
>>>>  - why does the new merge() method take StreamsBuilder as a parameter?
>>>>  - did you think about Xavier's comment (see the JIRA in case you did
>>>> not notice it yet) about varargs vs adding some overloads to merge
>>>> stream?
>>>>
>>>> My personal take is that merge() should not be static and not take
>>>> StreamsBuilder. The idea of the JIRA was to get a more natural API:
>>>>
>>>> // old
>>>> KStream merged = StreamsBuilder.merge(stream1, stream2);
>>>> // new
>>>> KStream merge = stream1.merge(stream2);
>>>>
>>>>
>>>> Having pointed out the second pattern, it should actually be fine to get
>>>> rid of varargs in merger() at all, as users could chain multiple calls
>>>> to merge() after each other:
>>>>
>>>> KStream multiMerged = stream1.merge(s2).merge(s3).merge(s4);
>>>>
>>>>
>>>>
>>>>
>>>> -Matthias
>>>>
>>>> On 9/16/17 9:36 PM, Richard Yu wrote:
>>>> > Hi,
>>>> > Please take a look at:
>>>> >
>>>> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>>>> > 202+Move+merge%28%29+from+StreamsBuilder+to+KStream
>>>> >
>>>> > Thanks
>>>> >
>>>>
>>>>
>>>
>>
>

Re: [Discuss] KIP-202 Move merge() from StreamsBuilder to KStream

Posted by Richard Yu <yo...@gmail.com>.
With regards to Xavier's comment, this practice I do no think applies to
this PR. There is not much potential here for warnings to be thrown. Note
that in StreamsBuilder's merge, their is no
@SuppressWarnings("unchecked")--indicating that warnings is sparse, if not
nonexistent.


On Sun, Sep 17, 2017 at 9:10 AM, Richard Yu <yo...@gmail.com>
wrote:

> KIP-202 has been changed according to the conditions of your suggestion.
>
> On Sun, Sep 17, 2017 at 8:51 AM, Richard Yu <yo...@gmail.com>
> wrote:
>
>> I added StreamsBuilder under the assumption that InternalStreamBuilder
>> would be required to merge
>> two streams. However, if that is not the case, then I would still need a
>> couple of things:
>>
>> 1) An InternalStreamBuilder instance to instantiate a new KStream
>>
>> 2) The merge_name that the merged streams will be given
>>
>> 3) Need access to the corresponding InternalStreamBuilder's
>> InternalTopologyBuilder to add a processor (for the new KStreams)
>>
>> All these parameters are associated with InternalStreamsBuilder, thus it
>> is essential towards merging the streams.
>> We are left with three options (taking into account the restriction that
>> InternalStreamsBuilder's reference scope is mostly limited to within the
>> org.apache.kafka.streams.kstream.internals package):
>>
>> a) Find a way to pass InternalStreamsBuilder indirectly into the class.
>> (using StreamsBuilder)
>>
>> b) Find the matching InternalStreamBuilder within the method that
>> corresponds to the streams about to be merged.
>>
>> or c) Use the local InternalStreamsBuilder inherited from AbstractStream,
>> assuming that it is the correct builder
>>
>> From your suggestion, that would mean using the c option I mentioned
>> earlier. This choice of implementation works, but it could also include the
>> risk that the local InternalStreamsBuilder might not be the correct one
>> (just something one might want to keep in mind, since I will change it)
>>
>> On Sun, Sep 17, 2017 at 12:06 AM, Matthias J. Sax <ma...@confluent.io>
>> wrote:
>>
>>> Hi Richard,
>>>
>>> Thanks a lot for the KIP!
>>>
>>> I have three question:
>>>  - why is the new merge() method static?
>>>  - why does the new merge() method take StreamsBuilder as a parameter?
>>>  - did you think about Xavier's comment (see the JIRA in case you did
>>> not notice it yet) about varargs vs adding some overloads to merge
>>> stream?
>>>
>>> My personal take is that merge() should not be static and not take
>>> StreamsBuilder. The idea of the JIRA was to get a more natural API:
>>>
>>> // old
>>> KStream merged = StreamsBuilder.merge(stream1, stream2);
>>> // new
>>> KStream merge = stream1.merge(stream2);
>>>
>>>
>>> Having pointed out the second pattern, it should actually be fine to get
>>> rid of varargs in merger() at all, as users could chain multiple calls
>>> to merge() after each other:
>>>
>>> KStream multiMerged = stream1.merge(s2).merge(s3).merge(s4);
>>>
>>>
>>>
>>>
>>> -Matthias
>>>
>>> On 9/16/17 9:36 PM, Richard Yu wrote:
>>> > Hi,
>>> > Please take a look at:
>>> >
>>> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>>> > 202+Move+merge%28%29+from+StreamsBuilder+to+KStream
>>> >
>>> > Thanks
>>> >
>>>
>>>
>>
>

Re: [Discuss] KIP-202 Move merge() from StreamsBuilder to KStream

Posted by Richard Yu <yo...@gmail.com>.
KIP-202 has been changed according to the conditions of your suggestion.

On Sun, Sep 17, 2017 at 8:51 AM, Richard Yu <yo...@gmail.com>
wrote:

> I added StreamsBuilder under the assumption that InternalStreamBuilder
> would be required to merge
> two streams. However, if that is not the case, then I would still need a
> couple of things:
>
> 1) An InternalStreamBuilder instance to instantiate a new KStream
>
> 2) The merge_name that the merged streams will be given
>
> 3) Need access to the corresponding InternalStreamBuilder's
> InternalTopologyBuilder to add a processor (for the new KStreams)
>
> All these parameters are associated with InternalStreamsBuilder, thus it
> is essential towards merging the streams.
> We are left with three options (taking into account the restriction that
> InternalStreamsBuilder's reference scope is mostly limited to within the
> org.apache.kafka.streams.kstream.internals package):
>
> a) Find a way to pass InternalStreamsBuilder indirectly into the class.
> (using StreamsBuilder)
>
> b) Find the matching InternalStreamBuilder within the method that
> corresponds to the streams about to be merged.
>
> or c) Use the local InternalStreamsBuilder inherited from AbstractStream,
> assuming that it is the correct builder
>
> From your suggestion, that would mean using the c option I mentioned
> earlier. This choice of implementation works, but it could also include the
> risk that the local InternalStreamsBuilder might not be the correct one
> (just something one might want to keep in mind, since I will change it)
>
> On Sun, Sep 17, 2017 at 12:06 AM, Matthias J. Sax <ma...@confluent.io>
> wrote:
>
>> Hi Richard,
>>
>> Thanks a lot for the KIP!
>>
>> I have three question:
>>  - why is the new merge() method static?
>>  - why does the new merge() method take StreamsBuilder as a parameter?
>>  - did you think about Xavier's comment (see the JIRA in case you did
>> not notice it yet) about varargs vs adding some overloads to merge stream?
>>
>> My personal take is that merge() should not be static and not take
>> StreamsBuilder. The idea of the JIRA was to get a more natural API:
>>
>> // old
>> KStream merged = StreamsBuilder.merge(stream1, stream2);
>> // new
>> KStream merge = stream1.merge(stream2);
>>
>>
>> Having pointed out the second pattern, it should actually be fine to get
>> rid of varargs in merger() at all, as users could chain multiple calls
>> to merge() after each other:
>>
>> KStream multiMerged = stream1.merge(s2).merge(s3).merge(s4);
>>
>>
>>
>>
>> -Matthias
>>
>> On 9/16/17 9:36 PM, Richard Yu wrote:
>> > Hi,
>> > Please take a look at:
>> >
>> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>> > 202+Move+merge%28%29+from+StreamsBuilder+to+KStream
>> >
>> > Thanks
>> >
>>
>>
>

Re: [Discuss] KIP-202 Move merge() from StreamsBuilder to KStream

Posted by Richard Yu <yo...@gmail.com>.
I added StreamsBuilder under the assumption that InternalStreamBuilder
would be required to merge
two streams. However, if that is not the case, then I would still need a
couple of things:

1) An InternalStreamBuilder instance to instantiate a new KStream

2) The merge_name that the merged streams will be given

3) Need access to the corresponding InternalStreamBuilder's
InternalTopologyBuilder to add a processor (for the new KStreams)

All these parameters are associated with InternalStreamsBuilder, thus it is
essential towards merging the streams.
We are left with three options (taking into account the restriction that
InternalStreamsBuilder's reference scope is mostly limited to within
the org.apache.kafka.streams.kstream.internals
package):

a) Find a way to pass InternalStreamsBuilder indirectly into the class.
(using StreamsBuilder)

b) Find the matching InternalStreamBuilder within the method that
corresponds to the streams about to be merged.

or c) Use the local InternalStreamsBuilder inherited from AbstractStream,
assuming that it is the correct builder

From your suggestion, that would mean using the c option I mentioned
earlier. This choice of implementation works, but it could also include the
risk that the local InternalStreamsBuilder might not be the correct one
(just something one might want to keep in mind, since I will change it)

On Sun, Sep 17, 2017 at 12:06 AM, Matthias J. Sax <ma...@confluent.io>
wrote:

> Hi Richard,
>
> Thanks a lot for the KIP!
>
> I have three question:
>  - why is the new merge() method static?
>  - why does the new merge() method take StreamsBuilder as a parameter?
>  - did you think about Xavier's comment (see the JIRA in case you did
> not notice it yet) about varargs vs adding some overloads to merge stream?
>
> My personal take is that merge() should not be static and not take
> StreamsBuilder. The idea of the JIRA was to get a more natural API:
>
> // old
> KStream merged = StreamsBuilder.merge(stream1, stream2);
> // new
> KStream merge = stream1.merge(stream2);
>
>
> Having pointed out the second pattern, it should actually be fine to get
> rid of varargs in merger() at all, as users could chain multiple calls
> to merge() after each other:
>
> KStream multiMerged = stream1.merge(s2).merge(s3).merge(s4);
>
>
>
>
> -Matthias
>
> On 9/16/17 9:36 PM, Richard Yu wrote:
> > Hi,
> > Please take a look at:
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > 202+Move+merge%28%29+from+StreamsBuilder+to+KStream
> >
> > Thanks
> >
>
>

Re: [Discuss] KIP-202 Move merge() from StreamsBuilder to KStream

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Hi Richard,

Thanks a lot for the KIP!

I have three question:
 - why is the new merge() method static?
 - why does the new merge() method take StreamsBuilder as a parameter?
 - did you think about Xavier's comment (see the JIRA in case you did
not notice it yet) about varargs vs adding some overloads to merge stream?

My personal take is that merge() should not be static and not take
StreamsBuilder. The idea of the JIRA was to get a more natural API:

// old
KStream merged = StreamsBuilder.merge(stream1, stream2);
// new
KStream merge = stream1.merge(stream2);


Having pointed out the second pattern, it should actually be fine to get
rid of varargs in merger() at all, as users could chain multiple calls
to merge() after each other:

KStream multiMerged = stream1.merge(s2).merge(s3).merge(s4);




-Matthias

On 9/16/17 9:36 PM, Richard Yu wrote:
> Hi,
> Please take a look at:
> 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 202+Move+merge%28%29+from+StreamsBuilder+to+KStream
> 
> Thanks
>