Posted to user@flink.apache.org by Aljoscha Krettek <al...@apache.org> on 2017/02/03 14:48:00 UTC

Re: allowed lateness on windowed join?

Hi,
I'm afraid that's not possible, but you can do the join yourself on a
regular stream. Essentially, the code for JoinedStreams takes two streams,
maps them to a common data type, unions them, and then performs a normal
window operation, and a normal window operation lets you set the allowed
lateness.

The code for this is in CoGroupedStreams (as the general case of a join)
and JoinedStreams.
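
A minimal sketch of that approach with the Scala DataStream API, not the
actual JoinedStreams code: both inputs are mapped to Either, unioned, keyed,
and windowed, so allowedLateness can be set on the resulting window. The
Warning and Tweet case classes, the ManualWindowJoin object, and the
windowSec/latenessSec parameters are assumptions for illustration; only the
symbol and count fields come from the question. It also assumes timestamps
and watermarks are already assigned on both inputs, and it uses a tumbling
window because the size and slide in the question are equal.

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// Hypothetical record types; only `symbol` and `count` appear in the thread.
case class Warning(symbol: String, count: Long)
case class Tweet(symbol: String, count: Long)

object ManualWindowJoin {

  // Inner join of one window's contents: pair every warning with every tweet.
  def joinWindow(elems: Iterable[Either[Warning, Tweet]]): Seq[(Long, Long)] = {
    val warnings = elems.collect { case Left(w) => w }
    val tweets = elems.collect { case Right(t) => t }
    (for (w <- warnings; t <- tweets) yield (w.count, t.count)).toSeq
  }

  def join(
      warnings: DataStream[Warning],
      tweets: DataStream[Tweet],
      windowSec: Long,
      latenessSec: Long): DataStream[(Long, Long)] = {
    // Map both inputs to a common type so that they can be unioned.
    val left: DataStream[Either[Warning, Tweet]] =
      warnings.map(w => Left(w): Either[Warning, Tweet])
    val right: DataStream[Either[Warning, Tweet]] =
      tweets.map(t => Right(t): Either[Warning, Tweet])

    left.union(right)
      .keyBy(e => e.fold(w => w.symbol, t => t.symbol))
      .window(TumblingEventTimeWindows.of(Time.seconds(windowSec)))
      // Available here because this is a regular windowed stream.
      .allowedLateness(Time.seconds(latenessSec))
      .apply { (key: String, window: TimeWindow,
                elems: Iterable[Either[Warning, Tweet]],
                out: Collector[(Long, Long)]) =>
        joinWindow(elems).foreach(pair => out.collect(pair))
      }
  }
}
```

With allowed lateness, a late record that still falls within the lateness
period makes the window fire again, so the function above emits the joined
pairs for that window again. Downstream consumers need to tolerate that.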

Cheers,
Aljoscha

On Mon, 30 Jan 2017 at 17:38 Saiph Kappa <sa...@gmail.com> wrote:

> Hi all,
>
> Is it possible to specify allowed lateness for a window join like the
> following one:
>
> val tweetsAndWarning = warningsPerStock.join(tweetsPerStock)
>   .where(_.symbol).equalTo(_.symbol)
>   .window(SlidingEventTimeWindows.of(
>     Time.of(windowDurationSec, TimeUnit.SECONDS),
>     Time.of(windowDurationSec, TimeUnit.SECONDS)))
>   .apply((c1, c2) => (c1.count, c2.count))
>
>
> I think it is related to these:
>
> https://cwiki.apache.org/confluence/display/FLINK/Streaming+Window+Join+Rework
> https://issues.apache.org/jira/browse/FLINK-3109
>
>
> Thanks!
>

Re: allowed lateness on windowed join?

Posted by Aljoscha Krettek <al...@apache.org>.
And to add to that: yes, this is what I was suggesting. :-)

On Mon, 6 Feb 2017 at 09:58 Fabian Hueske <fh...@gmail.com> wrote:

> Hi,
>
> Union is a super cheap operator in Flink. It does not scan the records,
> but just merges the streams. So the effort is very low.
> The built-in join operator works in the same way but does not expose
> allowed lateness.
>
> Cheers, Fabian
>

Re: allowed lateness on windowed join?

Posted by Fabian Hueske <fh...@gmail.com>.
Hi,

Union is a super cheap operator in Flink. It does not scan the records, but
just merges the streams. So the effort is very low.
The built-in join operator works in the same way but does not expose
allowed lateness.

Cheers, Fabian

Re: allowed lateness on windowed join?

Posted by Saiph Kappa <sa...@gmail.com>.
So you are saying that I can do the join on a regular stream by using the
union transformation? For that, I would need to know which stream each
record came from. I can tag the streamed data so that I know which side of
the join each element belongs to. Is this what you were proposing? The only
drawback, I think, is that tuples from both upstreams would have to be
scanned twice: once to perform the union, and then again to perform the
join in a custom function.
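
The tagging idea can be modelled in plain Scala, without Flink, as a rough
analogy only: each record is wrapped in Left or Right when it is mapped, the
two iterators are chained without being read, and the custom function then
separates the two sides in a single pass over the merged records. The
Warning and Tweet case classes and the TaggedSplit object are hypothetical
names for illustration.

```scala
// Plain-Scala model, not Flink code. Hypothetical record types.
case class Warning(symbol: String, count: Long)
case class Tweet(symbol: String, count: Long)

object TaggedSplit {

  // Separate one window's tagged records into the two sides in one pass.
  def split(elems: Iterator[Either[Warning, Tweet]]): (Vector[Warning], Vector[Tweet]) =
    elems.foldLeft((Vector.empty[Warning], Vector.empty[Tweet])) {
      case ((ws, ts), Left(w))  => (ws :+ w, ts)
      case ((ws, ts), Right(t)) => (ws, ts :+ t)
    }

  def main(args: Array[String]): Unit = {
    val warnings = Iterator(Warning("ACME", 2L))
    val tweets = Iterator(Tweet("ACME", 5L), Tweet("ACME", 8L))

    // The tag is added by a lazy map; ++ chains the iterators without
    // consuming them, so no record is read before split runs.
    val merged: Iterator[Either[Warning, Tweet]] =
      warnings.map(w => Left(w): Either[Warning, Tweet]) ++
        tweets.map(t => Right(t): Either[Warning, Tweet])

    val (ws, ts) = split(merged)
    for (w <- ws; t <- ts) println((w.count, t.count))
  }
}
```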

Thanks!

On Fri, Feb 3, 2017 at 2:48 PM, Aljoscha Krettek <al...@apache.org>
wrote:

> Hi,
> I'm afraid that's not possible but you can use a regular stream and do the
> join yourself. What the code for JoinedStreams essentially does is take two
> streams, map them to a common data type, union them and then perform a
> normal window operation.
>
> The code for this is in CoGroupedStreams (as the general case of a join)
> and JoinedStreams.
>
> Cheers,
> Aljoscha
>
> On Mon, 30 Jan 2017 at 17:38 Saiph Kappa <sa...@gmail.com> wrote:
>
>> Hi all,
>>
>> Is it possible to specify allowed lateness for a window join like the
>> following one:
>>
>> val tweetsAndWarning = warningsPerStock.join(tweetsPerStock)
>>   .where(_.symbol).equalTo(_.symbol)
>>   .window(SlidingEventTimeWindows.of(
>>     Time.of(windowDurationSec, TimeUnit.SECONDS),
>>     Time.of(windowDurationSec, TimeUnit.SECONDS)))
>>   .apply((c1, c2) => (c1.count, c2.count))
>>
>>
>> I think it is related to these:
>> https://cwiki.apache.org/confluence/display/FLINK/Streaming+Window+Join+Rework
>> https://issues.apache.org/jira/browse/FLINK-3109
>>
>>
>> Thanks!
>>
>