Posted to user@flink.apache.org by Vijay Balakrishnan <bv...@gmail.com> on 2019/06/17 17:06:03 UTC

Calculate a 4 hour time window for sum,count with sum and count output every 5 mins

Hi,
I need to calculate a count and a sum over a 4-hour time window, with the
current results being output every 5 minutes.
How do I do that?
Currently, I calculate results for 5-second and 5-minute time windows fine on
the KeyedStream.

Time timeWindow = getTimeWindowFromInterval(interval); // e.g. timeWindow = Time.seconds(timeIntervalL);
KeyedStream<Map<String, Object>, ...> monitoringTupleKeyedStream =
        kinesisStream.keyBy(...);
final WindowedStream<Map<String, Object>, ...., TimeWindow> windowStream =
        monitoringTupleKeyedStream
                .timeWindow(timeWindow);
DataStream<....> enrichedMGStream = windowStream.aggregate(
        new MGroupingWindowAggregateClass(...),
        new MGroupingAggregateClass(....))
        .map(new Monitoring...(...));
enrichedMGStream.addSink(..);
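
For reference, the count and sum asked about above can be carried in a single
accumulator. The sketch below is plain Java with no Flink dependency. It mirrors
the shape of the four methods on Flink's AggregateFunction interface
(createAccumulator, add, getResult, merge), but the class name
CountSumAggregate, the Acc type, and the double-valued input are assumptions
made for illustration. It is not the poster's MGroupingWindowAggregateClass.

```java
// Plain-Java sketch (no Flink dependency). Method names follow the shape of
// Flink's AggregateFunction; CountSumAggregate and Acc are hypothetical names.
final class CountSumAggregate {

    /** Mutable accumulator holding the running count and sum. */
    static final class Acc {
        long count;
        double sum;
    }

    /** Starts an empty accumulator for a new window. */
    static Acc createAccumulator() {
        return new Acc();
    }

    /** Folds one incoming value into the accumulator. */
    static Acc add(double value, Acc acc) {
        acc.count += 1;
        acc.sum += value;
        return acc;
    }

    /** Combines two partial accumulators into a new one. */
    static Acc merge(Acc a, Acc b) {
        Acc out = new Acc();
        out.count = a.count + b.count;
        out.sum = a.sum + b.sum;
        return out;
    }

    /** Returns {count, sum}; reading the result does not reset the accumulator. */
    static double[] getResult(Acc acc) {
        return new double[] {acc.count, acc.sum};
    }
}
```

Because getResult only reads the accumulator, the same accumulator can be
queried repeatedly while the window is still open, which is what periodic
output during a long window needs.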


TIA,
Vijay

Re: Calculate a 4 hour time window for sum,count with sum and count output every 5 mins

Posted by Vijay Balakrishnan <bv...@gmail.com>.
Hi Rafi,
I tried your approach with:

    windowStream.trigger(ContinuousEventTimeTrigger.of(Time.minutes(5)));

I can use .trigger with a ProcessWindowFunction, but it doesn't accumulate
data across windows, i.e. I want to collect data for a 5h window, with the
results accumulated so far sent to the output every 5 mins.

@Felipe- I am using a ProcessWindowFunction and cannot find a way to use
process() & onTimer with it.

On Sun, Jun 30, 2019 at 11:45 PM Felipe Gutierrez <
felipe.o.gutierrez@gmail.com> wrote:

> No, there is no specific reason.
> I am using it because I am computing the HyperLogLog over a window.
> *--*
> *-- Felipe Gutierrez*
>
> *-- skype: felipe.o.gutierrez*
> *--* *https://felipeogutierrez.blogspot.com
> <https://felipeogutierrez.blogspot.com>*
>
>
> On Mon, Jul 1, 2019 at 12:34 AM Vijay Balakrishnan <bv...@gmail.com>
> wrote:
>
>> Hi Felipe,
>> Thanks for the example. I will try a variation of that for mine. Is there
>> a specific reason to use the HyperLogLogState ?
>>
>> Vijay
>>
>> On Tue, Jun 18, 2019 at 3:00 AM Felipe Gutierrez <
>> felipe.o.gutierrez@gmail.com> wrote:
>>
>>> Hi Vijay,
>>>
>>> I managed by using
>>> "ctx.timerService().registerProcessingTimeTimer(timeoutTime);" on the
>>> processElement method and clearing the state on the onTimer method. This is
>>> my program [1].
>>>
>>> [1]
>>> https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/WordHLLKeyedProcessWindowTwitter.java
>>>
>>> Kind Regards,
>>> Felipe
>>> *--*
>>> *-- Felipe Gutierrez*
>>>
>>> *-- skype: felipe.o.gutierrez*
>>> *--* *https://felipeogutierrez.blogspot.com
>>> <https://felipeogutierrez.blogspot.com>*
>>>
>>>
>>> On Mon, Jun 17, 2019 at 8:57 PM Rafi Aroch <ra...@gmail.com> wrote:
>>>
>>>> Hi Vijay,
>>>>
>>>> When using windows, you may use the 'trigger' to set a Custom Trigger
>>>> which would trigger your *ProcessWindowFunction* accordingly.
>>>>
>>>> In your case, you would probably use:
>>>>
>>>>> *.trigger(ContinuousProcessingTimeTrigger.of(Time.minutes(5)))*
>>>>>
>>>>
>>>> Thanks,
>>>> Rafi
>>>>
>>>>
>>>> On Mon, Jun 17, 2019 at 9:01 PM Vijay Balakrishnan <bv...@gmail.com>
>>>> wrote:
>>>>
>>>>> I am also implementing the ProcessWindowFunction and accessing the
>>>>> windowState to get data but how do i push data out every 5 mins during a 4
>>>>> hr time window ?? I am adding a globalState to handle the 4 hr window ???
>>>>> Or should I still use the context.windowState even for the 4 hr window ?
>>>>>
>>>>> public  class MGroupingAggregateClass extends
>>>>>> ProcessWindowFunction<....> {
>>>>>>
>>>>>> private MapState<String, Object> timedGroupKeyState;
>>>>>> private MapState<String, Object> globalGroupKeyState;
>>>>>> private final MapStateDescriptor<String, Object>
>>>>>> timedMapKeyStateDescriptor =
>>>>>>        new MapStateDescriptor<>("timedGroupKeyState",
>>>>>>                String.class, Object.class);
>>>>>> private final MapStateDescriptor<String, Object>
>>>>>> globalMapKeyStateDescriptor =
>>>>>>            new MapStateDescriptor<>("globalGroupKeyState",
>>>>>>                    String.class, Object.class);
>>>>>>
>>>>>>
>>>>>> public void open(Configuration ..) {
>>>>>> timedGroupKeyState =
>>>>>> getRuntimeContext().getMapState(timedMapKeyStateDescriptor);
>>>>>> globalGroupKeyState =
>>>>>> getRuntimeContext().getMapState(globalMapKeyStateDescriptor);
>>>>>> }
>>>>>>
>>>>>> public void process(MonitoringTuple currKey, Context context,
>>>>>> Iterable<Map<String, Object>> elements,
>>>>>>                        Collector<Map<String, Object>> out) throws
>>>>>> Exception {
>>>>>>        logger.info("Entered MGroupingAggregateWindowProcessing -
>>>>>> process interval:{}, currKey:{}", interval, currKey);
>>>>>>        timedGroupKeyState =
>>>>>> context.windowState().getMapState(timedMapKeyStateDescriptor);
>>>>>>        globalGroupKeyState =
>>>>>> context.globalState().getMapState(globalMapKeyStateDescriptor);
>>>>>> ...
>>>>>> //get data fromm state
>>>>>> Object timedGroupStateObj = timedGroupKeyState.get(groupKey);
>>>>>>
>>>>>> //how do i push the data out every 5 mins to the sink during the 4 hr
>>>>>> window ??
>>>>>>
>>>>>> }
>>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Mon, Jun 17, 2019 at 10:06 AM Vijay Balakrishnan <
>>>>> bvijaykr@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>> Need to calculate a 4 hour time window for count, sum with current
>>>>>> calculated results being output every 5 mins.
>>>>>> How do i do that ?
>>>>>> Currently, I calculate results for 5 sec and 5 min time windows fine
>>>>>> on the KeyedStream.
>>>>>>
>>>>>> Time timeWindow = getTimeWindowFromInterval(interval);//eg:
>>>>>>> timeWindow = Time.seconds(timeIntervalL);
>>>>>>> KeyedStream<Map<String, Object>, ...> monitoringTupleKeyedStream =
>>>>>>> kinesisStream.keyBy(...);
>>>>>>> final WindowedStream<Map<String, Object>, ...., TimeWindow>
>>>>>>> windowStream =
>>>>>>>         monitoringTupleKeyedStream
>>>>>>>                 .timeWindow(timeWindow);
>>>>>>> DataStream<....> enrichedMGStream = windowStream.aggregate(
>>>>>>>         new MGroupingWindowAggregateClass(...),
>>>>>>>         new MGroupingAggregateClass(....))
>>>>>>>         .map(new Monitoring...(...));
>>>>>>> enrichedMGStream.addSink(..);
>>>>>>>
>>>>>>
>>>>>>
>>>>>> TIA,
>>>>>> Vijay
>>>>>>
>>>>>

Re: Calculate a 4 hour time window for sum,count with sum and count output every 5 mins

Posted by Felipe Gutierrez <fe...@gmail.com>.
No, there is no specific reason.
I am using it because I am computing the HyperLogLog over a window.
--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com



Re: Calculate a 4 hour time window for sum,count with sum and count output every 5 mins

Posted by Vijay Balakrishnan <bv...@gmail.com>.
Hi Felipe,
Thanks for the example. I will try a variation of it for mine. Is there a
specific reason to use the HyperLogLogState?

Vijay


Re: Calculate a 4 hour time window for sum,count with sum and count output every 5 mins

Posted by Felipe Gutierrez <fe...@gmail.com>.
Hi Vijay,

I managed this by calling
"ctx.timerService().registerProcessingTimeTimer(timeoutTime);" in the
processElement method and clearing the state in the onTimer method. This is
my program [1].

[1]
https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/WordHLLKeyedProcessWindowTwitter.java
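
The timer pattern described above can be modeled without Flink. The sketch
below is plain Java, not the linked program: processElement registers a timer
when a key first gets state, and onTimer emits the accumulated count and sum
and then clears the state for that key. The class TimerPatternModel, the
count/sum payload, and the explicit advanceTo clock are all assumptions made
for illustration. In a real job the timer service and keyed state come from
Flink.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of "register a timer in processElement, clear state in
// onTimer". All names are hypothetical; no Flink classes are used.
final class TimerPatternModel {
    private final long timeoutMs;
    // key -> {count, sum}; stands in for Flink keyed state
    private final Map<String, long[]> state = new HashMap<>();
    // fire time -> keys due at that time; stands in for the timer service
    private final TreeMap<Long, List<String>> timers = new TreeMap<>();
    final List<String> output = new ArrayList<>();

    TimerPatternModel(long timeoutMs) {
        this.timeoutMs = timeoutMs;
    }

    /** Handles one element at processing time nowMs. */
    void processElement(String key, long value, long nowMs) {
        advanceTo(nowMs); // fire any timers that are already due
        long[] acc = state.get(key);
        if (acc == null) {
            // First element since the last clear: create state, register a timer.
            acc = new long[2];
            state.put(key, acc);
            timers.computeIfAbsent(nowMs + timeoutMs, t -> new ArrayList<>()).add(key);
        }
        acc[0] += 1;
        acc[1] += value;
    }

    /** Moves the clock forward and fires every timer with fire time <= nowMs. */
    void advanceTo(long nowMs) {
        while (!timers.isEmpty() && timers.firstKey() <= nowMs) {
            Map.Entry<Long, List<String>> due = timers.pollFirstEntry();
            for (String key : due.getValue()) {
                onTimer(key, due.getKey());
            }
        }
    }

    /** Emits the accumulated result for the key and clears its state. */
    private void onTimer(String key, long timestamp) {
        long[] acc = state.remove(key);
        output.add(key + "@" + timestamp + ": count=" + acc[0] + ", sum=" + acc[1]);
    }
}
```

Because the state is cleared in onTimer, each emitted result covers only the
elements seen since the previous firing for that key. Keeping a running total
across firings would mean emitting without clearing.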

Kind Regards,
Felipe
--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com



Re: Calculate a 4 hour time window for sum,count with sum and count output every 5 mins

Posted by Rafi Aroch <ra...@gmail.com>.
Hi Vijay,

When using windows, you can call 'trigger' to set a custom Trigger, which
fires your ProcessWindowFunction accordingly.

In your case, you would probably use:

    .trigger(ContinuousProcessingTimeTrigger.of(Time.minutes(5)))
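
To make the effect of such a trigger concrete, here is a simplified plain-Java
model (no Flink dependency) of one 4-hour window that fires every 5 minutes
without discarding its contents, so each firing reports the count and sum
accumulated so far. The class ContinuousFiringModel, the {timestamp, value}
event layout, and the alignment of firings to multiples of 5 minutes from the
window start are assumptions made for illustration. This is a sketch of the
intended behavior, not Flink's trigger implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of a single 4-hour window with a firing every 5 minutes.
// All names are hypothetical; no Flink classes are used.
final class ContinuousFiringModel {
    static final long WINDOW_MS = 4 * 60 * 60 * 1000L; // 4 hours
    static final long FIRE_MS = 5 * 60 * 1000L;        // 5 minutes

    /**
     * events: {timestampMs, value} pairs sorted by timestamp, all inside the
     * window [0, WINDOW_MS). Returns one {fireTimeMs, count, sum} row per
     * firing that has seen at least one event.
     */
    static List<long[]> run(long[][] events) {
        List<long[]> out = new ArrayList<>();
        long count = 0;
        long sum = 0;
        int next = 0;
        for (long fire = FIRE_MS; fire <= WINDOW_MS; fire += FIRE_MS) {
            // Fold in every event that arrived before this firing.
            while (next < events.length && events[next][0] < fire) {
                count++;
                sum += events[next][1];
                next++;
            }
            // Emit the running totals; nothing is cleared between firings.
            if (count > 0) {
                out.add(new long[] {fire, count, sum});
            }
        }
        return out;
    }
}
```

With events at minute 1 (value 10) and minute 7 (value 5), the firing at
minute 5 reports count 1 and sum 10, and every later firing up to the end of
the window reports count 2 and sum 15.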

Thanks,
Rafi



Re: Calculate a 4 hour time window for sum,count with sum and count output every 5 mins

Posted by Vijay Balakrishnan <bv...@gmail.com>.
I am also implementing the ProcessWindowFunction and accessing the
windowState to get data, but how do I push data out every 5 mins during a
4 hr time window? I am adding a globalState to handle the 4 hr window; is
that right, or should I still use context.windowState() even for the 4 hr
window?

public class MGroupingAggregateClass extends ProcessWindowFunction<....> {

    private MapState<String, Object> timedGroupKeyState;
    private MapState<String, Object> globalGroupKeyState;
    private final MapStateDescriptor<String, Object> timedMapKeyStateDescriptor =
            new MapStateDescriptor<>("timedGroupKeyState",
                    String.class, Object.class);
    private final MapStateDescriptor<String, Object> globalMapKeyStateDescriptor =
            new MapStateDescriptor<>("globalGroupKeyState",
                    String.class, Object.class);

    public void open(Configuration ..) {
        timedGroupKeyState =
                getRuntimeContext().getMapState(timedMapKeyStateDescriptor);
        globalGroupKeyState =
                getRuntimeContext().getMapState(globalMapKeyStateDescriptor);
    }

    public void process(MonitoringTuple currKey, Context context,
                        Iterable<Map<String, Object>> elements,
                        Collector<Map<String, Object>> out) throws Exception {
        logger.info("Entered MGroupingAggregateWindowProcessing - process interval:{}, currKey:{}",
                interval, currKey);
        // per-window state, scoped to the current key and window
        timedGroupKeyState =
                context.windowState().getMapState(timedMapKeyStateDescriptor);
        // per-key state, shared across all windows of that key
        globalGroupKeyState =
                context.globalState().getMapState(globalMapKeyStateDescriptor);
        ...
        // get data from state
        Object timedGroupStateObj = timedGroupKeyState.get(groupKey);

        // how do I push the data out every 5 mins to the sink during the 4 hr window?
    }
}






