Posted to issues@flink.apache.org by "Martijn Visser (Jira)" <ji...@apache.org> on 2022/03/03 09:44:00 UTC

[jira] [Updated] (FLINK-26334) when the (timestamp - offset + windowSize) is less than 0 the calculation result of TimeWindow.getWindowStartWithOffset is incorrect

     [ https://issues.apache.org/jira/browse/FLINK-26334?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Martijn Visser updated FLINK-26334:
-----------------------------------
    Priority: Major  (was: Blocker)

> when the (timestamp - offset + windowSize) is less than 0 the calculation result of TimeWindow.getWindowStartWithOffset is incorrect
> ------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-26334
>                 URL: https://issues.apache.org/jira/browse/FLINK-26334
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream
>    Affects Versions: 1.15.0, 1.14.3
>         Environment: flink version 1.14.3
>            Reporter: realdengziqi
>            Priority: Major
>   Original Estimate: 16h
>  Remaining Estimate: 16h
>
>  
> source code
> {code:java}
> //Method to get the window start for a timestamp.
> //Params:
> //timestamp – epoch millisecond to get the window start.
> //offset – The offset which window start would be shifted by.
> //windowSize – The size of the generated windows.
> //Returns:
> //window start
> public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
>     return timestamp - (timestamp - offset + windowSize) % windowSize;
> } {code}
> If windowSize is 5 seconds (5000 ms) and offset is 0, an element with a timestamp of -7000L should be assigned to the window that starts at -10000L. Instead, this code assigns it to the window that starts at -5000L, which does not contain the element.
> With the current calculation, whenever (timestamp - offset + windowSize) is less than 0 (and not an exact multiple of windowSize), the computed window start is shifted by one windowSize toward 0.
> After discussing this with a friend, we believe the cause is that the current calculation rounds toward 0, because Java's % operator returns a remainder with the sign of the dividend. The window start should be rounded toward -∞ instead.
> Do you think this is a bug? We would like to submit a pull request on GitHub to fix it.
> Below is a sample program that uses a tumbling window.
> {code:java}
> // Imports assumed for this sample; JSONObject is assumed to come from fastjson.
> import com.alibaba.fastjson.JSONObject;
> import java.sql.Timestamp;
> import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
> import org.apache.flink.api.common.eventtime.WatermarkStrategy;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
> import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
> import org.apache.flink.streaming.api.windowing.time.Time;
> import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
> import org.apache.flink.util.Collector;
>
> public class Test01 {
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         env.setParallelism(1);
>         env
>                 .fromElements(
>                         Tuple2.of("a",-7L*1000L),    // start time should be -12s
>                         Tuple2.of("b",-1L*1000L),
>                         Tuple2.of("c",1L*1000L),
>                         Tuple2.of("d",7L*1000L)
>                 )
>                 .assignTimestampsAndWatermarks(
>                         WatermarkStrategy.<Tuple2<String,Long>>forMonotonousTimestamps()
>                                 .withTimestampAssigner(
>                                         new SerializableTimestampAssigner<Tuple2<String, Long>>() {
>                                             @Override
>                                             public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
>                                                 return element.f1;
>                                             }
>                                         }
>                                 )
>                 )
>                 .keyBy(r->1)
>                 .window(TumblingEventTimeWindows.of(Time.seconds(6)))
>                 .process(
>                         new ProcessWindowFunction<Tuple2<String, Long>, String, Integer, TimeWindow>() {
>                             @Override
>                             public void process(Integer integer, ProcessWindowFunction<Tuple2<String, Long>, String, Integer, TimeWindow>.Context context, Iterable<Tuple2<String, Long>> elements, Collector<String> out) throws Exception {
>                                 for (Tuple2<String, Long> element : elements) {
>                                     JSONObject item = new JSONObject();
>                                     item.put("data",element.toString());
>                                     item.put("windowStartTime",new Timestamp(context.window().getStart()).toString() );
>                                     item.put("windowEndTime",new Timestamp(context.window().getEnd()).toString() );
>                                     out.collect(item.toJSONString());
>                                 }
>                             }
>                         }
>                 )
>                 .print();
>         env.execute();
>     }
> } {code}
>  
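
The rounding behaviour described in the report can be reproduced without a Flink job. The following is a minimal sketch, not the project's actual fix: truncatingWindowStart copies the calculation quoted in the report, while the class name WindowStartSketch and the method flooredWindowStart are hypothetical names introduced here. The Math.floorMod variant assumes windowSize is positive.

```java
final class WindowStartSketch {

    // Calculation quoted in the report. Java's % truncates toward zero, so the
    // remainder is negative when (timestamp - offset + windowSize) is negative.
    static long truncatingWindowStart(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    // Hypothetical alternative (an assumption, not the Flink implementation):
    // Math.floorMod yields a remainder in [0, windowSize) for a positive
    // windowSize, so the window start is rounded toward negative infinity.
    static long flooredWindowStart(long timestamp, long offset, long windowSize) {
        return timestamp - Math.floorMod(timestamp - offset, windowSize);
    }

    public static void main(String[] args) {
        // Same timestamps as the sample program, with a 6 second window and offset 0.
        long[] timestamps = {-7000L, -1000L, 1000L, 7000L};
        for (long ts : timestamps) {
            System.out.println(ts
                    + " -> truncating: " + truncatingWindowStart(ts, 0L, 6000L)
                    + ", floored: " + flooredWindowStart(ts, 0L, 6000L));
        }
    }
}
```

For the 5 second example in the report (timestamp -7000L, offset 0), truncatingWindowStart returns -5000 and flooredWindowStart returns -10000. For non-negative timestamps with offset 0 the two methods agree, so only elements before the epoch are affected.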



--
This message was sent by Atlassian Jira
(v8.20.1#820001)