Posted to users@kafka.apache.org by Caleb Welton <ca...@autonomic.ai> on 2016/08/25 23:51:55 UTC

Joining Streams with Kafka Streams

Hello,

I'm trying to understand best practices related to joining streams using
the Kafka Streams API.

I can configure the topology such that two sources feed into a single
processor:

topologyBuilder
    .addSource("A", stringDeserializer, itemDeserializer, "a-topic")
    .addSource("B", stringDeserializer, itemDeserializer, "b-topic")
    .addProcessor("hello-join", HelloJoin::new, "A", "B")...

And within my processor I can determine which topic a given message came
from:

public void process(String key, String value) {
     if (context.topic().equals("a-topic")) {
         ...
     } else {
         ...
     }
}

This allows for a crude form of cross stream join with the following
issues/limitations:

  i.  A string compare on the topic name decides which stream a message came
from.  Having direct access to the TopicPartition could allow a more
efficient check.  Priority low, as this is just a small performance
hit, but it is a per-message hit, so it would be nice to eliminate.

  ii. This requires "a-topic" and "b-topic" to have the same message
format, which for general join handling is a pretty big limitation.  What
would be the recommended way to handle the case of different message
formats, e.g. needing different deserializers for different input topics?

E.g. how would I define my Processor if the topology was:

topologyBuilder
    .addSource("A", stringDeserializer, itemADeserializer, "a-topic")
    .addSource("B", stringDeserializer, itemBDeserializer, "b-topic")
    .addProcessor("hello-join", HelloJoin::new, "A", "B")...

where itemADeserializer and itemBDeserializer return different classes?

Thanks,
  Caleb
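
One possible approach to the question above is to type the processor's
value as a common supertype and branch on the runtime class of each value
instead of on the topic name. The sketch below is only a minimal
in-memory illustration under that assumption. ItemA, ItemB and
TypedJoinSketch are hypothetical names, nothing here calls the Kafka
Streams API, and the "latest value per key" maps stand in for whatever
state store a real processor would use.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: none of these types come from Kafka Streams.
final class TypedJoinSketch {

    // Stand-ins for the two classes the two deserializers would return.
    static final class ItemA {
        final String name;
        ItemA(String name) { this.name = name; }
    }

    static final class ItemB {
        final int quantity;
        ItemB(int quantity) { this.quantity = quantity; }
    }

    // Latest value seen per key on each side (in memory, for illustration only).
    private final Map<String, ItemA> latestA = new HashMap<>();
    private final Map<String, ItemB> latestB = new HashMap<>();
    private final List<String> joined = new ArrayList<>();

    // The value is typed as Object because either class may arrive;
    // the runtime type, not the topic name, selects the branch.
    void process(String key, Object value) {
        if (value instanceof ItemA) {
            latestA.put(key, (ItemA) value);
        } else if (value instanceof ItemB) {
            latestB.put(key, (ItemB) value);
        } else {
            throw new IllegalArgumentException("unexpected value type: " + value);
        }
        // Emit a joined result once both sides have a value for this key.
        ItemA a = latestA.get(key);
        ItemB b = latestB.get(key);
        if (a != null && b != null) {
            joined.add(key + ":" + a.name + "x" + b.quantity);
        }
    }

    List<String> joined() { return joined; }

    public static void main(String[] args) {
        TypedJoinSketch join = new TypedJoinSketch();
        join.process("k1", new ItemA("widget"));
        join.process("k1", new ItemB(3));
        System.out.println(join.joined());
    }
}
```

The trade-off is that the compiler no longer checks the value type, so an
unexpected class is only caught at runtime by the final else branch.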

Re: Joining Streams with Kafka Streams

Posted by Guozhang Wang <wa...@gmail.com>.
I would suggest trying the high-level Streams DSL, as Michael mentioned.
It implements this kind of join in a slightly different way (i.e. as two
join processors, each working against the other's materialized view,
followed by a merge processor node), and hence the two issues you
mentioned are automatically resolved.


Guozhang
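
The structure described above (two join processors, each probing the
other side's materialized view, followed by a merge) can be modelled
roughly as follows. This is a simplified in-memory sketch, not the Kafka
Streams implementation: SymmetricJoinSketch, onA, onB and merged are
hypothetical names, the views are plain maps holding the latest value per
key, and windowing is ignored entirely. It is only meant to show why each
side keeps its own value type and why no topic-name check is needed.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Simplified in-memory model; not the Kafka Streams implementation.
final class SymmetricJoinSketch<K, A, B, R> {

    // Each side's "materialized view": the latest value per key.
    private final Map<K, A> viewA = new HashMap<>();
    private final Map<K, B> viewB = new HashMap<>();
    private final BiFunction<A, B, R> joiner;
    // Stands in for the merge node that collects output from both join processors.
    private final List<R> merged = new ArrayList<>();

    SymmetricJoinSketch(BiFunction<A, B, R> joiner) {
        this.joiner = joiner;
    }

    // Join processor for side A: updates A's view, then probes B's view.
    void onA(K key, A value) {
        viewA.put(key, value);
        B other = viewB.get(key);
        if (other != null) {
            merged.add(joiner.apply(value, other));
        }
    }

    // Join processor for side B: updates B's view, then probes A's view.
    void onB(K key, B value) {
        viewB.put(key, value);
        A other = viewA.get(key);
        if (other != null) {
            merged.add(joiner.apply(other, value));
        }
    }

    List<R> merged() { return merged; }

    public static void main(String[] args) {
        SymmetricJoinSketch<String, String, Integer, String> join =
            new SymmetricJoinSketch<>((a, b) -> a + "=" + b);
        join.onA("k1", "widget");
        join.onB("k1", 3);
        join.onB("k2", 7);
        join.onA("k2", "gadget");
        System.out.println(join.merged());
    }
}
```

Because each entry point is typed to its own side, the two inputs can use
different value classes, and the source of a record is implied by which
method receives it.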

On Fri, Aug 26, 2016 at 2:47 AM, Michael Noll <mi...@confluent.io> wrote:

> First, a follow-up question, just in case (sorry if this was obvious to you
> already):  Have you considered using the Kafka Streams DSL, which has much
> more convenient join functionality built in?  The reason I am asking is
> that you didn't mention whether you have tried the DSL, or whether you
> need to use the Processor API for other reasons as well.
>
> -Michael




Re: Joining Streams with Kafka Streams

Posted by Michael Noll <mi...@confluent.io>.
First, a follow-up question, just in case (sorry if this was obvious to you
already):  Have you considered using the Kafka Streams DSL, which has much
more convenient join functionality built in?  The reason I am asking is
that you didn't mention whether you have tried the DSL, or whether you
need to use the Processor API for other reasons as well.

-Michael

