You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Adrienne Kole <ad...@gmail.com> on 2016/10/12 14:10:12 UTC

Keyed join Flink Streaming

Hi,

I have 2 streams which are partitioned based on key field. I want to join
those streams based on  key fields on windows. This is an example I saw in
the flink website:

val firstInput: DataStream[MyType] = ...
val secondInput: DataStream[AnotherType] = ...

val firstKeyed = firstInput.keyBy("userId")
val secondKeyed = secondInput.keyBy("id")

val result: DataStream[(MyType, AnotherType)] =
   firstKeyed.join(secondKeyed)
   onWindow(Time.of(5, SECONDS))

However, with current flink version,(1.1.2) I cannot do it. Basically even
if streams are keyed or not, I still have to specify the "where" and
"equal" clauses.

My question is that, is how can I implement keyed window joins in flink
streaming? And is there a difference between:

val firstInput: KeyedStream[MyType] = ...
val secondInput: KeyedStream[AnotherType] = ...
val result: DataStream[(MyType, AnotherType)] =
   firstKeyed.join(secondKeyed).where(..).equalTo(..).window(..).apply(..)

and


val firstInput: DataStream[MyType] = ...
val secondInput: DataStream[AnotherType] = ...
val result: DataStream[(MyType, AnotherType)] =
   firstKeyed.join(secondKeyed).where(..).equalTo(..).window(..).apply(..)


Thanks
Adrienne

Re: Keyed join Flink Streaming

Posted by Adrienne Kole <ad...@gmail.com>.
Hi Ufuk,
Thanks for reply.

The example is at [1]. I have few questions:

If there is  no difference between KeyedStream- KeyedStream join by key and
DataStream-DataStream join, then DataStream becomes KeyedStream with
`where` and `equal` clauses. Please correct me If I am wrong.


Is the execution of windowed joins in Flink is reduced to only one machine
in cluster, as it has quite low throughput, when comparing to other
operations?



[1]
https://cwiki.apache.org/confluence/display/FLINK/Streams+and+Operations+on+Streams


Thanks
Adrienne

On Thu, Oct 13, 2016 at 10:59 AM Ufuk Celebi <uc...@apache.org> wrote:

Hey Adrienne!

On Wed, Oct 12, 2016 at 4:10 PM, Adrienne Kole <ad...@gmail.com>
wrote:
> Hi,
>
> I have 2 streams which are partitioned based on key field. I want to join
> those streams based on  key fields on windows. This is an example I saw in
> the flink website:
>
> val firstInput: DataStream[MyType] = ...
> val secondInput: DataStream[AnotherType] = ...
>
> val firstKeyed = firstInput.keyBy("userId")
> val secondKeyed = secondInput.keyBy("id")
>
> val result: DataStream[(MyType, AnotherType)] =
>    firstKeyed.join(secondKeyed)
>    onWindow(Time.of(5, SECONDS))

This does not work. I could not find this example in the Flink docs.
Do you remember where you found this? Would make sense to remove it.
:-)

You have to go with the other approach you described
(keyBy-join-where-equalTo-etc.). It would make sense to provide the
keyed stream join API though. If you like, you can open a JIRA issue
for it (you would need to tell me your JIRA ID so I can add you as a
contributor).

> val firstInput: KeyedStream[MyType] = ...
> val secondInput: KeyedStream[AnotherType] = ...
> val result: DataStream[(MyType, AnotherType)] =
>    firstKeyed.join(secondKeyed).where(..).equalTo(..).window(..).apply(..)
>
> and
>
>
> val firstInput: DataStream[MyType] = ...
> val secondInput: DataStream[AnotherType] = ...
> val result: DataStream[(MyType, AnotherType)] =
>    firstKeyed.join(secondKeyed).where(..).equalTo(..).window(..).apply(..)

Only if you need a specific KeyedDataStream operation, you would need
to go with the KeyedStream type. There is no difference execution wise
between the two examples.

– Ufuk

Re: Keyed join Flink Streaming

Posted by Ufuk Celebi <uc...@apache.org>.
Hey Adrienne!

On Wed, Oct 12, 2016 at 4:10 PM, Adrienne Kole <ad...@gmail.com> wrote:
> Hi,
>
> I have 2 streams which are partitioned based on key field. I want to join
> those streams based on  key fields on windows. This is an example I saw in
> the flink website:
>
> val firstInput: DataStream[MyType] = ...
> val secondInput: DataStream[AnotherType] = ...
>
> val firstKeyed = firstInput.keyBy("userId")
> val secondKeyed = secondInput.keyBy("id")
>
> val result: DataStream[(MyType, AnotherType)] =
>    firstKeyed.join(secondKeyed)
>    onWindow(Time.of(5, SECONDS))

This does not work. I could not find this example in the Flink docs.
Do you remember where you found this? Would make sense to remove it.
:-)

You have to go with the other approach you described
(keyBy-join-where-equalTo-etc.). It would make sense to provide the
keyed stream join API though. If you like, you can open a JIRA issue
for it (you would need to tell me your JIRA ID so I can add you as a
contributor).

> val firstInput: KeyedStream[MyType] = ...
> val secondInput: KeyedStream[AnotherType] = ...
> val result: DataStream[(MyType, AnotherType)] =
>    firstKeyed.join(secondKeyed).where(..).equalTo(..).window(..).apply(..)
>
> and
>
>
> val firstInput: DataStream[MyType] = ...
> val secondInput: DataStream[AnotherType] = ...
> val result: DataStream[(MyType, AnotherType)] =
>    firstKeyed.join(secondKeyed).where(..).equalTo(..).window(..).apply(..)

Only if you need a specific KeyedDataStream operation, you would need
to go with the KeyedStream type. There is no difference execution wise
between the two examples.

– Ufuk