Posted to user@flink.apache.org by Aljoscha Krettek <al...@apache.org> on 2016/05/02 14:48:31 UTC

Re: join performance

Hi Henry,
yes, with early firings you would have the problem of duplicate emission.
I'm afraid I don't have a solution for that right now.

Regarding your other question, I think you are right that this would be session
windowing. Please have a look at this blog post that I wrote recently:
http://data-artisans.com/session-windowing-in-flink/. And please get back
to us if you have more questions or feedback.
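
A minimal plain-Java sketch of the gap-based grouping behind session windows may help here. It is a simplified model, not Flink's EventTimeSessionWindows implementation, and SessionSketch/sessions are hypothetical names. Sorted timestamps stay in one session while each element is less than the gap after the previous one. With a one-hour gap and records at 0, 50, 100 and 300 minutes, the first three land in one session even though the records at 0 and 100 minutes are more than an hour apart, so a join over session windows would presumably still need to check abs(a.time - b.time) < 1 hour inside the join function.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified model of gap-based session windows (not Flink's implementation):
// sorted timestamps stay in the same session while each element is less than
// gapMillis after the previous one; a larger gap starts a new session.
class SessionSketch {
    static List<List<Long>> sessions(long[] timestamps, long gapMillis) {
        long[] sorted = timestamps.clone();
        Arrays.sort(sorted);
        List<List<Long>> result = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        for (long t : sorted) {
            // Close the current session when the gap to the previous element is too large.
            if (!current.isEmpty() && t - current.get(current.size() - 1) >= gapMillis) {
                result.add(current);
                current = new ArrayList<>();
            }
            current.add(t);
        }
        if (!current.isEmpty()) {
            result.add(current);
        }
        return result;
    }
}
```

Because sessions merge transitively, a session can span much longer than the gap itself.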

Cheers,
Aljoscha

On Fri, 29 Apr 2016 at 19:18 Henry Cai <hc...@pinterest.com> wrote:

> So should the window be defined as an hour window or a second window?
>
> If I use an hour window, I guess I need to modify the trigger to fire
> early (e.g. every minute)? But I don't want to repeatedly emit the same
> joined records every minute (i.e. at the 2nd minute, I only want to emit
> the changes introduced by records arriving between the 1st and 2nd minute).
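
To make the "only emit the changes" requirement concrete: that output is what an incremental, symmetric join produces, where each arriving record is buffered and probed against the other stream's buffer, so every joined pair is emitted exactly once, when its later record arrives. The sketch below is a plain-Java model of that swapped-in technique, not something proposed in this thread and not Flink's window API; IncrementalJoinSketch, onLeft and onRight are hypothetical names. It never evicts old records, so a real version would have to expire entries older than the one-hour bound.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model (not a Flink API) of a symmetric join that
// emits each matching pair once, when the second record of the pair arrives.
class IncrementalJoinSketch {
    private final Map<String, List<Long>> left = new HashMap<>();
    private final Map<String, List<Long>> right = new HashMap<>();
    private final long maxDiffMillis;

    IncrementalJoinSketch(long maxDiffMillis) {
        this.maxDiffMillis = maxDiffMillis;
    }

    // A record from stream a: returns only the pairs this record completes.
    List<String> onLeft(String key, long time) {
        return add(key, time, left, right, true);
    }

    // A record from stream b: returns only the pairs this record completes.
    List<String> onRight(String key, long time) {
        return add(key, time, right, left, false);
    }

    private List<String> add(String key, long time, Map<String, List<Long>> own,
                             Map<String, List<Long>> other, boolean fromLeft) {
        // Buffer the record so later arrivals on the other side can find it.
        own.computeIfAbsent(key, k -> new ArrayList<>()).add(time);
        List<String> out = new ArrayList<>();
        // Probe only the other side's buffer for the same key.
        for (long t : other.getOrDefault(key, Collections.<Long>emptyList())) {
            if (Math.abs(time - t) < maxDiffMillis) {
                long a = fromLeft ? time : t;
                long b = fromLeft ? t : time;
                out.add(key + ":" + a + "," + b);
            }
        }
        return out;
    }
}
```

For example, after onLeft("k", 0) and onRight("k", 60000) produce the pair (0, 60000), a later onLeft("k", 120000) returns only the new pair (120000, 60000) rather than re-emitting the earlier one.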
>
> If I use a second window, I don't see how records would still be put
> into the correct window based on the one-hour gap.
>
> Another question is about which type of window to use. I need to match
> record a from stream a to record b in stream b if
> abs(a.time - b.time) < 1 hour, so it's not really a tumbling window on
> the absolute wall clock. Is this a session window?
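
The matching condition in this question can be written as a predicate and compared with hourly tumbling-window assignment. The sketch below uses hypothetical helper names (not Flink API) and assumes epoch-aligned hourly windows with no offset and non-negative timestamps.

```java
// Hypothetical helpers (not Flink API) contrasting the requested join
// condition with hourly tumbling-window assignment.
class IntervalVsTumbling {
    static final long HOUR = 3_600_000L;

    // The requested join condition: |a.time - b.time| < 1 hour.
    static boolean withinOneHour(long aTime, long bTime) {
        return Math.abs(aTime - bTime) < HOUR;
    }

    // Start of the hourly tumbling window containing t, with windows aligned
    // to the epoch (no offset); assumes t >= 0.
    static long hourlyWindowStart(long t) {
        return t - (t % HOUR);
    }

    static boolean sameHourlyWindow(long aTime, long bTime) {
        return hourlyWindowStart(aTime) == hourlyWindowStart(bTime);
    }
}
```

For example, records at 10:59 and 11:01 satisfy withinOneHour but fall into different hourly windows, so a join over hourly tumbling windows never pairs them.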
>
> On Fri, Apr 29, 2016 at 4:36 AM, Aljoscha Krettek <al...@apache.org>
> wrote:
>
>> Hi,
>> you are right, everything will be emitted in a huge burst at the end of
>> the hour. If you want to experiment a bit, you can write a custom Trigger
>> based on EventTimeTrigger that delays the firing of windows. You would
>> change onEventTime() to not fire but instead register a processing-time
>> timer at a random point in the future. Then, in onProcessingTime(), you
>> would trigger the actual window processing. Elements will still be put into
>> the correct windows based on event time; only the firing of the windows
>> changes.
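
The delayed-firing idea above can be modeled in plain Java to show how random processing-time delays spread the hourly burst. This is not Flink code: DelayedFiringSketch and its methods are hypothetical stand-ins for the onEventTime() and onProcessingTime() callbacks, and a real Trigger would register its timer through the trigger context (registerProcessingTimeTimer). It also assumes the window's contents are still available when the delayed timer fires; in Flink releases that clean up event-time windows once the watermark passes the window end plus the allowed lateness, the window state would presumably have to be retained long enough (for example via allowed lateness) to cover the delay.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Random;

// Plain-Java model (not Flink code) of a trigger that postpones firing by a
// random processing-time delay after the event-time end of each window.
class DelayedFiringSketch {
    // A pending processing-time timer for one (key, window) pair.
    private static final class PendingTimer {
        final long fireAt;
        final String window;

        PendingTimer(long fireAt, String window) {
            this.fireAt = fireAt;
            this.window = window;
        }
    }

    private final long maxDelayMillis;
    private final Random random;
    private final PriorityQueue<PendingTimer> timers =
        new PriorityQueue<>((x, y) -> Long.compare(x.fireAt, y.fireAt));

    DelayedFiringSketch(long maxDelayMillis, long seed) {
        this.maxDelayMillis = maxDelayMillis;
        this.random = new Random(seed);
    }

    // Counterpart of onEventTime() at the window end: instead of firing,
    // schedule a processing-time timer at a random point in [now, now + maxDelay).
    long onEventTime(String window, long nowProcessingTime) {
        long fireAt = nowProcessingTime + (long) (random.nextDouble() * maxDelayMillis);
        timers.add(new PendingTimer(fireAt, window));
        return fireAt;
    }

    // Counterpart of onProcessingTime(): fire every window whose timer is due.
    List<String> advanceProcessingTime(long now) {
        List<String> fired = new ArrayList<>();
        while (!timers.isEmpty() && timers.peek().fireAt <= now) {
            fired.add(timers.poll().window);
        }
        return fired;
    }
}
```

Since every key's window draws its own delay, the output for one hour is spread over roughly maxDelayMillis of processing time instead of arriving all at once.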
>>
>> Cheers,
>> Aljoscha
>>
>> On Fri, 29 Apr 2016 at 08:53 Henry Cai <hc...@pinterest.com> wrote:
>>
>>> But the join requirement is to match the records from two streams
>>> occurring within one hour of each other (besides the normal join key
>>> condition). If I use a join window measured in seconds, those records
>>> wouldn't be in the same window anymore.
>>>
>>>
>>>
>>> On Thu, Apr 28, 2016 at 11:47 PM, Ashutosh Kumar <
>>> kmr.ashutosh16@gmail.com> wrote:
>>>
>>>> The time unit can be in seconds as well. Is there a specific need to get
>>>> bursts hourly?
>>>>
>>>> On Fri, Apr 29, 2016 at 11:48 AM, Henry Cai <hc...@pinterest.com> wrote:
>>>>
>>>>> For the standard stream/stream join below, does Flink store the
>>>>> records of stream 1 and stream 2 in the state store for the current hour
>>>>> and then, at the end of the hour window, fire the window by iterating
>>>>> through all stored elements in the state store to find join matches?
>>>>>
>>>>> My concern is that for most of the hour the output (assuming the
>>>>> output is going to another stream) will be idle, and at each hour mark
>>>>> a huge burst of joined records will be emitted. Is there any way to
>>>>> make it more gradual?
>>>>>
>>>>>
>>>>> dataStream.join(otherStream)
>>>>>     .where(0).equalTo(1)
>>>>>     .window(TumblingEventTimeWindows.of(Time.hours(1)))
>>>>>     .apply(new JoinFunction() {...});
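
The behavior described in this question, which Aljoscha confirms, can be modeled roughly as follows. This is a simplified plain-Java sketch, not Flink's implementation; WindowJoinSketch, addLeft, addRight and fire are hypothetical names, and it assumes epoch-aligned windows and non-negative timestamps. Both inputs are buffered per window and key, nothing is emitted until fire() is called for a window, and then the per-key cross product is produced all at once, which is where the hourly burst comes from.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model (not Flink's implementation) of a keyed tumbling-window
// join: both sides are buffered per (window, key) until the window fires.
class WindowJoinSketch {
    private final long size;
    // window start -> key -> buffered timestamps of each side
    private final Map<Long, Map<String, List<Long>>> left = new HashMap<>();
    private final Map<Long, Map<String, List<Long>>> right = new HashMap<>();

    WindowJoinSketch(long windowSizeMillis) {
        this.size = windowSizeMillis;
    }

    // Epoch-aligned window start; assumes t >= 0.
    private long windowStart(long t) {
        return t - (t % size);
    }

    void addLeft(String key, long t) { buffer(left, key, t); }

    void addRight(String key, long t) { buffer(right, key, t); }

    private void buffer(Map<Long, Map<String, List<Long>>> side, String key, long t) {
        side.computeIfAbsent(windowStart(t), w -> new HashMap<>())
            .computeIfAbsent(key, k -> new ArrayList<>())
            .add(t);
    }

    // Called when the watermark passes the end of the window starting at
    // `start`: iterate over all buffered elements, emit every same-key pair,
    // then drop the window's buffers.
    List<String> fire(long start) {
        List<String> out = new ArrayList<>();
        Map<String, List<Long>> l = left.getOrDefault(start, Collections.<String, List<Long>>emptyMap());
        Map<String, List<Long>> r = right.getOrDefault(start, Collections.<String, List<Long>>emptyMap());
        for (Map.Entry<String, List<Long>> e : l.entrySet()) {
            for (long a : e.getValue()) {
                for (long b : r.getOrDefault(e.getKey(), Collections.<Long>emptyList())) {
                    out.add(e.getKey() + ":" + a + "," + b);
                }
            }
        }
        left.remove(start);
        right.remove(start);
        return out;
    }
}
```

In this model the output between two fire() calls is empty, and the size of each firing grows with the number of matching pairs buffered during the hour.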
>>>>>
>>>>>
>>>>
>>>
>