Posted to user@flink.apache.org by Adrienne Kole <ad...@gmail.com> on 2016/10/12 14:10:12 UTC
Keyed join Flink Streaming
Hi,
I have two streams that are partitioned by a key field. I want to join
those streams on their key fields using windows. This is an example I saw on
the Flink website:
val firstInput: DataStream[MyType] = ...
val secondInput: DataStream[AnotherType] = ...

val firstKeyed = firstInput.keyBy("userId")
val secondKeyed = secondInput.keyBy("id")

val result: DataStream[(MyType, AnotherType)] =
  firstKeyed.join(secondKeyed)
    .onWindow(Time.of(5, SECONDS))
However, with the current Flink version (1.1.2), I cannot do this. Whether
or not the streams are keyed, I still have to specify the "where" and
"equalTo" clauses.
My question is: how can I implement keyed window joins in Flink
streaming? And is there a difference between:
val firstKeyed: KeyedStream[MyType] = ...
val secondKeyed: KeyedStream[AnotherType] = ...
val result: DataStream[(MyType, AnotherType)] =
  firstKeyed.join(secondKeyed).where(..).equalTo(..).window(..).apply(..)
and
val firstInput: DataStream[MyType] = ...
val secondInput: DataStream[AnotherType] = ...
val result: DataStream[(MyType, AnotherType)] =
  firstInput.join(secondInput).where(..).equalTo(..).window(..).apply(..)
Thanks
Adrienne
Re: Keyed join Flink Streaming
Posted by Adrienne Kole <ad...@gmail.com>.
Hi Ufuk,
Thanks for the reply.
The example is at [1]. I have a few questions:
If there is no difference between a KeyedStream-KeyedStream join by key and
a DataStream-DataStream join, then a DataStream effectively becomes a
KeyedStream through the `where` and `equalTo` clauses. Please correct me if
I am wrong.
Is the execution of windowed joins in Flink restricted to a single machine
in the cluster? Their throughput is quite low compared to other
operations.
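In Flink, a window join is executed by the parallel instances of the join operator: both inputs are hash-partitioned by the `where`/`equalTo` key, so each instance handles a subset of the keys instead of everything running on one machine (unless the parallelism is 1 or most records share a few keys). The sketch below is a simplified model of that routing only. The function `Math.floorMod(key.hashCode(), parallelism)` is an assumption for illustration, not Flink's exact partitioner, and `KeyPartitioningSketch` and the user IDs are hypothetical.

```scala
// Simplified model of key-based routing to parallel join instances.
// ASSUMPTION: this hash is illustrative, not Flink's actual partitioning function.
object KeyPartitioningSketch {
  // Index of the parallel instance that owns `key`. It depends only on the key,
  // so equal keys from either join input are routed to the same instance.
  def instanceFor(key: Any, parallelism: Int): Int =
    Math.floorMod(key.hashCode(), parallelism)

  // Which distinct keys each parallel instance would receive.
  def assign(keys: Seq[String], parallelism: Int): Map[Int, Seq[String]] =
    keys.distinct.groupBy(k => instanceFor(k, parallelism))

  def main(args: Array[String]): Unit = {
    // Hypothetical user IDs; "u1" appears twice but is owned by one instance.
    val userIds = Seq("u1", "u2", "u3", "u4", "u1")
    assign(userIds, 4).toSeq.sortBy(_._1).foreach { case (i, ks) =>
      println(s"instance $i -> ${ks.mkString(", ")}")
    }
  }
}
```

With these four hypothetical IDs and a parallelism of 4, each ID lands on a different instance in this model; real key distributions are rarely that even, and skew concentrates work on fewer instances.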
[1]
https://cwiki.apache.org/confluence/display/FLINK/Streams+and+Operations+on+Streams
Thanks
Adrienne
On Thu, Oct 13, 2016 at 10:59 AM Ufuk Celebi <uc...@apache.org> wrote:
Hey Adrienne!
On Wed, Oct 12, 2016 at 4:10 PM, Adrienne Kole <ad...@gmail.com>
wrote:
> Hi,
>
> I have two streams that are partitioned by a key field. I want to join
> those streams on their key fields using windows. This is an example I saw on
> the Flink website:
>
> val firstInput: DataStream[MyType] = ...
> val secondInput: DataStream[AnotherType] = ...
>
> val firstKeyed = firstInput.keyBy("userId")
> val secondKeyed = secondInput.keyBy("id")
>
> val result: DataStream[(MyType, AnotherType)] =
>   firstKeyed.join(secondKeyed)
>     .onWindow(Time.of(5, SECONDS))
This does not work. I could not find this example in the Flink docs.
Do you remember where you found this? It would make sense to remove it.
:-)
You have to go with the other approach you described
(keyBy-join-where-equalTo-etc.). It would make sense to provide the
keyed stream join API though. If you like, you can open a JIRA issue
for it (you would need to tell me your JIRA ID so I can add you as a
contributor).
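To illustrate what the `join(..).where(..).equalTo(..).window(..).apply(..)` chain computes, here is a minimal plain-Scala model with no Flink dependency. It assumes tumbling event-time windows and hypothetical case classes `MyType(userId, ts)` and `AnotherType(id, ts)`; `WindowJoinSketch` and `windowJoin` are illustrative names, not Flink API. In the Flink 1.1 Scala API the corresponding call would look roughly like `firstInput.join(secondInput).where(_.userId).equalTo(_.id).window(TumblingEventTimeWindows.of(Time.seconds(5))).apply((a, b) => (a, b))`, assuming timestamps have already been assigned.

```scala
// Plain-Scala model of a tumbling-window equi-join; it does not use Flink.
// HYPOTHETICAL: MyType, AnotherType, their fields and WindowJoinSketch.
final case class MyType(userId: String, ts: Long)
final case class AnotherType(id: String, ts: Long)

object WindowJoinSketch {
  // Start of the tumbling window of length `size` that contains `ts`.
  def windowStart(ts: Long, size: Long): Long = ts - Math.floorMod(ts, size)

  // Emits f(l, r) for every left/right pair whose keys are equal (the
  // where/equalTo part) and whose timestamps fall into the same window.
  def windowJoin[L, R, K, O](left: Seq[L], right: Seq[R], size: Long)(
      leftKey: L => K, leftTs: L => Long)(
      rightKey: R => K, rightTs: R => Long)(
      f: (L, R) => O): Seq[O] = {
    // Index the right side by (key, window start) so matching is a lookup.
    val rightIndex: Map[(K, Long), Seq[R]] =
      right.groupBy(r => (rightKey(r), windowStart(rightTs(r), size)))
    for {
      l <- left
      r <- rightIndex.getOrElse((leftKey(l), windowStart(leftTs(l), size)), Seq.empty[R])
    } yield f(l, r)
  }

  def main(args: Array[String]): Unit = {
    val first = Seq(MyType("a", 1000L), MyType("b", 2000L), MyType("a", 7000L))
    val second = Seq(AnotherType("a", 3000L), AnotherType("a", 8000L), AnotherType("b", 9000L))
    // 5-second windows, mirroring Time.of(5, SECONDS) in the question.
    val joined = windowJoin(first, second, 5000L)(_.userId, _.ts)(_.id, _.ts)((a, b) => (a, b))
    joined.foreach(println)
  }
}
```

The model pairs every match inside a window (a per-key, per-window Cartesian product), which is what a windowed inner join emits; key "b" produces nothing here because its two records fall into different windows. It deliberately ignores parallelism and how Flink buffers window contents.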
> val firstKeyed: KeyedStream[MyType] = ...
> val secondKeyed: KeyedStream[AnotherType] = ...
> val result: DataStream[(MyType, AnotherType)] =
>   firstKeyed.join(secondKeyed).where(..).equalTo(..).window(..).apply(..)
>
> and
>
>
> val firstInput: DataStream[MyType] = ...
> val secondInput: DataStream[AnotherType] = ...
> val result: DataStream[(MyType, AnotherType)] =
>   firstInput.join(secondInput).where(..).equalTo(..).window(..).apply(..)
You only need the KeyedStream type if you need a specific KeyedStream
operation. There is no difference execution-wise between the two
examples.
– Ufuk