Posted to user@flink.apache.org by Martin Neumann <mn...@sics.se> on 2015/12/10 12:04:15 UTC

Streaming time window

Hi,

Is it possible to extract the start and end timestamps of a window from
within a window operator?

I have an event-time-based window that applies a simple fold function. I
want to write the output to Elasticsearch and preserve the start and end
timestamps of each window so that I can compare it directly with related
data. The only idea I have had so far is to manually track the minimum and
maximum timestamps seen in a window and pass them along with the output.
This is quite a poor approximation, since the range I observe depends a lot
on how the values are spaced out. Does anyone have an idea how to do this?

cheers Martin
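
As a minimal sketch of why tracked minimum and maximum timestamps only
approximate the window, the plain-Java example below compares them with the
bounds of an aligned tumbling window. It assumes that windows start at
multiples of the window size and have an exclusive end, which may not match
every Flink version or window assigner. WindowBoundsSketch and its methods
are hypothetical names, not Flink API.

```java
import java.util.Arrays;

// Hypothetical helper, not part of Flink: compares observed min/max
// timestamps with the bounds of an aligned tumbling window.
class WindowBoundsSketch {

    // Start of the aligned window containing `timestamp`
    // (assumes timestamp >= 0 and windowSize > 0).
    static long windowStart(long timestamp, long windowSize) {
        return timestamp - (timestamp % windowSize);
    }

    // Exclusive end of that window.
    static long windowEnd(long timestamp, long windowSize) {
        return windowStart(timestamp, windowSize) + windowSize;
    }

    public static void main(String[] args) {
        long windowSize = 1000L;
        long[] eventTimestamps = {5200L, 5350L, 5700L};

        long observedMin = Arrays.stream(eventTimestamps).min().getAsLong();
        long observedMax = Arrays.stream(eventTimestamps).max().getAsLong();

        System.out.println("observed range: [" + observedMin + ", " + observedMax + "]");
        System.out.println("window bounds:  [" + windowStart(observedMin, windowSize)
                + ", " + windowEnd(observedMin, windowSize) + ")");
    }
}
```

With a window size of 1000 ms, events at 5200, 5350 and 5700 give an
observed range of [5200, 5700], while the aligned window is [5000, 6000).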

Re: Streaming time window

Posted by Fabian Hueske <fh...@gmail.com>.
You are right: WindowFunctions collect all the data in a window and are
evaluated all at once when the window fires.
Although FoldFunctions could be applied directly to each element as it
enters a window, this is not done at the moment.
Only ReduceFunctions are applied eagerly.

If you port your code to a ReduceFunction, you can use

.apply(ReduceFunction, WindowFunction)

This first applies the ReduceFunction to the incoming elements and finally
calls the WindowFunction with the result of the ReduceFunction.
In principle, this is also possible for fold, but it is not yet implemented.

Best, Fabian
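
The reduce-then-window pattern can be sketched in plain Java, without Flink
types, to show why it keeps per-window state small: each arriving element is
merged into a single reduced value, and only that value is handed to a
function that also sees the window bounds. IncrementalWindowSketch and
BoundsAwareFunction are hypothetical names for illustration, and the real
Flink signatures differ in detail, so treat this as a simplification.

```java
import java.util.function.BinaryOperator;

// Plain-Java illustration, no Flink types: per-window state is one value.
class IncrementalWindowSketch<T> {

    // Hypothetical stand-in for a window function that receives the
    // window bounds together with the single reduced value.
    interface BoundsAwareFunction<T, R> {
        R apply(long windowStart, long windowEnd, T reduced);
    }

    private final long windowStart;
    private final long windowEnd;
    private final BinaryOperator<T> reducer;
    private T reduced; // the only state kept; null until the first element

    IncrementalWindowSketch(long windowStart, long windowEnd, BinaryOperator<T> reducer) {
        this.windowStart = windowStart;
        this.windowEnd = windowEnd;
        this.reducer = reducer;
    }

    // Called for every element as it arrives (eager reduction).
    void add(T element) {
        reduced = (reduced == null) ? element : reducer.apply(reduced, element);
    }

    // Called once when the window closes.
    <R> R fire(BoundsAwareFunction<T, R> function) {
        return function.apply(windowStart, windowEnd, reduced);
    }

    public static void main(String[] args) {
        IncrementalWindowSketch<Integer> window =
                new IncrementalWindowSketch<>(5000L, 6000L, Integer::sum);
        window.add(3);
        window.add(4);
        window.add(5);
        String result = window.fire((start, end, sum) -> start + ".." + end + " sum=" + sum);
        System.out.println(result);
    }
}
```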

2015-12-10 15:16 GMT+01:00 Martin Neumann <mn...@sics.se>:

> I will give this a try.
>
> However, I'm not sure I can switch over to a WindowFunction.
> I work with potentially huge windows, and the fold gives me a minimal,
> constant memory footprint. As far as I understand, switching to a
> WindowFunction requires keeping the whole window in memory before it can
> be processed, which will lead to problems. Any idea how to get around this?
>
> cheers Martin

Re: Streaming time window

Posted by Martin Neumann <mn...@sics.se>.
I will give this a try.

However, I'm not sure I can switch over to a WindowFunction.
I work with potentially huge windows, and the fold gives me a minimal,
constant memory footprint. As far as I understand, switching to a
WindowFunction requires keeping the whole window in memory before it can
be processed, which will lead to problems. Any idea how to get around this?

cheers Martin
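
The memory concern can be illustrated with a small plain-Java sketch that
contrasts buffering every element with folding into an accumulator. It uses
no Flink types, and all names are hypothetical. The accumulator here is a
map of per-host counts, so its size is bounded by the number of distinct
hosts rather than by the number of elements.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration, no Flink types; all names are hypothetical.
class FoldVersusBufferSketch {

    // Fold step: merge one element into the accumulator and return it.
    static Map<String, Integer> fold(Map<String, Integer> accumulator, String host) {
        accumulator.merge(host, 1, Integer::sum);
        return accumulator;
    }

    public static void main(String[] args) {
        List<String> buffered = new ArrayList<>();     // what a buffering window keeps
        Map<String, Integer> folded = new HashMap<>(); // what a fold keeps

        String[] hosts = {"a", "b", "a", "a", "b"};
        for (String host : hosts) {
            buffered.add(host);          // grows with every element
            folded = fold(folded, host); // grows only with new hosts
        }

        System.out.println("buffered elements: " + buffered.size());
        System.out.println("folded entries:    " + folded.size());
    }
}
```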



On Thu, Dec 10, 2015 at 2:59 PM, Fabian Hueske <fh...@gmail.com> wrote:

> Sure. You don't need a trigger, but you do need a WindowFunction instead
> of the FoldFunction.
> Only the WindowFunction has access to the Window object.
>
> Something like this:
>
> poissHostStreams
>         .timeWindow(Time.of(WINDOW_SIZE, TimeUnit.MILLISECONDS))
>         // IN, OUT, and KEY are placeholders for your own types
>         .apply(new WindowFunction<IN, OUT, KEY, TimeWindow>() {
>
>           @Override
>           public void apply(KEY key, TimeWindow window, Iterable<IN> vals,
>               Collector<OUT> out) {
>             // YOUR CODE
>             long windowStart = window.getStart();
>             long windowEnd = window.getEnd();
>           }
>         });
>
> Best, Fabian

Re: Streaming time window

Posted by Fabian Hueske <fh...@gmail.com>.
Sure. You don't need a trigger, but you do need a WindowFunction instead
of the FoldFunction.
Only the WindowFunction has access to the Window object.

Something like this:

poissHostStreams
        .timeWindow(Time.of(WINDOW_SIZE, TimeUnit.MILLISECONDS))
        // IN, OUT, and KEY are placeholders for your own types
        .apply(new WindowFunction<IN, OUT, KEY, TimeWindow>() {

          @Override
          public void apply(KEY key, TimeWindow window, Iterable<IN> vals,
              Collector<OUT> out) {
            // YOUR CODE
            long windowStart = window.getStart();
            long windowEnd = window.getEnd();
          }
        });

Best, Fabian

2015-12-10 14:41 GMT+01:00 Martin Neumann <mn...@sics.se>:

> Hi Fabian,
>
> Thanks for your answer. Can I do the same in Java using normal time
> windows (without an additional trigger)?
>
> My current code looks like this:
>
> poissHostStreams
>         .timeWindow(Time.of(WINDOW_SIZE, TimeUnit.MILLISECONDS))
>         .fold(new Tuple2<>("", new HashMap<>()),
>               new MultiValuePoissonPreProcess())
>
> How can I get access to the time window object in the fold function?
>
>
> cheers Martin

Re: Streaming time window

Posted by Martin Neumann <mn...@sics.se>.
Hi Fabian,

Thanks for your answer. Can I do the same in Java using normal time windows
(without an additional trigger)?

My current code looks like this:

poissHostStreams
        .timeWindow(Time.of(WINDOW_SIZE, TimeUnit.MILLISECONDS))
        .fold(new Tuple2<>("", new HashMap<>()),
              new MultiValuePoissonPreProcess())

How can I get access to the time window object in the fold function?


cheers Martin


On Thu, Dec 10, 2015 at 12:20 PM, Fabian Hueske <fh...@gmail.com> wrote:

> Hi Martin,
>
> You can get the start and end time of a window from the TimeWindow object.
> The following Scala code snippet shows how to access the window end time
> (the start time is accessed the same way, via getStart):
>
> .timeWindow(Time.minutes(5))
> .trigger(new EarlyCountTrigger(earlyCountThreshold))
> .apply { (
>   key: Int,
>   window: TimeWindow,
>   vals: Iterable[(Int, Short)],
>   out: Collector[(Int, Long, Int)]) =>
>     out.collect( ( key, window.getEnd, vals.map( _._2 ).sum ) )
> }
>
> Cheers, Fabian

Re: Streaming time window

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Martin,

You can get the start and end time of a window from the TimeWindow object.
The following Scala code snippet shows how to access the window end time
(the start time is accessed the same way, via getStart):

.timeWindow(Time.minutes(5))
.trigger(new EarlyCountTrigger(earlyCountThreshold))
.apply { (
  key: Int,
  window: TimeWindow,
  vals: Iterable[(Int, Short)],
  out: Collector[(Int, Long, Int)]) =>
    out.collect( ( key, window.getEnd, vals.map( _._2 ).sum ) )
}

Cheers, Fabian
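
Once the start and end are available as epoch milliseconds, they can be
turned into ISO-8601 strings before indexing, which makes windows easy to
compare with related data. The following is a minimal plain-Java sketch. The
class name and the field names window_start and window_end are assumptions
for illustration, not part of Flink or Elasticsearch.

```java
import java.time.Instant;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: builds a flat document that carries the window bounds.
class WindowDocumentSketch {

    // start and end are epoch milliseconds, for example the values
    // returned by window.getStart() and window.getEnd().
    static Map<String, Object> toDocument(long start, long end, Object value) {
        Map<String, Object> document = new LinkedHashMap<>();
        document.put("window_start", Instant.ofEpochMilli(start).toString());
        document.put("window_end", Instant.ofEpochMilli(end).toString());
        document.put("value", value);
        return document;
    }

    public static void main(String[] args) {
        System.out.println(toDocument(5000L, 6000L, 12));
    }
}
```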

2015-12-10 12:04 GMT+01:00 Martin Neumann <mn...@sics.se>:

> Hi,
>
> Is it possible to extract the start and end timestamps of a window from
> within a window operator?
>
> I have an event-time-based window that applies a simple fold function. I
> want to write the output to Elasticsearch and preserve the start and end
> timestamps of each window so that I can compare it directly with related
> data. The only idea I have had so far is to manually track the minimum and
> maximum timestamps seen in a window and pass them along with the output.
> This is quite a poor approximation, since the range I observe depends a lot
> on how the values are spaced out. Does anyone have an idea how to do this?
>
> cheers Martin
>