Posted to users@kafka.apache.org by Sachin Mittal <sj...@gmail.com> on 2019/12/05 06:07:57 UTC

Case of joining multiple streams/tables

Hi,
I have checked the documentation, and what I see is that we can join only two
streams or tables at a time.

I have a case where I have multiple streams that I need to join on a
common key.

As of now, I first join two of them, then join the result with the next one, and so on.
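For illustration, the chained approach can be sketched in plain Java. This is an assumption-laden stand-in, not the Kafka Streams API: each in-memory Map plays the role of a table holding the latest value per key, and the helper names (`join`, `joinThree`) are hypothetical. Joining three inputs this way takes two pairwise joins, two joiners, and one intermediate result.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiFunction;

// Plain-Java stand-in: each Map holds the latest value per key of a "table".
// This is NOT the Kafka Streams API; all names are hypothetical.
class ChainedJoinSketch {

    // Pairwise inner join of two "tables" on their common key.
    static <K, A, B, R> Map<K, R> join(Map<K, A> left, Map<K, B> right,
                                       BiFunction<A, B, R> joiner) {
        Map<K, R> result = new LinkedHashMap<>();
        for (Map.Entry<K, A> e : left.entrySet()) {
            B other = right.get(e.getKey());
            if (other != null) { // inner join: the key must exist on both sides
                result.put(e.getKey(), joiner.apply(e.getValue(), other));
            }
        }
        return result;
    }

    // Three inputs: two joins, two joiners, one intermediate result.
    static Map<String, String> joinThree(Map<String, String> t1,
                                         Map<String, String> t2,
                                         Map<String, String> t3) {
        Map<String, String> t1t2 = join(t1, t2, (a, b) -> a + "|" + b); // intermediate result
        return join(t1t2, t3, (ab, c) -> ab + "|" + c);
    }

    public static void main(String[] args) {
        Map<String, String> t1 = Map.of("k1", "a1", "k2", "a2");
        Map<String, String> t2 = Map.of("k1", "b1", "k2", "b2");
        Map<String, String> t3 = Map.of("k1", "c1"); // no value for k2
        System.out.println(joinThree(t1, t2, t3)); // prints {k1=a1|b1|c1}
    }
}
```

Every additional input adds one more pairwise join and one more intermediate result, which is the growth the question is about.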

Is there a way, or an existing implementation anywhere, that joins multiple
streams/tables in a single operation?

If not, is this something that is planned for future releases?
Or does something like this make sense as part of the Streams functionality?

Thanks
Sachin

Re: Case of joining multiple streams/tables

Posted by Guozhang Wang <wa...@gmail.com>.
Hi Sachin,

As Patrik mentioned, KIP-150 is being actively worked on and is likely to
be included in the next release.
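The core idea of KIP-150 (cogrouping) is that several grouped streams sharing a key feed one aggregate, each through its own aggregator, with a single initializer and a single store, rather than aggregating each stream into its own table and joining those tables. A rough plain-Java simulation of that idea follows; the names `CogroupSketch`, `cogroup`, and `process` are hypothetical and are not the API defined in the KIP, and the HashMap stands in for a state store.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Supplier;

// Hypothetical plain-Java simulation of cogrouping; NOT the Kafka Streams API.
class CogroupSketch<K, V, A> {
    private final Supplier<A> initializer;                                        // shared initializer
    private final Map<String, BiFunction<V, A, A>> aggregators = new HashMap<>(); // one per input
    private final Map<K, A> store = new HashMap<>();                              // single shared store

    CogroupSketch(Supplier<A> initializer) {
        this.initializer = initializer;
    }

    // Registers an input (by name) together with its own aggregator.
    CogroupSketch<K, V, A> cogroup(String input, BiFunction<V, A, A> aggregator) {
        aggregators.put(input, aggregator);
        return this;
    }

    // Handles one record from any registered input and returns the updated aggregate.
    A process(String input, K key, V value) {
        BiFunction<V, A, A> aggregator = aggregators.get(input);
        if (aggregator == null) {
            throw new IllegalArgumentException("unknown input: " + input);
        }
        A current = store.computeIfAbsent(key, k -> initializer.get());
        A updated = aggregator.apply(value, current);
        store.put(key, updated);
        return updated;
    }

    public static void main(String[] args) {
        // Aggregate per user: [total clicks, total purchases].
        CogroupSketch<String, Long, long[]> perUser =
            new CogroupSketch<String, Long, long[]>(() -> new long[2])
                .cogroup("clicks", (v, agg) -> new long[] {agg[0] + v, agg[1]})
                .cogroup("purchases", (v, agg) -> new long[] {agg[0], agg[1] + v});
        perUser.process("clicks", "u1", 3L);
        perUser.process("purchases", "u1", 1L);
        System.out.println(Arrays.toString(perUser.process("clicks", "u1", 2L))); // prints [5, 1]
    }
}
```

For brevity this simplification forces all inputs to share one value type V; the KIP itself should be consulted for how differently typed inputs are handled.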


Guozhang

On Fri, Dec 6, 2019 at 12:09 AM Patrik Kleindl <pk...@gmail.com> wrote:

> Hi
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-150+-+Kafka-Streams+Cogroup
> might
> be worth a look.
> best regards
> Patrik
>
> On Fri, 6 Dec 2019 at 06:44, Sachin Mittal <sj...@gmail.com> wrote:
>
> > I was thinking more of a builder api at DSL level.
> > Something like this:
> > StreamsBuilder.joineBuilder()
> >                 .join(kstream1)
> >                 .leftJoin(kstream2)
> >                 .leftJoin(kstream3)
> >                 ....
> >                 .joiner((k,v1,v2,v3...) -> ...)
> >                 .window()
> >                 .build();
> >
> >
> > So when we require to join multiple streams, it can be done using one
> > joiner.
> > Internally this can be implemented at the processor level.
> >
> > So not so much of just adding another API call, but something that can
> make
> > code cleaner and more efficient, by using single joiner.
> >
> > Thanks
> > Sachin
> >
> >
> >
> > On Thu, Dec 5, 2019 at 2:22 PM Bruno Cadonna <br...@confluent.io> wrote:
> >
> > > Hi Sachin,
> > >
> > > I do not completely understand what you mean with one single
> > > operation. Do you mean one call of a method in the DSL or the join is
> > > processed on one processor node?
> > >
> > > If you mean the latter, the joins in the DSL are also not processed on
> > > one single processor node.
> > >
> > > If you mean the former, the DSL does not have a single method call to
> > > join multiple streams and it does not necessarily need it to process
> > > an n-way join more efficiently, because the DSL is just the way you
> > > declare the join. How the join is processed depends on how the
> > > topology is build from the DSL code. Having a DSL call specific for a
> > > n-way join would merely result in syntactic sugar (which can also make
> > > sense).
> > >
> > > If you have specific requirements that are not fulfilled by the DSL
> > > you can use the Processor API to implement your own join.
> > >
> > > See the following StackOverflow question for more details on joins.
> > >
> > >
> >
> https://stackoverflow.com/questions/53485632/kafka-streams-implementing-joining-using-the-processor-api
> > >
> > > Best,
> > > Bruno
> > >
> > > On Thu, Dec 5, 2019 at 7:08 AM Sachin Mittal <sj...@gmail.com>
> wrote:
> > > >
> > > > Hi,
> > > > I have checked the documentation and what I see that we can join two
> > > > streams or tables at a given time.
> > > >
> > > > I have a case where I have multiple streams which I need to join
> based
> > on
> > > > common key.
> > > >
> > > > As of now I am first joining two and the result of that with next and
> > so
> > > on.
> > > >
> > > > Is there a way or any case implemented anywhere that joins multiple
> > > > streams/tables in a single operation.
> > > >
> > > > If not then is this something that is pipelined for future releases?
> > > > Or does something like this make sense to be part of streams
> > > functionality?
> > > >
> > > > Thanks
> > > > Sachin
> > >
> >
>


-- 
-- Guozhang

Re: Case of joining multiple streams/tables

Posted by Patrik Kleindl <pk...@gmail.com>.
Hi
https://cwiki.apache.org/confluence/display/KAFKA/KIP-150+-+Kafka-Streams+Cogroup
might be worth a look.
best regards
Patrik

On Fri, 6 Dec 2019 at 06:44, Sachin Mittal <sj...@gmail.com> wrote:

> I was thinking more of a builder api at DSL level.
> Something like this:
> StreamsBuilder.joineBuilder()
>                 .join(kstream1)
>                 .leftJoin(kstream2)
>                 .leftJoin(kstream3)
>                 ....
>                 .joiner((k,v1,v2,v3...) -> ...)
>                 .window()
>                 .build();
>
>
> So when we require to join multiple streams, it can be done using one
> joiner.
> Internally this can be implemented at the processor level.
>
> So not so much of just adding another API call, but something that can make
> code cleaner and more efficient, by using single joiner.
>
> Thanks
> Sachin
>
>
>
> On Thu, Dec 5, 2019 at 2:22 PM Bruno Cadonna <br...@confluent.io> wrote:
>
> > Hi Sachin,
> >
> > I do not completely understand what you mean with one single
> > operation. Do you mean one call of a method in the DSL or the join is
> > processed on one processor node?
> >
> > If you mean the latter, the joins in the DSL are also not processed on
> > one single processor node.
> >
> > If you mean the former, the DSL does not have a single method call to
> > join multiple streams and it does not necessarily need it to process
> > an n-way join more efficiently, because the DSL is just the way you
> > declare the join. How the join is processed depends on how the
> > topology is build from the DSL code. Having a DSL call specific for a
> > n-way join would merely result in syntactic sugar (which can also make
> > sense).
> >
> > If you have specific requirements that are not fulfilled by the DSL
> > you can use the Processor API to implement your own join.
> >
> > See the following StackOverflow question for more details on joins.
> >
> >
> https://stackoverflow.com/questions/53485632/kafka-streams-implementing-joining-using-the-processor-api
> >
> > Best,
> > Bruno
> >
> > On Thu, Dec 5, 2019 at 7:08 AM Sachin Mittal <sj...@gmail.com> wrote:
> > >
> > > Hi,
> > > I have checked the documentation and what I see that we can join two
> > > streams or tables at a given time.
> > >
> > > I have a case where I have multiple streams which I need to join based
> on
> > > common key.
> > >
> > > As of now I am first joining two and the result of that with next and
> so
> > on.
> > >
> > > Is there a way or any case implemented anywhere that joins multiple
> > > streams/tables in a single operation.
> > >
> > > If not then is this something that is pipelined for future releases?
> > > Or does something like this make sense to be part of streams
> > functionality?
> > >
> > > Thanks
> > > Sachin
> >
>

Re: Case of joining multiple streams/tables

Posted by Sachin Mittal <sj...@gmail.com>.
I was thinking more of a builder API at the DSL level.
Something like this:
StreamsBuilder.joinBuilder()
                .join(kstream1)
                .leftJoin(kstream2)
                .leftJoin(kstream3)
                ....
                .joiner((k,v1,v2,v3...) -> ...)
                .window()
                .build();


So when we need to join multiple streams, it can be done using one
joiner. Internally, this could be implemented at the processor level.

So it is not so much about adding just another API call, but about something
that can make the code cleaner and more efficient by using a single joiner.
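A rough plain-Java sketch of such a builder, assuming in-memory maps instead of streams: the first input behaves like `join` (it drives the output), the others like `leftJoin` (a missing key yields null), and one joiner receives the key plus all values at once. `MultiJoinSketch` and its methods are hypothetical and only mirror the proposal; windowing (`.window()`) is left out.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical builder mirroring the proposal, over in-memory maps instead of
// streams; NOT a Kafka Streams API. Windowing is deliberately left out.
class MultiJoinSketch<K, V, R> {
    private final Map<K, V> primary;                          // input given to join(): drives the output
    private final List<Map<K, V>> others = new ArrayList<>(); // inputs given to leftJoin()
    private BiFunction<K, List<V>, R> valueJoiner;            // single joiner for all inputs

    private MultiJoinSketch(Map<K, V> primary) {
        this.primary = primary;
    }

    static <K, V, R> MultiJoinSketch<K, V, R> join(Map<K, V> primary) {
        return new MultiJoinSketch<>(primary);
    }

    MultiJoinSketch<K, V, R> leftJoin(Map<K, V> other) {
        others.add(other);
        return this;
    }

    MultiJoinSketch<K, V, R> joiner(BiFunction<K, List<V>, R> joiner) {
        this.valueJoiner = joiner;
        return this;
    }

    // One pass over the primary input and one joiner call per key;
    // no intermediate joined results are materialized.
    Map<K, R> build() {
        if (valueJoiner == null) {
            throw new IllegalStateException("joiner(...) was not called");
        }
        Map<K, R> result = new LinkedHashMap<>();
        for (Map.Entry<K, V> e : primary.entrySet()) {
            List<V> values = new ArrayList<>();
            values.add(e.getValue());
            for (Map<K, V> other : others) {
                values.add(other.get(e.getKey())); // left join: null if the key is missing
            }
            result.put(e.getKey(), valueJoiner.apply(e.getKey(), values));
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> s1 = Map.of("k1", "a1", "k2", "a2");
        Map<String, String> s2 = Map.of("k1", "b1");
        Map<String, String> s3 = Map.of("k2", "c2");
        Map<String, String> out = MultiJoinSketch.<String, String, String>join(s1)
            .leftJoin(s2)
            .leftJoin(s3)
            .joiner((k, vs) -> k + "=" + vs)
            .build();
        System.out.println(out.get("k1")); // prints k1=[a1, b1, null]
        System.out.println(out.get("k2")); // prints k2=[a2, null, c2]
    }
}
```

Because Java has no variadic generics, this sketch makes all inputs share one value type V; a real API along these lines would need typed overloads or a common wrapper type for heterogeneous values.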

Thanks
Sachin



On Thu, Dec 5, 2019 at 2:22 PM Bruno Cadonna <br...@confluent.io> wrote:


Re: Case of joining multiple streams/tables

Posted by Bruno Cadonna <br...@confluent.io>.
Hi Sachin,

I do not completely understand what you mean by one single
operation. Do you mean one method call in the DSL, or that the join is
processed on one processor node?

If you mean the latter, the joins in the DSL are not processed on a
single processor node either.

If you mean the former, the DSL does not have a single method call to
join multiple streams, and it does not necessarily need one to process
an n-way join more efficiently, because the DSL is just the way you
declare the join. How the join is processed depends on how the
topology is built from the DSL code. Having a DSL call specific to an
n-way join would merely result in syntactic sugar (which can also make
sense).

If you have specific requirements that are not fulfilled by the DSL,
you can use the Processor API to implement your own join.
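As an illustration of a custom join handled on a single node, the following plain-Java sketch keeps, per key, the latest value from each of n inputs and calls one joiner once every input has a value (table-style inner-join semantics). It is an assumption, not Processor API code: the HashMap stands in for a key-value state store, `process(source, key, value)` stands in for the record callback of a custom processor, and all names are hypothetical. Persistence, windowing, and co-partitioning of the inputs are ignored.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiFunction;

// Hypothetical single-node n-way join; NOT the Kafka Streams Processor API.
// The HashMap stands in for a key-value state store.
class NWayJoinProcessorSketch<K, V, R> {
    private final int inputs;                              // number of joined inputs
    private final BiFunction<K, List<V>, R> joiner;        // one joiner for all inputs
    private final Map<K, List<V>> store = new HashMap<>(); // per key: latest value of each input

    NWayJoinProcessorSketch(int inputs, BiFunction<K, List<V>, R> joiner) {
        this.inputs = inputs;
        this.joiner = joiner;
    }

    // Handles one record from input `source` (0-based). Returns the joined
    // result once every input holds a value for the key, otherwise empty.
    Optional<R> process(int source, K key, V value) {
        if (source < 0 || source >= inputs) {
            throw new IllegalArgumentException("unknown source: " + source);
        }
        List<V> slots = store.get(key);
        if (slots == null) {
            slots = new ArrayList<>();
            for (int i = 0; i < inputs; i++) {
                slots.add(null); // no value seen yet for this input
            }
            store.put(key, slots);
        }
        slots.set(source, value); // a newer value replaces the old one; null clears the slot
        if (slots.contains(null)) {
            return Optional.empty(); // inner-join semantics: wait for all inputs
        }
        return Optional.ofNullable(joiner.apply(key, new ArrayList<>(slots)));
    }

    public static void main(String[] args) {
        NWayJoinProcessorSketch<String, String, String> p =
            new NWayJoinProcessorSketch<>(3, (k, vs) -> k + ":" + String.join(",", vs));
        System.out.println(p.process(0, "k1", "a")); // prints Optional.empty
        System.out.println(p.process(2, "k1", "c")); // prints Optional.empty
        System.out.println(p.process(1, "k1", "b")); // prints Optional[k1:a,b,c]
    }
}
```

In this sketch a null value clears that input's slot, so the key stops producing output until that input sends a value again; one store and one joiner serve all n inputs instead of one per pairwise join.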

See the following StackOverflow question for more details on joins.
https://stackoverflow.com/questions/53485632/kafka-streams-implementing-joining-using-the-processor-api

Best,
Bruno

On Thu, Dec 5, 2019 at 7:08 AM Sachin Mittal <sj...@gmail.com> wrote: