You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by "Dong-iL, Kim" <ki...@gmail.com> on 2016/09/08 03:59:52 UTC
assignTimestamp after keyBy
Hi.
my stream data is from some files. ( files -> kafka -> flink(source -> keyBy -> windowing) )
data is arranged in a file.
I wanna assingTimestamp after keyBy.
How can I do that.
Regards.
Re: assignTimestamp after keyBy
Posted by "Dong-iL, Kim" <ki...@gmail.com>.
I wanna assign timestamp after keyBy.
because the stream does not aligned before keyBy.
I’ve already tested as like your code.
It occured many warnings that timestamp monotony violated.
> On Sep 8, 2016, at 4:32 PM, Dong-iL, Kim <ki...@gmail.com> wrote:
>
> Thanks for replying. pushpendra.
> The assignTimestamp method returns not KeyedStream but DataStream.
> so I cannot use windowing.
> is it possible casting to KeyedStream?
> Regards
>
>> On Sep 8, 2016, at 3:12 PM, pushpendra.jaiswal <pu...@gmail.com> wrote:
>>
>> Please refer
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/event_timestamps_watermarks.html
>> for assigning timestamps.
>>
>> You can do map after keyby to assign timestamps
>>
>> e.g:
>>
>> val withTimestampsAndWatermarks: DataStream[MyEvent] = stream
>> .filter( _.severity == WARNING )
>> .assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks())
>>
>> withTimestampsAndWatermarks
>> .keyBy( _.getGroup )
>> .timeWindow(Time.seconds(10))
>> .reduce( (a, b) => a.add(b) )
>> .addSink(...
>>
>> ~Pushpendra
>>
>>
>>
>> --
>> View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/assignTimestamp-after-keyBy-tp8962p8964.html
>> Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.
>
Re: assignTimestamp after keyBy
Posted by Fabian Hueske <fh...@gmail.com>.
I would assign timestamps directly at the source.
Timestamps are not striped of by operators.
Reassigning timestamps somewhere in the middle of a job can cause very
unexpected results.
2016-09-08 9:32 GMT+02:00 Dong-iL, Kim <ki...@gmail.com>:
> Thanks for replying. pushpendra.
> The assignTimestamp method returns not KeyedStream but DataStream.
> so I cannot use windowing.
> is it possible casting to KeyedStream?
> Regards
>
> > On Sep 8, 2016, at 3:12 PM, pushpendra.jaiswal <
> pushpendra.jaiswal90@gmail.com> wrote:
> >
> > Please refer
> > https://ci.apache.org/projects/flink/flink-docs-
> master/dev/event_timestamps_watermarks.html
> > for assigning timestamps.
> >
> > You can do map after keyby to assign timestamps
> >
> > e.g:
> >
> > val withTimestampsAndWatermarks: DataStream[MyEvent] = stream
> > .filter( _.severity == WARNING )
> > .assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks())
> >
> > withTimestampsAndWatermarks
> > .keyBy( _.getGroup )
> > .timeWindow(Time.seconds(10))
> > .reduce( (a, b) => a.add(b) )
> > .addSink(...
> >
> > ~Pushpendra
> >
> >
> >
> > --
> > View this message in context: http://apache-flink-user-
> mailing-list-archive.2336050.n4.nabble.com/assignTimestamp-
> after-keyBy-tp8962p8964.html
> > Sent from the Apache Flink User Mailing List archive. mailing list
> archive at Nabble.com.
>
>
Re: assignTimestamp after keyBy
Posted by "Dong-iL, Kim" <ki...@gmail.com>.
Thanks for replying. pushpendra.
The assignTimestamp method returns not KeyedStream but DataStream.
so I cannot use windowing.
is it possible casting to KeyedStream?
Regards
> On Sep 8, 2016, at 3:12 PM, pushpendra.jaiswal <pu...@gmail.com> wrote:
>
> Please refer
> https://ci.apache.org/projects/flink/flink-docs-master/dev/event_timestamps_watermarks.html
> for assigning timestamps.
>
> You can do map after keyby to assign timestamps
>
> e.g:
>
> val withTimestampsAndWatermarks: DataStream[MyEvent] = stream
> .filter( _.severity == WARNING )
> .assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks())
>
> withTimestampsAndWatermarks
> .keyBy( _.getGroup )
> .timeWindow(Time.seconds(10))
> .reduce( (a, b) => a.add(b) )
> .addSink(...
>
> ~Pushpendra
>
>
>
> --
> View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/assignTimestamp-after-keyBy-tp8962p8964.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.
Re: assignTimestamp after keyBy
Posted by "pushpendra.jaiswal" <pu...@gmail.com>.
Please refer
https://ci.apache.org/projects/flink/flink-docs-master/dev/event_timestamps_watermarks.html
for assigning timestamps.
You can do map after keyby to assign timestamps
e.g:
val withTimestampsAndWatermarks: DataStream[MyEvent] = stream
.filter( _.severity == WARNING )
.assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks())
withTimestampsAndWatermarks
.keyBy( _.getGroup )
.timeWindow(Time.seconds(10))
.reduce( (a, b) => a.add(b) )
.addSink(...
~Pushpendra
--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/assignTimestamp-after-keyBy-tp8962p8964.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.