You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by "Matthias J. Sax" <mj...@apache.org> on 2022/04/22 23:42:18 UTC

Re: [DISCUSS] KIP-819: Merge multiple KStreams in one operation

Nick, how should we proceed?

On 3/29/22 9:21 PM, Chris Egerton wrote:
> Hi all,
> 
> The Named-first variant seems pretty appealing:
>      merge(Named named, KStream... streams)
> It doesn't align with the existing merge methods, but it does at least
> follow precedent set by the (now-deprecated) branch method [1].
> 
> A Collection-based alternative seems slightly less appealing, but only
> because I'm guessing it'll be more common for the set of to-be-merged
> streams to be known at compile time. In that case, the syntactic sugar
> provided by a variadic method is preferable to having to wrap your set of
> streams in a call to, e.g., Arrays::asList [2].
> 
> An issue I have with the split variant:
>      merge(KStream first, Named named, KStream... rest)
> is that it doesn't seem very intuitive to users who aren't familiar with
> Streams and don't have the context of how/when the overloaded variants to
> the merge method were introduced.
> 
> If we really want things to be consistent, one possibility is to add a
> Named-first variant:
>      merge(Named named, KStream... streams)
> Deprecate the existing Named variant:
>      merge(KStream stream, Named named)
> And change the existing single-arg merge method:
>      merge(KStream stream)
> To become variadic (like proposed by Matthias earlier in the thread):
>      merge(KStream... streams)
> 
> In the end, we'd have three methods, possibly reduced to two by the next
> major release:
>      merge(Named named, KStream... streams)
>      merge(KStream... streams)
>      merge(KStream stream, Named named) (deprecated)
> 
> RE supporting both a Collection-based and a variadic method: it doesn't
> look like this is too common in the Streams API right now and considering
> how trivial it is to convert from one style to another in most cases
> (either with the aforementioned Arrays::asList to turn a static
> compile-time set of streams into a Collection, or with Collection::toArray
> [3] to turn a runtime Collection of streams into an array which can be
> passed to a variadic method), it doesn't seem worth it to pollute the API
> space with multiple methods that provide the exact same behavior.
> 
> [1] -
> https://kafka.apache.org/31/javadoc/org/apache/kafka/streams/kstream/KStream.html#branch(org.apache.kafka.streams.kstream.Named,org.apache.kafka.streams.kstream.Predicate..
> .)
> [2] -
> https://docs.oracle.com/javase/8/docs/api/java/util/Arrays.html#asList-T...-
> [3] -
> https://docs.oracle.com/javase/8/docs/api/java/util/Collection.html#toArray-T:A-
> 
> Cheers,
> 
> Chris
> 
> On Tue, Mar 29, 2022 at 11:14 PM Matthias J. Sax <mj...@apache.org> wrote:
> 
>> My understand was, that the original proposal was to have:
>>
>>     merge(KStream stream);
>>     merge(KStream... streams);
>>
>> Maybe I misunderstood.
>>
>>
>> I am not really a fan of
>>
>>     merge(KStream stream, KStream... streams);
>>
>> because it seem to break the `Collection` pattern. If I have a
>> collection of KStreams, I need to artificially extract one and pass as
>> first argument and pass all others using the second argument.
>>
>> On the other hand, _if_ I have a collection, going back to the original
>> proposal of
>>
>>     merge(Collection<KStream> streams);
>>
>> would work, too. Maybe bottom line is, that we might want to have both
>> (`Collection` and vararg) to optimize for both cases? On the other hand
>> it feels rather redundant? Also not sure if both are compatible?
>>
>>
>>
>> The issue with Named is interesting. Wondering if we should just flip
>> the argument order:
>>
>>     merge(Named name, KStream... streams);
>>
>> Falling back to `Collection` would also avoid this issue.
>>
>>
>>
>> -Matthias
>>
>>
>> On 3/29/22 1:43 AM, Nick Telford wrote:
>>> Yeah, the Named parameter makes it a little trickier. My suggestion would
>>> be to add an additional overload that looks like:
>>>
>>> KStream<K, V> merged(KStream<K, V> first, Named named, KStream<K, V>
>> rest);
>>>
>>> It's not ideal having the Named parameter split the other parameters; we
>>> could alternatively move the Named parameter to be first, but then that
>>> wouldn't align with the rest of the API.
>>>
>>> Nick
>>>
>>> On Tue, 29 Mar 2022 at 05:20, Chris Egerton <fe...@gmail.com>
>> wrote:
>>>
>>>> Hi all,
>>>>
>>>> Java permits the overload. Simple test class to demonstrate:
>>>>
>>>> ```
>>>> public class Test {
>>>>       private final String field;
>>>>
>>>>       public Test(String field) {
>>>>           this.field = field;
>>>>       }
>>>>
>>>>       public Test merge(Test that) {
>>>>           return new Test("Single-arg merge: " + this.field + ", " +
>>>> that.field);
>>>>       }
>>>>
>>>>       public Test merge(Test that, Test... those) {
>>>>           String newField = "Varargs merge: " + this.field + ", " +
>>>> that.field;
>>>>           for (Test test : those) newField += ", " + test.field;
>>>>           return new Test(newField);
>>>>       }
>>>>
>>>>       public static void main(String[] args) {
>>>>           Test t1 = new Test("t1"), t2 = new Test("t2"), t3 = new
>> Test("t3");
>>>>           Test merge1 = t1.merge(t2), merge2 = t1.merge(t2, t3);
>>>>           System.out.println(merge1.field); // Single-arg merge: t1, t2
>>>>           System.out.println(merge2.field); // Varargs merge: t1, t2, t3
>>>>       }
>>>> }
>>>> ```
>>>>
>>>> There's a great StackOverflow writeup on the subject [1], which explains
>>>> that during method resolution, priority is given to methods whose
>>>> signatures match the argument list without taking boxing/unboxing or
>>>> varargs into consideration:
>>>>
>>>>> The first phase performs overload resolution without permitting boxing
>> or
>>>> unboxing conversion, or the use of variable arity method invocation. If
>> no
>>>> applicable method is found during this phase then processing continues
>> to
>>>> the second phase.
>>>>> The second phase performs overload resolution while allowing boxing and
>>>> unboxing, but still precludes the use of variable arity method
>> invocation.
>>>> If no applicable method is found during this phase then processing
>>>> continues to the third phase.
>>>>> The third phase allows overloading to be combined with variable arity
>>>> methods, boxing, and unboxing.
>>>>
>>>> I'm curious if it's worth keeping a variant that accepts a Named
>> parameter?
>>>> Might be tricky to accommodate since variadic arguments have to be last.
>>>>
>>>> [1] - https://stackoverflow.com/a/48850722
>>>>
>>>> Cheers,
>>>>
>>>> Chris
>>>>
>>>> On Mon, Mar 28, 2022 at 11:46 PM Matthias J. Sax <mj...@apache.org>
>> wrote:
>>>>
>>>>> I think Java does not allow to have both overloads, because it would
>>>>> result in ambiguity?
>>>>>
>>>>> If you call `s1.merge(s2)` it's unclear which method you want to call.
>>>>>
>>>>>
>>>>> -Matthias
>>>>>
>>>>>
>>>>> On 3/28/22 7:20 AM, Nick Telford wrote:
>>>>>> Hi Matthias,
>>>>>>
>>>>>> How about instead of changing the signature of the existing method to
>>>>>> variadic, we simply add a new overload which takes variadic args:
>>>>>>
>>>>>> KStream<K, V> merge(KStream<K, V> first, KStream<K, V>... rest);
>>>>>>
>>>>>> That way, we maintain both source *and* binary compatibility for the
>>>>>> existing method, and we can enforce that there is always at least one
>>>>>> stream (argument) being merged.
>>>>>>
>>>>>> I'm fine dropping the static methods. As you said, this is mostly all
>>>>> just
>>>>>> syntax sugar anyway, but I do think allowing multiple streams to be
>>>>> merged
>>>>>> together is a benefit. My motivation was that we generate diagrams for
>>>>> our
>>>>>> Topologies, and having several binary merges becomes quite messy when
>> a
>>>>>> single n-ary merge is what you're really modelling.
>>>>>>
>>>>>> Regards,
>>>>>>
>>>>>> Nick
>>>>>>
>>>>>> On Thu, 24 Mar 2022 at 21:24, Matthias J. Sax <mj...@apache.org>
>>>> wrote:
>>>>>>
>>>>>>> Thanks for proposing this KIP.
>>>>>>>
>>>>>>> I feel a little bit torn by the idea. In general, we try to keep the
>>>>>>> surface area small, and only add APIs that delivery (significant)
>>>> value.
>>>>>>>
>>>>>>> It seems the current proposal is more or less about syntactic sugar,
>>>>>>> what can still be valuable, but I am not really sure about it.
>>>>>>>
>>>>>>> I am also wondering, if we could use a variadic argument instead of a
>>>>>>> `Collection`:
>>>>>>>
>>>>>>>         KStream<K, V> merge(KStream<K, V>... streams);
>>>>>>>
>>>>>>> This way, we could just replace the existing method in a backward
>>>>>>> compatible way (well, source code compatible only) and thus not
>>>> increase
>>>>>>> the surface area of the API while still achieving your goal?
>>>>>>>
>>>>>>> A `merge()` with zero argument would just be a no-op (same as for
>>>> using
>>>>>>> `Collection` I assume?).
>>>>>>>
>>>>>>>
>>>>>>> For adding the static methods: It seems not to be a common pattern to
>>>>>>> me? I might be better not to add them and leave it to users to write
>> a
>>>>>>> small helper method themselves if they have such a pattern?
>>>>>>>
>>>>>>>
>>>>>>> -Matthias
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On 1/31/22 7:35 AM, Nick Telford wrote:
>>>>>>>> Hi everyone,
>>>>>>>>
>>>>>>>> I'd like to discuss KIP 819:
>>>>>>>>
>>>>>>>
>>>>>
>>>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-819%3A+Merge+multiple+KStreams+in+one+operation
>>>>>>>>
>>>>>>>> This is a simple KIP that adds/modifies the KStream#merge API to
>>>> enable
>>>>>>>> many streams to be merged in a single graph node.
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>>
>>>>>>>> Nick Telford
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>