You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by 邓子琦 <dz...@gmail.com> on 2022/03/03 09:33:52 UTC

[DISCUSS]当timestamp - offset + windowSize < 0时,元素无法被分配到正确的窗口

当timestamp - offset + windowSize < 0时,元素无法被分配到正确的窗口问题

你好!

我们在学习flink源码时,发现它计算窗口开始时间的算法存在问题。当timestamp - offset + windowSize < 0
时,元素会被错误地分配到比自身时间戳大一个WindowSize的窗口里去。

问题在org.apache.flink.streaming.api.windowing.windows.TimeWindow

    public static long getWindowStartWithOffset(long timestamp, long
offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

[image: image-20220303144539703.png]

我们认为,这违背了时间和窗口之间的逻辑。也就是*一个元素应该落在开始时间小于自身戳时间戳且结束时间大于自身时间戳的窗口里去*。但是,目前的情况是当timestamp
- offset + windowSize < 0 时,*元素会落在一个未来的时间窗口中*。
解决方法

其实原算法在python中是没问题,导致这个问题的关键在于,编程语言对求余运算的处理。

最后,我们认为应当修改为下述算法。

    public static long getWindowStartWithOffset(long timestamp, long
offset, long windowSize) {
        return timestamp
                - (timestamp - offset) % windowSize
                - (windowSize & (timestamp - offset) >> 63);
    }

(windowSize & (timestamp - offset) >> 63) 这个式子的作用是当(timestamp - offset)<0
时整体的运算结果减去windowSize,否则就什么都不做。这样我们就实现了向负无穷的取整。

这样,我们可以让时间戳为负数时,元素也能被分配到正确的窗口中。[image: image-20220303144558158.png]

这段代码可以通过目前的单元测试。
其他包中的 getWindowStartWithOffset 方法

想到getWindowStartWithOffset中应该有不少地方要用到。我们在项目中搜索了这个方法,发现flink.table
中就处理了负时间戳的问题。

下面是他们的源码。

org.apache.flink.table.runtime.operators.window.grouping.WindowsGrouping

    private long getWindowStartWithOffset(long timestamp, long offset,
long windowSize) {
        long remainder = (timestamp - offset) % windowSize;
        // handle both positive and negative cases
        if (remainder < 0) {
            return timestamp - (remainder + windowSize);
        } else {
            return timestamp - remainder;
        }
    }

我们可以发起一个pull request吗?

如果社区认为它有修改的必要,希望能把这项任务交给我们。我们的成员都是刚从学校毕业不久的学生,能为flink贡献代码对我们来说是极大的鼓舞。

十分感谢!

这是我们在 jira 上创建的 issue https://issues.apache.org/jira/browse/FLINK-26334

来自 邓子琦 & 林婉妮 & 郭元方

Re: [DISCUSS]当timestamp - offset + windowSize < 0时,元素无法被分配到正确的窗口

Posted by yidan zhao <hi...@gmail.com>.
嗯,问题的确存在。  只是场景有点特别,ts一般是时间戳,ts本身负数或很小的情况这个我没考虑。

邓子琦 <dz...@gmail.com> 于2022年3月4日周五 14:28写道:

> 不是的,这是一个现存问题
>
> abs(offset)>size的约束并不能让ts-offset+size必然大于0
>
> 图中给出的示例是我们用代码验证过的。你可以尝试运行下面的代码
>
> 会发现它暴露出来的问题跟我所描述的一样
>
> /* output
>
> *窗口开始时间是    -15000  有1个元素       数据是 (a,-17000)
>
> *窗口开始时间是    -10000  有1个元素       数据是 (b,-12000)
>
> *窗口开始时间是    -5000   有2个元素       数据是 (c,-7000)
>
> *窗口开始时间是    -5000   有2个元素       数据是 (d,-2000)
>
> *窗口开始时间是    0   有1个元素       数据是 (e,3000)
>
> *窗口开始时间是    5000    有1个元素       数据是 (f,8000)
>
> *窗口开始时间是    10000   有1个元素       数据是 (g,13000)
>
> *窗口开始时间是    15000   有1个元素       数据是 (h,18000)
>
> */
>
> public class Example {
>
>     public static void main(String[] args) throws Exception {
>
>         final TimeZone timeZone = TimeZone.getTimeZone("GTM+0");
>
>         TimeZone.setDefault(timeZone);
>
>         StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
>
>         env
>
>                 .setParallelism(1)
>
>                 .fromElements(
>
>                         Tuple2.of("a",-17*1000L),
>
>                         Tuple2.of("b",-12*1000L),
>
>                         Tuple2.of("c",-7*1000L),
>
>                         Tuple2.of("d",-2*1000L),
>
>                         Tuple2.of("e",3*1000L),
>
>                         Tuple2.of("f",8*1000L),
>
>                         Tuple2.of("g",13*1000L),
>
>                         Tuple2.of("h",18*1000L)
>
>                 )
>
>                 .assignTimestampsAndWatermarks(
>
>
> WatermarkStrategy.<Tuple2<String,Long>>forMonotonousTimestamps()
>
>                                 .withTimestampAssigner(
>
>                                         new
> SerializableTimestampAssigner<Tuple2<String, Long>>() {
>
>                                             @Override
>
>                                             public long
> extractTimestamp(Tuple2<String, Long> element, long l) {
>
>                                                 return element.f1;
>
>                                             }
>
>                                         }
>
>                                 )
>
>                 )
>
>                 .keyBy(r->1)
>
>                 .window(TumblingEventTimeWindows.of(Time.seconds(5)))
>
>                 .process(
>
>                         new ProcessWindowFunction<Tuple2<String,
> Long>, String, Integer, TimeWindow>() {
>
>                             @Override
>
>                             public void process(Integer integer,
> ProcessWindowFunction<Tuple2<String, Long>, String, Integer,
> TimeWindow>.Context context, Iterable<Tuple2<String, Long>> elements,
> Collector<String> out) throws Exception {
>
>                                 for (Tuple2<String, Long> element :
> elements) {
>
>
> out.collect("窗口开始时间是\t"+context.window().getStart()
>
>                                             + "\t有" +
> elements.spliterator().getExactSizeIfKnown()+"个元素"
>
>                                             + "\t\t数据是\t"+ element
>
>                                     );
>
>                                 }
>
>                             }
>
>                         }
>
>                 )
>
>                 .print();
>
>         env.execute();
>
>     }
>
> }
>
>
> yidan zhao <hi...@gmail.com> 于2022年3月4日周五 10:30写道:
>
> > 1 在flink中调用这个方法的部分是 windowAssigner,以TumblingEventTimeWindows
> > 为例,分配window的时候的逻辑为:
> >
> > long start =
> >         TimeWindow.getWindowStartWithOffset(
> >                 timestamp, (globalOffset + staggerOffset) % size, size);
> >
> >
> > 构造函数中offset逻辑:
> >
> > protected TumblingEventTimeWindows(long size, long offset,
> > WindowStagger windowStagger) {
> >     if (Math.abs(offset) >= size) {
> >         throw new IllegalArgumentException(
> >                 "TumblingEventTimeWindows parameters must satisfy
> > abs(offset) < size");
> >     }
> >
> >     this.size = size;
> >     this.globalOffset = offset;
> >     this.windowStagger = windowStagger;
> > }
> >
> >
> > 之前的版本中没有windowStagger的逻辑,只考虑offst,构造函数中对abs(offset)>size做了限制。因此
> > ts-offset+size 是 >0的。当然新引入的windowStagger的确可能引入这个问题。
> >
> > 邓子琦 <dz...@gmail.com> 于2022年3月3日周四 19:49写道:
> >
> > > 好滴 谢谢
> > >
> > > yu'an huang <h....@gmail.com> 于2022年3月3日周四 18:17写道:
> > >
> > > >
> > > > 我想你们可以为Flink贡献代码。只要按照guide
> > > > https://flink.apache.org/contributing/contribute-code.html <
> > > >
> > >
> >
> https://flink.apache.org/contributing/contribute-code.html%E4%B8%AD%E7%9A%84%E6%AD%A5%E9%AA%A4%E5%85%88%E5%BB%BA%E7%AB%8BJIRA
> > > > >
> > > >
> > > > 建立JIRA Ticket然后讨论就可以了,为了社区可以更方便的review你们的代码。
> > > >
> > > >
> > > >
> > > > > On 3 Mar 2022, at 5:33 PM, 邓子琦 <dz...@gmail.com> wrote:
> > > > >
> > > > > 当timestamp - offset + windowSize < 0时,元素无法被分配到正确的窗口
> > > > >
> > > > > 问题
> > > > >
> > > > > 你好!
> > > > > 我们在学习flink源码时,发现它计算窗口开始时间的算法存在问题。当timestamp - offset + windowSize
> < 0
> > > > 时,元素会被错误地分配到比自身时间戳大一个WindowSize的窗口里去。
> > > > > 问题在org.apache.flink.streaming.api.windowing.windows.TimeWindow
> > > > >    public static long getWindowStartWithOffset(long timestamp, long
> > > > offset, long windowSize) {
> > > > >        return timestamp - (timestamp - offset + windowSize) %
> > > windowSize;
> > > > >   }
> > > > >
> > > > >
> > > >
> > >
> >
> 我们认为,这违背了时间和窗口之间的逻辑。也就是一个元素应该落在开始时间小于自身戳时间戳且结束时间大于自身时间戳的窗口里去。但是,目前的情况是当timestamp
> > > > - offset + windowSize < 0  时,元素会落在一个未来的时间窗口中。
> > > > > 解决方法
> > > > >
> > > > > 其实原算法在python中是没问题,导致这个问题的关键在于,编程语言对求余运算的处理。
> > > > > 最后,我们认为应当修改为下述算法。
> > > > >    public static long getWindowStartWithOffset(long timestamp, long
> > > > offset, long windowSize) {
> > > > >        return timestamp
> > > > >                - (timestamp - offset) % windowSize
> > > > >                - (windowSize & (timestamp - offset) >> 63);
> > > > >   }
> > > > > (windowSize & (timestamp - offset) >> 63) 这个式子的作用是当(timestamp -
> > > > offset)<0 时整体的运算结果减去windowSize,否则就什么都不做。这样我们就实现了向负无穷的取整。
> > > > > 这样,我们可以让时间戳为负数时,元素也能被分配到正确的窗口中。
> > > > > 这段代码可以通过目前的单元测试。
> > > > > 其他包中的 getWindowStartWithOffset 方法
> > > > >
> > > > >
> > > >
> > >
> >
> 想到getWindowStartWithOffset中应该有不少地方要用到。我们在项目中搜索了这个方法,发现flink.table中就处理了负时间戳的问题。
> > > > > 下面是他们的源码。
> > > > >
> > >
> org.apache.flink.table.runtime.operators.window.grouping.WindowsGrouping
> > > > >    private long getWindowStartWithOffset(long timestamp, long
> offset,
> > > > long windowSize) {
> > > > >        long remainder = (timestamp - offset) % windowSize;
> > > > >        // handle both positive and negative cases
> > > > >        if (remainder < 0) {
> > > > >            return timestamp - (remainder + windowSize);
> > > > >       } else {
> > > > >            return timestamp - remainder;
> > > > >       }
> > > > >   }
> > > > > 我们可以发起一个pull request吗?
> > > > >
> > > > >
> 如果社区认为它有修改的必要,希望能把这项任务交给我们。我们的成员都是刚从学校毕业不久的学生,能为flink贡献代码对我们来说是极大的鼓舞。
> > > > > 十分感谢!
> > > > > 这是我们在 jira 上创建的 issue
> > > https://issues.apache.org/jira/browse/FLINK-26334
> > > > <https://issues.apache.org/jira/browse/FLINK-26334>
> > > > > 来自 邓子琦 & 林婉妮 & 郭元方
> > > > >
> > > >
> > > >
> > >
> >
>

Re: [DISCUSS]当timestamp - offset + windowSize < 0时,元素无法被分配到正确的窗口

Posted by 邓子琦 <dz...@gmail.com>.
不是的,这是一个现存问题

abs(offset)>size的约束并不能让ts-offset+size必然大于0

图中给出的示例是我们用代码验证过的。你可以尝试运行下面的代码

会发现它暴露出来的问题跟我所描述的一样

/* output

*窗口开始时间是    -15000  有1个元素       数据是 (a,-17000)

*窗口开始时间是    -10000  有1个元素       数据是 (b,-12000)

*窗口开始时间是    -5000   有2个元素       数据是 (c,-7000)

*窗口开始时间是    -5000   有2个元素       数据是 (d,-2000)

*窗口开始时间是    0   有1个元素       数据是 (e,3000)

*窗口开始时间是    5000    有1个元素       数据是 (f,8000)

*窗口开始时间是    10000   有1个元素       数据是 (g,13000)

*窗口开始时间是    15000   有1个元素       数据是 (h,18000)

*/

public class Example {

    public static void main(String[] args) throws Exception {

        final TimeZone timeZone = TimeZone.getTimeZone("GTM+0");

        TimeZone.setDefault(timeZone);

        StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();

        env

                .setParallelism(1)

                .fromElements(

                        Tuple2.of("a",-17*1000L),

                        Tuple2.of("b",-12*1000L),

                        Tuple2.of("c",-7*1000L),

                        Tuple2.of("d",-2*1000L),

                        Tuple2.of("e",3*1000L),

                        Tuple2.of("f",8*1000L),

                        Tuple2.of("g",13*1000L),

                        Tuple2.of("h",18*1000L)

                )

                .assignTimestampsAndWatermarks(


WatermarkStrategy.<Tuple2<String,Long>>forMonotonousTimestamps()

                                .withTimestampAssigner(

                                        new
SerializableTimestampAssigner<Tuple2<String, Long>>() {

                                            @Override

                                            public long
extractTimestamp(Tuple2<String, Long> element, long l) {

                                                return element.f1;

                                            }

                                        }

                                )

                )

                .keyBy(r->1)

                .window(TumblingEventTimeWindows.of(Time.seconds(5)))

                .process(

                        new ProcessWindowFunction<Tuple2<String,
Long>, String, Integer, TimeWindow>() {

                            @Override

                            public void process(Integer integer,
ProcessWindowFunction<Tuple2<String, Long>, String, Integer,
TimeWindow>.Context context, Iterable<Tuple2<String, Long>> elements,
Collector<String> out) throws Exception {

                                for (Tuple2<String, Long> element : elements) {


out.collect("窗口开始时间是\t"+context.window().getStart()

                                            + "\t有" +
elements.spliterator().getExactSizeIfKnown()+"个元素"

                                            + "\t\t数据是\t"+ element

                                    );

                                }

                            }

                        }

                )

                .print();

        env.execute();

    }

}


yidan zhao <hi...@gmail.com> 于2022年3月4日周五 10:30写道:

> 1 在flink中调用这个方法的部分是 windowAssigner,以TumblingEventTimeWindows
> 为例,分配window的时候的逻辑为:
>
> long start =
>         TimeWindow.getWindowStartWithOffset(
>                 timestamp, (globalOffset + staggerOffset) % size, size);
>
>
> 构造函数中offset逻辑:
>
> protected TumblingEventTimeWindows(long size, long offset,
> WindowStagger windowStagger) {
>     if (Math.abs(offset) >= size) {
>         throw new IllegalArgumentException(
>                 "TumblingEventTimeWindows parameters must satisfy
> abs(offset) < size");
>     }
>
>     this.size = size;
>     this.globalOffset = offset;
>     this.windowStagger = windowStagger;
> }
>
>
> 之前的版本中没有windowStagger的逻辑,只考虑offst,构造函数中对abs(offset)>size做了限制。因此
> ts-offset+size 是 >0的。当然新引入的windowStagger的确可能引入这个问题。
>
> 邓子琦 <dz...@gmail.com> 于2022年3月3日周四 19:49写道:
>
> > 好滴 谢谢
> >
> > yu'an huang <h....@gmail.com> 于2022年3月3日周四 18:17写道:
> >
> > >
> > > 我想你们可以为Flink贡献代码。只要按照guide
> > > https://flink.apache.org/contributing/contribute-code.html <
> > >
> >
> https://flink.apache.org/contributing/contribute-code.html%E4%B8%AD%E7%9A%84%E6%AD%A5%E9%AA%A4%E5%85%88%E5%BB%BA%E7%AB%8BJIRA
> > > >
> > >
> > > 建立JIRA Ticket然后讨论就可以了,为了社区可以更方便的review你们的代码。
> > >
> > >
> > >
> > > > On 3 Mar 2022, at 5:33 PM, 邓子琦 <dz...@gmail.com> wrote:
> > > >
> > > > 当timestamp - offset + windowSize < 0时,元素无法被分配到正确的窗口
> > > >
> > > > 问题
> > > >
> > > > 你好!
> > > > 我们在学习flink源码时,发现它计算窗口开始时间的算法存在问题。当timestamp - offset + windowSize < 0
> > > 时,元素会被错误地分配到比自身时间戳大一个WindowSize的窗口里去。
> > > > 问题在org.apache.flink.streaming.api.windowing.windows.TimeWindow
> > > >    public static long getWindowStartWithOffset(long timestamp, long
> > > offset, long windowSize) {
> > > >        return timestamp - (timestamp - offset + windowSize) %
> > windowSize;
> > > >   }
> > > >
> > > >
> > >
> >
> 我们认为,这违背了时间和窗口之间的逻辑。也就是一个元素应该落在开始时间小于自身戳时间戳且结束时间大于自身时间戳的窗口里去。但是,目前的情况是当timestamp
> > > - offset + windowSize < 0  时,元素会落在一个未来的时间窗口中。
> > > > 解决方法
> > > >
> > > > 其实原算法在python中是没问题,导致这个问题的关键在于,编程语言对求余运算的处理。
> > > > 最后,我们认为应当修改为下述算法。
> > > >    public static long getWindowStartWithOffset(long timestamp, long
> > > offset, long windowSize) {
> > > >        return timestamp
> > > >                - (timestamp - offset) % windowSize
> > > >                - (windowSize & (timestamp - offset) >> 63);
> > > >   }
> > > > (windowSize & (timestamp - offset) >> 63) 这个式子的作用是当(timestamp -
> > > offset)<0 时整体的运算结果减去windowSize,否则就什么都不做。这样我们就实现了向负无穷的取整。
> > > > 这样,我们可以让时间戳为负数时,元素也能被分配到正确的窗口中。
> > > > 这段代码可以通过目前的单元测试。
> > > > 其他包中的 getWindowStartWithOffset 方法
> > > >
> > > >
> > >
> >
> 想到getWindowStartWithOffset中应该有不少地方要用到。我们在项目中搜索了这个方法,发现flink.table中就处理了负时间戳的问题。
> > > > 下面是他们的源码。
> > > >
> > org.apache.flink.table.runtime.operators.window.grouping.WindowsGrouping
> > > >    private long getWindowStartWithOffset(long timestamp, long offset,
> > > long windowSize) {
> > > >        long remainder = (timestamp - offset) % windowSize;
> > > >        // handle both positive and negative cases
> > > >        if (remainder < 0) {
> > > >            return timestamp - (remainder + windowSize);
> > > >       } else {
> > > >            return timestamp - remainder;
> > > >       }
> > > >   }
> > > > 我们可以发起一个pull request吗?
> > > >
> > > > 如果社区认为它有修改的必要,希望能把这项任务交给我们。我们的成员都是刚从学校毕业不久的学生,能为flink贡献代码对我们来说是极大的鼓舞。
> > > > 十分感谢!
> > > > 这是我们在 jira 上创建的 issue
> > https://issues.apache.org/jira/browse/FLINK-26334
> > > <https://issues.apache.org/jira/browse/FLINK-26334>
> > > > 来自 邓子琦 & 林婉妮 & 郭元方
> > > >
> > >
> > >
> >
>

Re: [DISCUSS]当timestamp - offset + windowSize < 0时,元素无法被分配到正确的窗口

Posted by yidan zhao <hi...@gmail.com>.
1 在flink中调用这个方法的部分是 windowAssigner,以TumblingEventTimeWindows
为例,分配window的时候的逻辑为:

long start =
        TimeWindow.getWindowStartWithOffset(
                timestamp, (globalOffset + staggerOffset) % size, size);


构造函数中offset逻辑:

protected TumblingEventTimeWindows(long size, long offset,
WindowStagger windowStagger) {
    if (Math.abs(offset) >= size) {
        throw new IllegalArgumentException(
                "TumblingEventTimeWindows parameters must satisfy
abs(offset) < size");
    }

    this.size = size;
    this.globalOffset = offset;
    this.windowStagger = windowStagger;
}


之前的版本中没有windowStagger的逻辑,只考虑offst,构造函数中对abs(offset)>size做了限制。因此
ts-offset+size 是 >0的。当然新引入的windowStagger的确可能引入这个问题。

邓子琦 <dz...@gmail.com> 于2022年3月3日周四 19:49写道:

> 好滴 谢谢
>
> yu'an huang <h....@gmail.com> 于2022年3月3日周四 18:17写道:
>
> >
> > 我想你们可以为Flink贡献代码。只要按照guide
> > https://flink.apache.org/contributing/contribute-code.html <
> >
> https://flink.apache.org/contributing/contribute-code.html%E4%B8%AD%E7%9A%84%E6%AD%A5%E9%AA%A4%E5%85%88%E5%BB%BA%E7%AB%8BJIRA
> > >
> >
> > 建立JIRA Ticket然后讨论就可以了,为了社区可以更方便的review你们的代码。
> >
> >
> >
> > > On 3 Mar 2022, at 5:33 PM, 邓子琦 <dz...@gmail.com> wrote:
> > >
> > > 当timestamp - offset + windowSize < 0时,元素无法被分配到正确的窗口
> > >
> > > 问题
> > >
> > > 你好!
> > > 我们在学习flink源码时,发现它计算窗口开始时间的算法存在问题。当timestamp - offset + windowSize < 0
> > 时,元素会被错误地分配到比自身时间戳大一个WindowSize的窗口里去。
> > > 问题在org.apache.flink.streaming.api.windowing.windows.TimeWindow
> > >    public static long getWindowStartWithOffset(long timestamp, long
> > offset, long windowSize) {
> > >        return timestamp - (timestamp - offset + windowSize) %
> windowSize;
> > >   }
> > >
> > >
> >
> 我们认为,这违背了时间和窗口之间的逻辑。也就是一个元素应该落在开始时间小于自身戳时间戳且结束时间大于自身时间戳的窗口里去。但是,目前的情况是当timestamp
> > - offset + windowSize < 0  时,元素会落在一个未来的时间窗口中。
> > > 解决方法
> > >
> > > 其实原算法在python中是没问题,导致这个问题的关键在于,编程语言对求余运算的处理。
> > > 最后,我们认为应当修改为下述算法。
> > >    public static long getWindowStartWithOffset(long timestamp, long
> > offset, long windowSize) {
> > >        return timestamp
> > >                - (timestamp - offset) % windowSize
> > >                - (windowSize & (timestamp - offset) >> 63);
> > >   }
> > > (windowSize & (timestamp - offset) >> 63) 这个式子的作用是当(timestamp -
> > offset)<0 时整体的运算结果减去windowSize,否则就什么都不做。这样我们就实现了向负无穷的取整。
> > > 这样,我们可以让时间戳为负数时,元素也能被分配到正确的窗口中。
> > > 这段代码可以通过目前的单元测试。
> > > 其他包中的 getWindowStartWithOffset 方法
> > >
> > >
> >
> 想到getWindowStartWithOffset中应该有不少地方要用到。我们在项目中搜索了这个方法,发现flink.table中就处理了负时间戳的问题。
> > > 下面是他们的源码。
> > >
> org.apache.flink.table.runtime.operators.window.grouping.WindowsGrouping
> > >    private long getWindowStartWithOffset(long timestamp, long offset,
> > long windowSize) {
> > >        long remainder = (timestamp - offset) % windowSize;
> > >        // handle both positive and negative cases
> > >        if (remainder < 0) {
> > >            return timestamp - (remainder + windowSize);
> > >       } else {
> > >            return timestamp - remainder;
> > >       }
> > >   }
> > > 我们可以发起一个pull request吗?
> > >
> > > 如果社区认为它有修改的必要,希望能把这项任务交给我们。我们的成员都是刚从学校毕业不久的学生,能为flink贡献代码对我们来说是极大的鼓舞。
> > > 十分感谢!
> > > 这是我们在 jira 上创建的 issue
> https://issues.apache.org/jira/browse/FLINK-26334
> > <https://issues.apache.org/jira/browse/FLINK-26334>
> > > 来自 邓子琦 & 林婉妮 & 郭元方
> > >
> >
> >
>

Re: [DISCUSS]当timestamp - offset + windowSize < 0时,元素无法被分配到正确的窗口

Posted by 邓子琦 <dz...@gmail.com>.
好滴 谢谢

yu'an huang <h....@gmail.com> 于2022年3月3日周四 18:17写道:

>
> 我想你们可以为Flink贡献代码。只要按照guide
> https://flink.apache.org/contributing/contribute-code.html <
> https://flink.apache.org/contributing/contribute-code.html%E4%B8%AD%E7%9A%84%E6%AD%A5%E9%AA%A4%E5%85%88%E5%BB%BA%E7%AB%8BJIRA
> >
>
> 建立JIRA Ticket然后讨论就可以了,为了社区可以更方便的review你们的代码。
>
>
>
> > On 3 Mar 2022, at 5:33 PM, 邓子琦 <dz...@gmail.com> wrote:
> >
> > 当timestamp - offset + windowSize < 0时,元素无法被分配到正确的窗口
> >
> > 问题
> >
> > 你好!
> > 我们在学习flink源码时,发现它计算窗口开始时间的算法存在问题。当timestamp - offset + windowSize < 0
> 时,元素会被错误地分配到比自身时间戳大一个WindowSize的窗口里去。
> > 问题在org.apache.flink.streaming.api.windowing.windows.TimeWindow
> >    public static long getWindowStartWithOffset(long timestamp, long
> offset, long windowSize) {
> >        return timestamp - (timestamp - offset + windowSize) % windowSize;
> >   }
> >
> >
> 我们认为,这违背了时间和窗口之间的逻辑。也就是一个元素应该落在开始时间小于自身戳时间戳且结束时间大于自身时间戳的窗口里去。但是,目前的情况是当timestamp
> - offset + windowSize < 0  时,元素会落在一个未来的时间窗口中。
> > 解决方法
> >
> > 其实原算法在python中是没问题,导致这个问题的关键在于,编程语言对求余运算的处理。
> > 最后,我们认为应当修改为下述算法。
> >    public static long getWindowStartWithOffset(long timestamp, long
> offset, long windowSize) {
> >        return timestamp
> >                - (timestamp - offset) % windowSize
> >                - (windowSize & (timestamp - offset) >> 63);
> >   }
> > (windowSize & (timestamp - offset) >> 63) 这个式子的作用是当(timestamp -
> offset)<0 时整体的运算结果减去windowSize,否则就什么都不做。这样我们就实现了向负无穷的取整。
> > 这样,我们可以让时间戳为负数时,元素也能被分配到正确的窗口中。
> > 这段代码可以通过目前的单元测试。
> > 其他包中的 getWindowStartWithOffset 方法
> >
> >
> 想到getWindowStartWithOffset中应该有不少地方要用到。我们在项目中搜索了这个方法,发现flink.table中就处理了负时间戳的问题。
> > 下面是他们的源码。
> > org.apache.flink.table.runtime.operators.window.grouping.WindowsGrouping
> >    private long getWindowStartWithOffset(long timestamp, long offset,
> long windowSize) {
> >        long remainder = (timestamp - offset) % windowSize;
> >        // handle both positive and negative cases
> >        if (remainder < 0) {
> >            return timestamp - (remainder + windowSize);
> >       } else {
> >            return timestamp - remainder;
> >       }
> >   }
> > 我们可以发起一个pull request吗?
> >
> > 如果社区认为它有修改的必要,希望能把这项任务交给我们。我们的成员都是刚从学校毕业不久的学生,能为flink贡献代码对我们来说是极大的鼓舞。
> > 十分感谢!
> > 这是我们在 jira 上创建的 issue https://issues.apache.org/jira/browse/FLINK-26334
> <https://issues.apache.org/jira/browse/FLINK-26334>
> > 来自 邓子琦 & 林婉妮 & 郭元方
> >
>
>

Re: [DISCUSS]当timestamp - offset + windowSize < 0时,元素无法被分配到正确的窗口

Posted by yu'an huang <h....@gmail.com>.
我想你们可以为Flink贡献代码。只要按照guide https://flink.apache.org/contributing/contribute-code.html <https://flink.apache.org/contributing/contribute-code.html%E4%B8%AD%E7%9A%84%E6%AD%A5%E9%AA%A4%E5%85%88%E5%BB%BA%E7%AB%8BJIRA>

建立JIRA Ticket然后讨论就可以了,为了社区可以更方便的review你们的代码。



> On 3 Mar 2022, at 5:33 PM, 邓子琦 <dz...@gmail.com> wrote:
> 
> 当timestamp - offset + windowSize < 0时,元素无法被分配到正确的窗口
> 
> 问题
> 
> 你好!
> 我们在学习flink源码时,发现它计算窗口开始时间的算法存在问题。当timestamp - offset + windowSize < 0 时,元素会被错误地分配到比自身时间戳大一个WindowSize的窗口里去。
> 问题在org.apache.flink.streaming.api.windowing.windows.TimeWindow
>    public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
>        return timestamp - (timestamp - offset + windowSize) % windowSize;
>   }
> 
> 我们认为,这违背了时间和窗口之间的逻辑。也就是一个元素应该落在开始时间小于自身戳时间戳且结束时间大于自身时间戳的窗口里去。但是,目前的情况是当timestamp - offset + windowSize < 0  时,元素会落在一个未来的时间窗口中。
> 解决方法
> 
> 其实原算法在python中是没问题,导致这个问题的关键在于,编程语言对求余运算的处理。
> 最后,我们认为应当修改为下述算法。
>    public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
>        return timestamp
>                - (timestamp - offset) % windowSize
>                - (windowSize & (timestamp - offset) >> 63);
>   }
> (windowSize & (timestamp - offset) >> 63) 这个式子的作用是当(timestamp - offset)<0 时整体的运算结果减去windowSize,否则就什么都不做。这样我们就实现了向负无穷的取整。
> 这样,我们可以让时间戳为负数时,元素也能被分配到正确的窗口中。
> 这段代码可以通过目前的单元测试。
> 其他包中的 getWindowStartWithOffset 方法
> 
> 想到getWindowStartWithOffset中应该有不少地方要用到。我们在项目中搜索了这个方法,发现flink.table中就处理了负时间戳的问题。
> 下面是他们的源码。
> org.apache.flink.table.runtime.operators.window.grouping.WindowsGrouping
>    private long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
>        long remainder = (timestamp - offset) % windowSize;
>        // handle both positive and negative cases
>        if (remainder < 0) {
>            return timestamp - (remainder + windowSize);
>       } else {
>            return timestamp - remainder;
>       }
>   }
> 我们可以发起一个pull request吗?
> 
> 如果社区认为它有修改的必要,希望能把这项任务交给我们。我们的成员都是刚从学校毕业不久的学生,能为flink贡献代码对我们来说是极大的鼓舞。
> 十分感谢!
> 这是我们在 jira 上创建的 issue https://issues.apache.org/jira/browse/FLINK-26334 <https://issues.apache.org/jira/browse/FLINK-26334>
> 来自 邓子琦 & 林婉妮 & 郭元方
>