Posted to user@flink.apache.org by Rohan Thimmappa <ro...@gmail.com> on 2018/01/12 04:30:32 UTC

Aggregation using event timestamp rather than clock window

Hi All,


I have the following requirement:

1. I have Avro/JSON messages containing {eventid, usage, starttime, endtime}.
2. I am reading these from a Kafka source.
3. If a record overlaps an hour boundary, split the record by rounding off
to hourly boundaries.
4. My objective is to a) read the message and b) aggregate the usage within
each hour.
5. Send the aggregated data to another Kafka topic.

I don't want to aggregate based on the clock window. If I see the next hour
in endtime, I would like to close the window and send the aggregated usage
down to the Kafka sink topic.


e.g., input data:
4.55-5.00
5.00-5.25
5.25-5.55
5.55-6.25

after split:
4.55-5.00 - expect a record to go out with this
5.00-5.25
5.25-5.55
5.55-6.00 - expect a record to go out with this
6.00-6.25




1. I have set event time:

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

2. The pipeline:

val hourlyAggregate: SingleOutputStreamOperator[Tuple2[String, Report]] = stream
  .flatMap(new SplitFlatMap()) // if a record spans an hour boundary, split it at the boundary
  .assignTimestampsAndWatermarks(new ReportTimestampExtractor)
  .keyBy(0)
  .window(TumblingEventTimeWindows.of(Time.seconds(intervalsecond.toLong)))
  .reduce(new Counter()) // aggregates the usage collected within the window
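
SplitFlatMap itself is not shown in the thread. The following is only a
minimal sketch of what such a splitter could look like; it assumes Report
exposes getStartTime, getEndTime, and getUsage (epoch millis and a numeric
usage) plus a copy(start, end, usage) helper -- all hypothetical names --
and apportions usage linearly across the split:

import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.api.java.tuple.Tuple2
import org.apache.flink.util.Collector

class SplitFlatMap
    extends FlatMapFunction[Tuple2[String, Report], Tuple2[String, Report]] {
  private val HourMs = 60 * 60 * 1000L

  override def flatMap(in: Tuple2[String, Report],
                       out: Collector[Tuple2[String, Report]]): Unit = {
    val r = in.f1
    var sliceStart = r.getStartTime
    while (sliceStart < r.getEndTime) {
      // round up to the next hour boundary, capped at the record's end time
      val sliceEnd = math.min(((sliceStart / HourMs) + 1) * HourMs, r.getEndTime)
      // give this slice its linear share of the record's usage
      val share = (sliceEnd - sliceStart).toDouble / (r.getEndTime - r.getStartTime)
      out.collect(new Tuple2[String, Report](
        in.f0, r.copy(sliceStart, sliceEnd, r.getUsage * share))) // hypothetical copy helper
      sliceStart = sliceEnd
    }
  }
}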

3. Here is the implementation of the timestamp extractor:

class ReportTimestampExtractor
    extends AssignerWithPeriodicWatermarks[Tuple2[String, Report]] with Serializable {
  override def extractTimestamp(e: Tuple2[String, Report],
                                prevElementTimestamp: Long) = {
    e.f1.getEndTime
  }

  override def getCurrentWatermark(): Watermark = {
    new Watermark(System.currentTimeMillis - 3600000) // respect a delay of 1 hour (3,600,000 ms)
  }
}


I see the aggregation is generated only on the clock window, rather than
when the window sees the next hour in a record.

Is there anything I am missing? By definition, if I set event time, it
should respect the message time rather than the clock window.




-- 
Thanks
Rohan

Re: Aggregation using event timestamp rather than clock window

Posted by Rohan Thimmappa <ro...@gmail.com>.
Hi Gary,


Thanks. I do have some events coming in after the pauses, and I am able to
see the watermark being advanced and the windows being triggered.


Rohan



-- 
Thanks
Rohan

Re: Aggregation using event timestamp rather than clock window

Posted by Gary Yao <ga...@data-artisans.com>.
Hi Rohan,

In your example, are you saying that after 5:40 you will not receive any
events at all which could advance the watermark?

I am asking because if you are receiving events for other keys/ids from your
KafkaSource after 5:40, the watermark will still be advanced and fire the
tumbling window.

Best,
Gary


Re: Aggregation using event timestamp rather than clock window

Posted by Rohan Thimmappa <ro...@gmail.com>.
No, my question is slightly different.

Say I get reports from 5.10-5.40 and the device goes offline and never comes
back, so I will not get any report after 5.40. The 5-6 window then never
gets closed, because we close it only when we see the next hour, i.e., a
report carrying a 6.00 end date. In this case the 5.00-5.40 data is still in
Flink memory and will never be sent.

So what I would like to do is wait for, say, 1 or 2 hours; if I don't get a
message for the given id, I would like to close the window and send the
result to the destination system (in my case, a Kafka topic).
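
One way to express this in Flink is a custom window trigger that fires on
the normal event-time watermark but also registers a processing-time
timeout, so the window of an idle id is eventually flushed. This is only an
illustrative sketch, not something discussed further in the thread: the
class name is made up, and a production version would also track and delete
its pending processing-time timers in partitioned state instead of leaving
stale ones behind.

import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow

class ProcessingTimeoutEventTimeTrigger(timeoutMs: Long)
    extends Trigger[AnyRef, TimeWindow] {

  override def onElement(element: AnyRef, timestamp: Long, window: TimeWindow,
                         ctx: Trigger.TriggerContext): TriggerResult = {
    // normal event-time behaviour: fire when the watermark passes the window end
    ctx.registerEventTimeTimer(window.maxTimestamp())
    // additionally, time out on the wall clock if no further element shows up
    ctx.registerProcessingTimeTimer(ctx.getCurrentProcessingTime + timeoutMs)
    TriggerResult.CONTINUE
  }

  override def onEventTime(time: Long, window: TimeWindow,
                           ctx: Trigger.TriggerContext): TriggerResult =
    if (time == window.maxTimestamp()) TriggerResult.FIRE else TriggerResult.CONTINUE

  override def onProcessingTime(time: Long, window: TimeWindow,
                                ctx: Trigger.TriggerContext): TriggerResult =
    TriggerResult.FIRE_AND_PURGE // timeout hit: flush whatever the window holds

  override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit =
    ctx.deleteEventTimeTimer(window.maxTimestamp())
}

It would be wired in after the window assignment, e.g.:

.window(TumblingEventTimeWindows.of(Time.seconds(intervalsecond.toLong)))
.trigger(new ProcessingTimeoutEventTimeTrigger(2 * 60 * 60 * 1000L)) // e.g. 2 hours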




Rohan



-- 
Thanks
Rohan

Re: Aggregation using event timestamp rather than clock window

Posted by Gary Yao <ga...@data-artisans.com>.
Hi Rohan,

I am not sure if I fully understand your problem. For example, if you
receive an event with a start time of 4:50 and an end time of 5:30, do you
want the "usage" from 4:50 - 5:00 to be included in the 4:00 - 5:00 window?
What if the event had an end time of 5:31? Do you then want to ignore the
event for the 4:00 - 5:00 window?

Best,

Gary


Re: Aggregation using event timestamp rather than clock window

Posted by Rohan Thimmappa <ro...@gmail.com>.
Hi Gary,

This is perfect. I am able to get the window working on the message
timestamp rather than the clock window, and also to stream the data that
arrive late.

I have one more edge case.

For example, I get my last report at 4.57 and I never get a 5.00+ hour
report *ever*. I would like to wait for some time: my reporting interval is
30 min, so if I don't see any record in the next 30 min, I would like to
close the 4-5 window and dispatch the report. The intention is that I don't
want to lose the last hour of data just because the stream ended mid-hour.

Rohan



-- 
Thanks
Rohan

Re: Aggregation using event timestamp rather than clock window

Posted by Gary Yao <ga...@data-artisans.com>.
Hi Rohan,

Your ReportTimestampExtractor assigns timestamps to the stream records
correctly, but it uses the wall clock to emit Watermarks
(System.currentTimeMillis). In Flink, Watermarks are the mechanism to
advance event time. Hence, you should emit Watermarks according to the time
that you extract from your events.

You can take a look at the already existing timestamp extractors /
watermark emitters [1], such as BoundedOutOfOrdernessTimestampExtractor, to
see how it can be done.

Best,
Gary

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/event_timestamp_extractors.html
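
For example, a version of ReportTimestampExtractor built on
BoundedOutOfOrdernessTimestampExtractor, with the one-hour bound the
original comment intended, might look like the following sketch (untested
against the job in the thread):

import org.apache.flink.api.java.tuple.Tuple2
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.windowing.time.Time

// the watermark now trails the maximum event timestamp seen so far by one
// hour, instead of following the wall clock
class ReportTimestampExtractor
    extends BoundedOutOfOrdernessTimestampExtractor[Tuple2[String, Report]](Time.hours(1)) {
  override def extractTimestamp(e: Tuple2[String, Report]): Long = e.f1.getEndTime
}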
