Posted to users@kafka.apache.org by Caleb Welton <ca...@autonomic.ai> on 2016/08/25 23:51:55 UTC
Joining Streams with Kafka Streams
Hello,
I'm trying to understand best practices related to joining streams using
the Kafka Streams API.
I can configure the topology such that two sources feed into a single
processor:
topologyBuilder
    .addSource("A", stringDeserializer, itemDeserializer, "a-topic")
    .addSource("B", stringDeserializer, itemDeserializer, "b-topic")
    .addProcessor("hello-join", HelloJoin::new, "A", "B")...
And within my processor I can determine which topic a given message came
from:
public void process(String key, String value) {
    if (context.topic().equals("a-topic")) {
        ...
    } else {
        ...
    }
}
This allows for a crude form of cross stream join with the following
issues/limitations:
i. A string compare on topic name to decide which stream a message came
from. Having actual access to the TopicPartition could lead to more
efficient validation. Priority low, as this is just a small performance
hit, but it is a per message performance hit so would be nice to eliminate.
ii. This requires "a-topic" and "b-topic" to have the same message
format, which for general join handling is a pretty big limitation. What
would be the recommended way to handle the case of different message
formats, e.g. needing different deserializers for different input topics?
E.g. how would I define my Processor if the topology was:
topologyBuilder
    .addSource("A", stringDeserializer, itemADeserializer, "a-topic")
    .addSource("B", stringDeserializer, itemBDeserializer, "b-topic")
    .addProcessor("hello-join", HelloJoin::new, "A", "B")...
where itemADeserializer and itemBDeserializer return different classes?
Thanks,
Caleb
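Editorial sketch (not part of the original message): one possible way to feed two differently typed inputs into a single processor is to wrap each value in a common tagged type before it reaches the join step. The sketch below is plain Java with no Kafka dependency. ItemA, ItemB, Tagged and TaggedJoinInput are hypothetical names, and it is an assumption that the wrapping would happen in a small per-source step placed between each source and the join processor. Dispatching on an enum tag also avoids the per-message string compare on the topic name.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical value types, standing in for what itemADeserializer and
// itemBDeserializer might return.
class ItemA { final String name; ItemA(String name) { this.name = name; } }
class ItemB { final int count; ItemB(int count) { this.count = count; } }

// Hypothetical wrapper: a single type that both inputs are converted to,
// carrying a tag that records which side the value came from.
final class Tagged {
    enum Side { A, B }
    final Side side;
    final Object value;
    private Tagged(Side side, Object value) { this.side = side; this.value = value; }
    static Tagged ofA(ItemA a) { return new Tagged(Side.A, a); }
    static Tagged ofB(ItemB b) { return new Tagged(Side.B, b); }
}

// Stand-in for the join processor: it sees only Tagged values and
// dispatches on the enum tag rather than on a topic name.
class TaggedJoinInput {
    final List<String> log = new ArrayList<>();

    void process(String key, Tagged value) {
        switch (value.side) {
            case A: {
                ItemA a = (ItemA) value.value; // safe: ofA is the only way to build side A
                log.add("A " + key + " " + a.name);
                break;
            }
            case B: {
                ItemB b = (ItemB) value.value; // safe: ofB is the only way to build side B
                log.add("B " + key + " " + b.count);
                break;
            }
        }
    }

    public static void main(String[] args) {
        TaggedJoinInput p = new TaggedJoinInput();
        p.process("k", Tagged.ofA(new ItemA("apple")));
        p.process("k", Tagged.ofB(new ItemB(3)));
        System.out.println(p.log);
    }
}
```

The private constructor plus the two factory methods keep the tag and the payload type in sync, so the casts inside process cannot fail.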
Re: Joining Streams with Kafka Streams
Posted by Guozhang Wang <wa...@gmail.com>.
I would suggest trying the high-level Streams DSL, as Michael mentioned.
It implements this kind of join in a slightly different way (i.e. as two
join processors, each working against the other's materialized view,
followed by a merge processor node), and hence the two issues you
mentioned above are automatically resolved.
Guozhang
On Fri, Aug 26, 2016 at 2:47 AM, Michael Noll <mi...@confluent.io> wrote:
> First a follow-up question, just in case (sorry if that was obvious to you
> already): Have you considered using Kafka Streams DSL, which has much more
> convenient join functionality built-in out of the box? The reason I am
> asking is that you didn't specifically mention that you did try using the
> DSL and/or you need to use the Processor API because of other reasons as
> well.
>
> -Michael
--
-- Guozhang
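Editorial sketch (not part of the original reply): the join structure described above, two join processors that each look up the other side's materialized view, followed by a merge, can be illustrated in plain Java without Kafka. This is a simplified model, not the Kafka Streams implementation. It keeps only the latest value per key and has no join window, and every name in it (TwoSidedJoinSketch, onA, onB, viewA, viewB) is hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, Kafka-free model of a two-sided stream join.
class TwoSidedJoinSketch {
    // "Materialized view" of each input: the latest value seen per key.
    private final Map<String, String> viewA = new HashMap<>();
    private final Map<String, Integer> viewB = new HashMap<>();
    // Stands in for the merge node: both join sides emit into one output.
    private final List<String> merged = new ArrayList<>();

    // Join processor for side A: store the record, then probe B's view.
    void onA(String key, String value) {
        viewA.put(key, value);
        Integer other = viewB.get(key);
        if (other != null) {
            merged.add(join(key, value, other));
        }
    }

    // Join processor for side B: store the record, then probe A's view.
    void onB(String key, Integer value) {
        viewB.put(key, value);
        String other = viewA.get(key);
        if (other != null) {
            merged.add(join(key, other, value));
        }
    }

    // Combines one value from each side into a joined result.
    private static String join(String key, String a, Integer b) {
        return key + ":" + a + "/" + b;
    }

    List<String> output() {
        return merged;
    }

    public static void main(String[] args) {
        TwoSidedJoinSketch j = new TwoSidedJoinSketch();
        j.onA("k1", "apple"); // no match yet, nothing emitted
        j.onB("k1", 3);       // matches the stored A record
        j.onB("k2", 7);       // no match yet
        j.onA("k2", "pear");  // matches the stored B record
        System.out.println(j.output()); // prints [k1:apple/3, k2:pear/7]
    }
}
```

Because each side has its own typed entry point, neither a topic-name comparison nor a shared value type is needed, which is why both issues raised in the original question do not arise in this arrangement.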
Re: Joining Streams with Kafka Streams
Posted by Michael Noll <mi...@confluent.io>.
First a follow-up question, just in case (sorry if that was obvious to you
already): Have you considered using the Kafka Streams DSL, which has much
more convenient join functionality built in? The reason I am asking is that
you didn't specifically mention whether you tried the DSL, or whether you
need to use the Processor API for other reasons as well.
-Michael