Posted to dev@flink.apache.org by 邓子琦 <dz...@gmail.com> on 2022/03/04 06:59:18 UTC

[DISCUSS] When timestamp - offset + windowSize < 0, elements cannot be assigned to the correct window

I have created an issue in Jira:
https://issues.apache.org/jira/projects/FLINK/issues/FLINK-26334

Issue

        Hello!

        While studying the Flink source code, we found a problem in the
algorithm that calculates the window start time. When *timestamp - offset +
windowSize < 0*, the element is incorrectly assigned to a window whose start
time is later than the element's own timestamp.

        The problem is in
*org.apache.flink.streaming.api.windowing.windows.TimeWindow*

public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
    return timestamp - (timestamp - offset + windowSize) % windowSize;
}

        We believe that this violates the relationship between an element's
timestamp and its window. *An element should fall into a window whose start
time is no later than its own timestamp and whose end time is later than its
own timestamp.* With the current code, however, when *timestamp - offset +
windowSize < 0*, *the element falls into a window that starts in its future.*
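
To make the failure concrete, here is a minimal sketch that copies the formula quoted above into a standalone class and checks the containment rule for a few of the timestamps used in the reproduction program. The class name OriginalWindowStartDemo and the helper contains are made up for illustration and are not part of Flink.

```java
// Illustrative only: OriginalWindowStartDemo and contains() are hypothetical names.
final class OriginalWindowStartDemo {

    // Same expression as the TimeWindow method quoted above.
    static long originalWindowStart(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    // Containment rule: start <= timestamp < start + windowSize.
    static boolean contains(long start, long windowSize, long timestamp) {
        return start <= timestamp && timestamp < start + windowSize;
    }

    public static void main(String[] args) {
        long windowSize = 5000L;
        long offset = 0L;
        for (long timestamp : new long[] {-17000L, -7000L, -2000L, 3000L}) {
            long start = originalWindowStart(timestamp, offset, windowSize);
            System.out.println(timestamp + " -> start " + start
                    + ", contained: " + contains(start, windowSize, timestamp));
        }
    }
}
```

For timestamp -17000 the formula yields -15000, so the window [-15000, -10000) does not contain the element, while -2000 and 3000 land in windows that do contain them.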

       *You can reproduce the bug with the code at the end of the post.*

Solution

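        In fact, the original formula gives the right answer in Python. The
root cause is how each programming language defines the remainder operation:
Java's % takes the sign of the dividend, so it can return a negative value,
whereas Python's % takes the sign of the divisor.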
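
The difference can be seen in a short sketch. It compares Java's % operator with Math.floorMod from the standard library, which behaves like Python's % when the divisor is positive. The class and method names are hypothetical and chosen only for this illustration.

```java
// Illustrative only: RemainderDemo and its method names are hypothetical.
final class RemainderDemo {

    // Java's % truncates toward zero, so the result takes the sign of the dividend.
    static long javaRemainder(long dividend, long divisor) {
        return dividend % divisor;
    }

    // Math.floorMod rounds toward negative infinity, so the result takes
    // the sign of the divisor (the Python behaviour for a positive divisor).
    static long flooredRemainder(long dividend, long divisor) {
        return Math.floorMod(dividend, divisor);
    }

    public static void main(String[] args) {
        long windowSize = 5000L;
        System.out.println(javaRemainder(-12000L, windowSize));    // -2000
        System.out.println(flooredRemainder(-12000L, windowSize)); // 3000
        System.out.println(javaRemainder(8000L, windowSize));      // 3000
        System.out.println(flooredRemainder(8000L, windowSize));   // 3000
    }
}
```

For non-negative dividends the two agree, which is why the problem only shows up once *timestamp - offset + windowSize* becomes negative.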

        We propose changing the method to the following algorithm.

public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
    return timestamp
            - (timestamp - offset) % windowSize
            - (windowSize & (timestamp - offset) >> 63);
}

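        The term *windowSize & (timestamp - offset) >> 63* subtracts
windowSize from the result when *timestamp - offset < 0* and does nothing
otherwise. In Java, >> binds more tightly than &, so the shift is evaluated
first: it yields -1 (all bits set) for a negative value and 0 otherwise, and
the AND then produces either windowSize or 0. This way we can handle both
positive and negative timestamps.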

        With this change, the element is assigned to the correct window.

        This code passes the current unit tests.

getWindowStartWithOffset methods in other packages

        We expected *getWindowStartWithOffset* to be implemented in more than
one place. We searched the project for this method and found that the
negative-timestamp problem is already handled in *flink.table*.

        Below is the source code from that package.


*org.apache.flink.table.runtime.operators.window.grouping.WindowsGrouping*

private long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
    long remainder = (timestamp - offset) % windowSize;
    // handle both positive and negative cases
    if (remainder < 0) {
        return timestamp - (remainder + windowSize);
    } else {
        return timestamp - remainder;
    }
}
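
A quick way to gain confidence in the branch-based variant is to scan a range of timestamps and check the containment rule for each one. The sketch below does that and also compares the result with a Math.floorMod-based formula. The floorMod variant is an assumption added for comparison and is not taken from Flink, and the class and method names are hypothetical.

```java
// Illustrative only: WindowStartCheck and its method names are hypothetical.
final class WindowStartCheck {

    // Same logic as the WindowsGrouping method quoted above.
    static long branchWindowStart(long timestamp, long offset, long windowSize) {
        long remainder = (timestamp - offset) % windowSize;
        return remainder < 0 ? timestamp - (remainder + windowSize) : timestamp - remainder;
    }

    // Comparison variant based on Math.floorMod (an assumption, not Flink code).
    static long floorModWindowStart(long timestamp, long offset, long windowSize) {
        return timestamp - Math.floorMod(timestamp - offset, windowSize);
    }

    // Counts timestamps in [from, to] whose window does not contain them,
    // or for which the two variants disagree.
    static long countViolations(long from, long to, long offset, long windowSize) {
        long violations = 0;
        for (long ts = from; ts <= to; ts++) {
            long start = branchWindowStart(ts, offset, windowSize);
            boolean contained = start <= ts && ts < start + windowSize;
            if (!contained || start != floorModWindowStart(ts, offset, windowSize)) {
                violations++;
            }
        }
        return violations;
    }

    public static void main(String[] args) {
        System.out.println(countViolations(-20000L, 20000L, 0L, 5000L));
        System.out.println(countViolations(-20000L, 20000L, 1000L, 5000L));
    }
}
```

The scan covers exact multiples of windowSize on both sides of zero, which are the boundary cases most likely to expose an off-by-one-window error.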

Can we make a pull request?

        If the community agrees that this should be fixed, we hope the task
can be assigned to us. We have all just graduated, and contributing code to
Flink would be a great encouragement for us.

        Thank you so much!

        From Deng Ziqi & Lin Wanni & Guo Yuanfang


 ===========================================
reproduce

import java.util.TimeZone;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

/* output
WindowStart: -15000    ExactSize:1    (a,-17000)
WindowStart: -10000    ExactSize:1    (b,-12000)
WindowStart: -5000     ExactSize:2    (c,-7000)
WindowStart: -5000     ExactSize:2    (d,-2000)
WindowStart: 0         ExactSize:1    (e,3000)
WindowStart: 5000      ExactSize:1    (f,8000)
WindowStart: 10000     ExactSize:1    (g,13000)
WindowStart: 15000     ExactSize:1    (h,18000)
 */
public class Example {
    public static void main(String[] args) throws Exception {

        final TimeZone timeZone = TimeZone.getTimeZone("GMT+0");
        TimeZone.setDefault(timeZone);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env
                .setParallelism(1)
                // Event timestamps in milliseconds, four negative and four positive.
                .fromElements(
                        Tuple2.of("a", -17 * 1000L),
                        Tuple2.of("b", -12 * 1000L),
                        Tuple2.of("c", -7 * 1000L),
                        Tuple2.of("d", -2 * 1000L),
                        Tuple2.of("e", 3 * 1000L),
                        Tuple2.of("f", 8 * 1000L),
                        Tuple2.of("g", 13 * 1000L),
                        Tuple2.of("h", 18 * 1000L)
                )
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Tuple2<String, Long>>forMonotonousTimestamps()
                                .withTimestampAssigner(
                                        new SerializableTimestampAssigner<Tuple2<String, Long>>() {
                                            @Override
                                            public long extractTimestamp(Tuple2<String, Long> element, long l) {
                                                // The second tuple field is the event timestamp.
                                                return element.f1;
                                            }
                                        }
                                )
                )
                // Send every element to the same key so all windows are comparable.
                .keyBy(r -> 1)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .process(
                        new ProcessWindowFunction<Tuple2<String, Long>, String, Integer, TimeWindow>() {
                            @Override
                            public void process(
                                    Integer integer,
                                    Context context,
                                    Iterable<Tuple2<String, Long>> elements,
                                    Collector<String> out) throws Exception {
                                // Print the window start next to each element assigned to it.
                                for (Tuple2<String, Long> element : elements) {
                                    out.collect("WindowStart: " + context.window().getStart()
                                            + "\tExactSize:" + elements.spliterator().getExactSizeIfKnown() + "\t"
                                            + element
                                    );
                                }
                            }
                        }
                )
                .print();
        env.execute();
    }
}

Re: [DISCUSS] When timestamp - offset + windowSize < 0, elements cannot be assigned to the correct window

Posted by 邓子琦 <dz...@gmail.com>.
Thanks for your reply. I have submitted the PR:
https://github.com/apache/flink/pull/18982
Looking forward to more discussion.
Best wishes

On Fri, 4 Mar 2022 at 17:09, Martijn Visser <ma...@apache.org> wrote:

> Hi Deng Ziqi & Lin Wanni & Guo Yuanfang,
>
> First of all, I wanted to let you know that I think the ticket that you've
> created is one of the most extensive and complete tickets I've seen. Thank
> you very much for the effort on this!
>
> Based on your input I think it indeed looks like this should be addressed.
> Perhaps there are other maintainers who are more familiar with this code
> who can give a more in-depth answer.
>
> Best regards,
>
> Martijn Visser
> https://twitter.com/MartijnVisser82

Re: [DISCUSS] When timestamp - offset + windowSize < 0, elements cannot be assigned to the correct window

Posted by Martijn Visser <ma...@apache.org>.
Hi Deng Ziqi & Lin Wanni & Guo Yuanfang,

First of all, I wanted to let you know that I think the ticket that you've
created is one of the most extensive and complete tickets I've seen. Thank
you very much for the effort on this!

Based on your input I think it indeed looks like this should be addressed.
Perhaps there are other maintainers who are more familiar with this code
who can give a more in-depth answer.

Best regards,

Martijn Visser
https://twitter.com/MartijnVisser82

