Posted to user@flink.apache.org by "Dong-iL, Kim" <ki...@gmail.com> on 2016/09/08 03:59:52 UTC

assignTimestamp after keyBy

Hi.
My stream data comes from some files (files -> kafka -> flink (source -> keyBy -> windowing)).
The data is ordered within each file.
I want to assign timestamps after keyBy.
How can I do that?
Regards.

Re: assignTimestamp after keyBy

Posted by "Dong-iL, Kim" <ki...@gmail.com>.
I want to assign timestamps after keyBy
because the stream is not aligned before keyBy.
I have already tested code like yours.
It produced many warnings that timestamp monotony was violated.
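
A minimal sketch of why such warnings can appear, written in plain Scala with no Flink dependency. It is a simplified model, not Flink's implementation: the `Record` type, the file names used as keys, the sample timestamps, and the 2000 ms lateness bound are all hypothetical. It shows that records which are ordered within each file can still be out of order once the files are interleaved, and how a watermark that trails the highest timestamp seen so far by a fixed bound can tolerate that.

```scala
// Simplified model, not Flink code: all names and data here are hypothetical.
object MonotonySketch {
  final case class Record(key: String, timestamp: Long)

  // Count records whose timestamp is lower than that of the record just before them.
  def violations(records: Seq[Record]): Int =
    records.zip(records.drop(1)).count { case (prev, next) => next.timestamp < prev.timestamp }

  // Watermark after each record under a bounded-lateness rule:
  // the highest timestamp seen so far minus the allowed lateness.
  def watermarks(records: Seq[Record], maxLateness: Long): Seq[Long] =
    records
      .scanLeft(Long.MinValue)((maxTs, r) => math.max(maxTs, r.timestamp))
      .drop(1) // discard the initial seed value before subtracting
      .map(_ - maxLateness)

  // Two files, each ordered by time, interleaved on their way into the job.
  val interleaved: Seq[Record] = Seq(
    Record("fileA", 1000L), Record("fileB", 400L),
    Record("fileA", 2000L), Record("fileB", 900L),
    Record("fileA", 3000L), Record("fileB", 1500L)
  )

  def main(args: Array[String]): Unit = {
    println(s"violations over the whole stream: ${violations(interleaved)}")
    interleaved.groupBy(_.key).foreach { case (key, rs) =>
      println(s"violations within $key: ${violations(rs)}")
    }
    println(s"watermarks with 2000 ms lateness: ${watermarks(interleaved, 2000L)}")
  }
}
```

In this sample the whole stream has three ordering violations while each key on its own has none, and no record falls more than 2000 ms behind the highest timestamp seen before it.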



Re: assignTimestamp after keyBy

Posted by Fabian Hueske <fh...@gmail.com>.
I would assign timestamps directly at the source.
Timestamps are not stripped off by operators.

Reassigning timestamps somewhere in the middle of a job can cause very
unexpected results.
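
A minimal sketch of assigning timestamps directly at the source, assuming the Flink 1.x Scala DataStream API that was current when this thread was written (`timeWindow` was deprecated in later releases). The `MyEvent` case class, its field names, the sample elements standing in for the Kafka source, and the 5-second out-of-orderness bound are hypothetical. `BoundedOutOfOrdernessTimestampExtractor` is used here instead of an ascending-timestamp extractor because, as far as I know, the latter is what logs the "Timestamp monotony violated" warnings when records arrive out of order.

```scala
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

// Hypothetical event type; the field names are assumptions for illustration.
case class MyEvent(group: String, eventTime: Long, value: Long)

object TimestampsAtSource {
  // Combine two events of the same group: sum the values, keep the later event time.
  def add(a: MyEvent, b: MyEvent): MyEvent =
    a.copy(eventTime = math.max(a.eventTime, b.eventTime), value = a.value + b.value)

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // Stand-in for the Kafka source of the original pipeline.
    val source: DataStream[MyEvent] = env.fromElements(
      MyEvent("a", 1000L, 1L), MyEvent("b", 400L, 1L), MyEvent("a", 2000L, 1L))

    source
      // Timestamps and watermarks are assigned once, right after the source.
      .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor[MyEvent](Time.seconds(5)) {
          override def extractTimestamp(event: MyEvent): Long = event.eventTime
        })
      // keyBy returns a KeyedStream, so windowing is available from here on.
      .keyBy(_.group)
      .timeWindow(Time.seconds(10))
      .reduce((a, b) => add(a, b))
      .print()

    env.execute("timestamps-at-source")
  }
}
```

Because the timestamps travel with the records through `keyBy`, there is no need to cast anything back to a KeyedStream or to reassign timestamps later in the job.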


Re: assignTimestamp after keyBy

Posted by "Dong-iL, Kim" <ki...@gmail.com>.
Thanks for replying, Pushpendra.
The assignTimestamp method returns a DataStream, not a KeyedStream,
so I cannot use windowing.
Is it possible to cast it to a KeyedStream?
Regards



Re: assignTimestamp after keyBy

Posted by "pushpendra.jaiswal" <pu...@gmail.com>.
Please refer to
https://ci.apache.org/projects/flink/flink-docs-master/dev/event_timestamps_watermarks.html
for assigning timestamps.

You can do a map after keyBy to assign timestamps.

e.g.:

val withTimestampsAndWatermarks: DataStream[MyEvent] = stream
        .filter( _.severity == WARNING )
        .assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks())

withTimestampsAndWatermarks
        .keyBy( _.getGroup )
        .timeWindow(Time.seconds(10))
        .reduce( (a, b) => a.add(b) )
        .addSink(...)

~Pushpendra



--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/assignTimestamp-after-keyBy-tp8962p8964.html
Sent from the Apache Flink User Mailing List archive at Nabble.com.