Posted to user-zh@flink.apache.org by jacky-cui <82...@qq.com> on 2020/09/03 04:35:10 UTC
Re: A question about time windows, many thanks!
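taochang is right, but I would add one point. The window is left-closed, right-open: for your hourly window the default window is [17:00, 18:00). Because your watermark delay is 5 seconds, the window fires once (event time - 5 seconds) >= 18:00; taochang's reply left out the equality case. The other condition for firing is that the window contains data.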
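The trigger condition described above can be checked with plain arithmetic. This is a minimal sketch, not Flink code: the class `TriggerSketch` and its methods are hypothetical, and it assumes that a bounded-out-of-orderness watermark equals the largest event time seen minus the delay minus 1 ms, and that a window [start, end) fires once the watermark reaches end - 1 ms. The SQL window operator in the original job may differ in detail.

```java
// Sketch only: plain-Java arithmetic with no Flink dependency.
final class TriggerSketch {
    // Assumption: the watermark trails the largest event time seen so far
    // by the configured delay plus 1 ms.
    static long watermarkFor(long maxEventTimeMs, long delayMs) {
        return maxEventTimeMs - delayMs - 1;
    }

    // Assumption: a window [start, end) fires once the watermark reaches
    // the last timestamp that still belongs to it, which is end - 1.
    static boolean fires(long watermarkMs, long windowEndMs) {
        return watermarkMs >= windowEndMs - 1;
    }

    public static void main(String[] args) {
        long windowEnd = 1598954400000L; // 2020-09-01 18:00:00 (UTC+8)
        long delay = 5_000L;             // 5-second out-of-orderness bound
        long[] events = {
            1598951712000L,      // 17:15:12, the records from the question
            windowEnd + 4_999L,  // 18:00:04.999
            windowEnd + 5_000L   // 18:00:05.000
        };
        for (long ts : events) {
            long wm = watermarkFor(ts, delay);
            System.out.println(ts + " -> watermark " + wm
                    + ", fires=" + fires(wm, windowEnd));
        }
    }
}
```

Under these assumptions an event stamped 18:00:05.000 is the first one that can close the 17:00-18:00 window. The watermark here is derived only from event timestamps, so it does not move while no events arrive, which is consistent with the result in the question appearing only after the 2020/9/2 record.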
------------------ Original message ------------------
From: "user-zh" <taochanglian@163.com>;
Sent: Thursday, September 3, 2020, 10:35 AM
To: "user-zh"<user-zh@flink.apache.org>;"samuel.qiu@ubtrobot.com"<samuel.qiu@ubtrobot.com>;
Subject: Re: A question about time windows, many thanks!
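There is nothing wrong. Time windows are left-closed, right-open. According to the definition of
getWindowStartWithOffset in org.apache.flink.streaming.api.windowing.windows.TimeWindow, your window should be the 17:00-18:00 one. However, it should not fire at exactly 2020-09-01 18:00:00.0;
because the window is left-closed, right-open, it should fire at a time later than 2020-09-01 18:00:00.0,
for example 2020-09-01 18:00:00.001.
Adding your 5-second watermark delay, it should fire at 2020-09-01 18:00:05.001.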
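The window assignment mentioned above can be reproduced with plain arithmetic. This is a minimal sketch: the class `WindowStartSketch` is hypothetical, and the formula is an assumption about what `TimeWindow.getWindowStartWithOffset` computed in Flink releases of that period, so check it against the source of the version in use.

```java
// Sketch only: plain-Java arithmetic with no Flink dependency.
final class WindowStartSketch {
    // Assumed formula: align the timestamp down to the nearest window
    // boundary, shifted by the offset.
    static long windowStart(long timestampMs, long offsetMs, long windowSizeMs) {
        return timestampMs - (timestampMs - offsetMs + windowSizeMs) % windowSizeMs;
    }

    public static void main(String[] args) {
        long hour = 3_600_000L;
        long ts = 1598951712000L; // 2020/9/1 17:15:12 (UTC+8), from the question
        long start = windowStart(ts, 0L, hour);
        long end = start + hour;  // exclusive upper bound of the window
        System.out.println("[" + start + ", " + end + ")");
    }
}
```

With a zero offset the record from the question falls into the window starting at 1598950800000 (17:00 UTC+8) and ending, exclusively, at 1598954400000 (18:00 UTC+8). A record stamped exactly 18:00:00.000 would belong to the next window.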
On 2020/9/2 15:20, samuel.qiu@ubtrobot.com wrote:
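> Hello experts: I am using Flink SQL and want to compute an aggregate over an hourly tumble window. The problem is that the computation is not triggered when the hour is reached. Please take a look!
> // Specify the event-time field and generate watermarks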
> DataStream<Tuple4<String,String,String,Long>> withTimestampsAndWatermarksDS = singleDS.assignTimestampsAndWatermarks(
> WatermarkStrategy
> .<Tuple4<String,String,String,Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
> //.<Tuple4<String,String,String,Long>>forMonotonousTimestamps()
> .withIdleness(Duration.ofSeconds(10)) // intended to keep generating watermarks even when there is no data
> .withTimestampAssigner((event, timestamp)->event.f3));
>
> StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
> tenv.registerDataStream(
> "log",
> withTimestampsAndWatermarksDS,
> "appid,bugid,eventid,rowtime.rowtime,proctime.proctime");
>
> String sql = "select appid,eventid,cnt," +
> "(starttime + interval '8' hour ) as stime," +
> "(endtime + interval '8' hour ) as etime " +
> "from (select appid,eventid,count(*) as cnt," +
> "TUMBLE_START(rowtime,INTERVAL '1' HOUR) as starttime," +
> "TUMBLE_END(rowtime,INTERVAL '1' HOUR) as endtime " +
> "from log group by appid,eventid,TUMBLE(rowtime,INTERVAL '1' HOUR),TIME '00:00:00')"; // expect the window to close and fire at the top of the hour
>
> Table table = tenv.sqlQuery(sql);
> DataStream<Result> dataStream = tenv.toAppendStream(table, Result.class);
>
> The output is:
> (400030024,123123123123,others,1598951712000) //2020/9/1 17:15:12
> (400030024,123123123123,others,1598951712000) //2020/9/1 17:15:12
> (400030024,123123123123,others,1598951712000) //2020/9/1 17:15:12
> (400030024,123123123123,others,1598951712000) //2020/9/1 17:15:12 Expected the window to close and fire at 2020-09-01 18:00:00.0, but it did not.
> (400030024,123123123123,others,1599030999000) //2020/9/2 15:16:39 The window fired only after this record arrived
> ResultHour{appid=400030024,eventid=others,cnt=4, stime=2020-09-01 17:00:00.0, etime=2020-09-01 18:00:00.0, SystemTime=1599031415481 //2020/9/2 15:23:35}
> What went wrong here? Many thanks!