Posted to user@flink.apache.org by "igor.berman" <ig...@gmail.com> on 2017/01/08 20:06:16 UTC

Joining two kafka streams

Hi,
I have a use case where I need to join two Kafka topics by some fields.
In general, I could copy the content of one topic into the other and partition
by the same key, but I can't touch those two topics (there are other consumers
reading from them). On the other hand, it's essential to process records with
the same key on the same "thread", to achieve locality and to avoid races when
the same key is handled by different machines/threads.

My idea is to union the two streams and then key by the field,
but is there a better approach to achieve "locality"?

Any input will be appreciated.
Igor



--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Joining-two-kafka-streams-tp10912.html
Sent from the Apache Flink User Mailing List archive at Nabble.com.

Re: Joining two kafka streams

Posted by Igor Berman <ig...@gmail.com>.
Hi Tzu-Li,
Huge thanks for the input. I'll try to implement a prototype of your idea and
see if it meets my requirements.


On 9 January 2017 at 08:02, Tzu-Li (Gordon) Tai <tz...@apache.org> wrote:

> Hi Igor!
>
> What you can actually do is let a single FlinkKafkaConsumer consume from
> both topics, producing a single DataStream which you can keyBy afterwards.
> All versions of the FlinkKafkaConsumer support consuming multiple Kafka
> topics simultaneously. This is logically the same as union and then a
> keyBy, like what you described.
>
> Note that this approach requires that the records in both of your Kafka
> topics are of the same type when consumed into Flink (e.g., the same POJO
> class, or simply both as Strings).
> If that isn’t possible and you have different data types / schemas for the
> topics, you’d probably need to use “connect” and then a keyBy.
>
> If you’re applying a window directly after joining the two topic streams,
> you could also use a window join:
>
> dataStream.join(otherStream)
>     .where(<key selector>).equalTo(<key selector>)
>     .window(TumblingEventTimeWindows.of(Time.seconds(3)))
>     .apply(new JoinFunction() {...});
>
> The “where” specifies how to select the key from the first stream, and
> “equalTo” does the same for the second one.
>
> Hope this helps, let me know if you have other questions!
>
> Cheers,
> Gordon

Re: Joining two kafka streams

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
Hi Igor!

What you can actually do is let a single FlinkKafkaConsumer consume from both topics, producing a single DataStream which you can keyBy afterwards.
All versions of the FlinkKafkaConsumer support consuming multiple Kafka topics simultaneously. This is logically the same as union and then a keyBy, like what you described.
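
To illustrate why this gives the locality Igor asks about, here is a minimal plain-Java sketch of the idea. It is not Flink code: `subtaskFor` is a hypothetical, simplified hash partitioner (Flink's real key-to-subtask assignment goes through key groups and differs in detail), and the topic names and keys are made up. Records from both topics are merged and then routed purely by key, so a given key always lands on the same subtask regardless of which topic it came from.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of "union, then keyBy". Not Flink API; all names are hypothetical.
class KeyedUnionSketch {

    // One common record type for both topics, as the single-consumer approach requires.
    static final class Event {
        final String topic;
        final String key;
        final String payload;

        Event(String topic, String key, String payload) {
            this.topic = topic;
            this.key = key;
            this.payload = payload;
        }

        @Override
        public String toString() {
            return topic + ":" + key + ":" + payload;
        }
    }

    // Simplified stand-in for key-based routing: hash the key and map it to a subtask index.
    static int subtaskFor(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    // Merge both inputs (the "union"), then group records by the subtask their key maps to.
    static Map<Integer, List<Event>> route(List<Event> topicA, List<Event> topicB, int parallelism) {
        List<Event> merged = new ArrayList<>(topicA);
        merged.addAll(topicB);
        Map<Integer, List<Event>> bySubtask = new TreeMap<>();
        for (Event e : merged) {
            bySubtask.computeIfAbsent(subtaskFor(e.key, parallelism), k -> new ArrayList<>()).add(e);
        }
        return bySubtask;
    }

    public static void main(String[] args) {
        List<Event> topicA = Arrays.asList(
                new Event("topic-a", "user-1", "click"),
                new Event("topic-a", "user-2", "click"));
        List<Event> topicB = Arrays.asList(
                new Event("topic-b", "user-1", "purchase"));
        // Both "user-1" records end up in the same bucket, whatever its index is.
        route(topicA, topicB, 4).forEach((subtask, events) ->
                System.out.println("subtask " + subtask + " -> " + events));
    }
}
```

Since each subtask processes its records one at a time, all updates for one key happen sequentially in one place, which is what avoids the races between machines/threads.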

Note that this approach requires that the records in both of your Kafka topics are of the same type when consumed into Flink (e.g., the same POJO class, or simply both as Strings).
If that isn’t possible and you have different data types / schemas for the topics, you’d probably need to use “connect” and then a keyBy.
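
As a rough picture of what “connect” and then a keyBy buys you, here is a plain-Java sketch, again not Flink code. The `Order` and `Payment` types, the `userId` key and the balance logic are all invented for illustration. The point is only that each input keeps its own type and gets its own callback, while both callbacks work on the same per-key state because both inputs are keyed by the same field.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of "connect, then keyBy" for two differently typed inputs.
// Not Flink API; the types, fields and method names are hypothetical.
class ConnectedKeyedSketch {

    // Record type of the first topic.
    static final class Order {
        final String userId;
        final long amount;

        Order(String userId, long amount) {
            this.userId = userId;
            this.amount = amount;
        }
    }

    // Record type of the second topic; a different schema, but it carries the same key field.
    static final class Payment {
        final String userId;
        final long paid;

        Payment(String userId, long paid) {
            this.userId = userId;
            this.paid = paid;
        }
    }

    // State kept per key and shared by both callbacks: outstanding balance per user.
    private final Map<String, Long> balanceByUser = new HashMap<>();

    // Callback for records of the first input: an order increases the balance.
    void onOrder(Order order) {
        balanceByUser.merge(order.userId, order.amount, Long::sum);
    }

    // Callback for records of the second input: a payment decreases the balance.
    void onPayment(Payment payment) {
        balanceByUser.merge(payment.userId, -payment.paid, Long::sum);
    }

    long balance(String userId) {
        return balanceByUser.getOrDefault(userId, 0L);
    }

    public static void main(String[] args) {
        ConnectedKeyedSketch sketch = new ConnectedKeyedSketch();
        sketch.onOrder(new Order("user-1", 100));
        sketch.onPayment(new Payment("user-1", 40));
        System.out.println(sketch.balance("user-1")); // prints 60
    }
}
```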

If you’re applying a window directly after joining the two topic streams, you could also use a window join:

dataStream.join(otherStream)
    .where(<key selector>).equalTo(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply(new JoinFunction() {...});

The “where” specifies how to select the key from the first stream, and “equalTo” does the same for the second one.
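
To make the semantics of such a join concrete, here is a small plain-Java sketch (not Flink code) of a tumbling-window join. It assumes timestamps in milliseconds and windows aligned to the epoch with no offset; `Rec`, `windowStart` and `join` are hypothetical names. Two records are paired only if they have the same key and fall into the same 3-second window, so records that are close in time but on opposite sides of a window boundary are not joined.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of a tumbling-window join. Not Flink API; all names are hypothetical.
class TumblingJoinSketch {

    static final class Rec {
        final String key;
        final long timestampMs;
        final String value;

        Rec(String key, long timestampMs, String value) {
            this.key = key;
            this.timestampMs = timestampMs;
            this.value = value;
        }
    }

    // Start of the tumbling window containing the timestamp (epoch-aligned, no offset).
    static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - Math.floorMod(timestampMs, windowSizeMs);
    }

    // Pair every left record with every right record that has the same key and the same window.
    static List<String> join(List<Rec> left, List<Rec> right, long windowSizeMs) {
        List<String> joined = new ArrayList<>();
        for (Rec l : left) {
            for (Rec r : right) {
                boolean sameKey = l.key.equals(r.key);
                boolean sameWindow =
                        windowStart(l.timestampMs, windowSizeMs) == windowStart(r.timestampMs, windowSizeMs);
                if (sameKey && sameWindow) {
                    joined.add(l.value + "+" + r.value);
                }
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        List<Rec> left = Arrays.asList(new Rec("k1", 1000, "a1"), new Rec("k1", 3500, "a2"));
        List<Rec> right = Arrays.asList(new Rec("k1", 2000, "b1"), new Rec("k2", 1500, "b2"));
        // a2 (3500 ms) and b1 (2000 ms) are only 1.5 s apart but sit in different windows.
        System.out.println(join(left, right, 3000)); // prints [a1+b1]
    }
}
```

If records for the same key can arrive further apart than the window size, or in different windows, a window join will miss them, and the keyed approaches above with explicit state are probably the better fit.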

Hope this helps, let me know if you have other questions!

Cheers,
Gordon
