You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by HunterXHunter <13...@qq.com> on 2021/03/08 08:01:29 UTC

BUG :DataStream 转 Table 后无法 触发窗口计算

1:当DataStream是由 一个table 经过 group by rowtime 转换过来的就无法触发窗口

例如:
 tableEnv.createTemporaryView("test3", tableEnv.sqlQuery("select msg,rowtime
from test group by msg,rowtime"));

// 获得 DataStream,并定义wtm生成
SingleOutputStreamOperator r =
tableEnv.toRetractStream(tableEnv.from("test3"), Row.class)
.filter(x -> x.f0)
// map ........
.returns(Types.TUPLE(Types.STRING, Types.LONG))
.assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Tuple2&lt;String,
Long>>forBoundedOutOfOrderness(Duration.ofSeconds(1))
                                .withTimestampAssigner(((element,
recordTimestamp) -> element.f1))
                );


参考 官方文档:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/streaming/time_attributes.html


// stream - 转 Table,指定Rowtime
tableEnv.createTemporaryView("test5",
                r,
                $("msg"),
                $("rowtime").rowtime());

        String sql5 = "select " +
                "msg," +
                "count(1) cnt" +
                " from test5 " +
                " group by TUMBLE(rowtime, INTERVAL '30' SECOND), msg " +
                "";
        tableEnv.executeSql("insert into printlnRetractSink " + sql5);


结果: 无法触发窗口操作。
查调试源码: org.apache.flink.table.runtime.operators.window.WindowOperator
// 返回的wtm永远都是 -9223372036854775808
public long getCurrentWatermark() {
			return internalTimerService.currentWatermark();
		}

//
查看任务,watermark是正常在生成的。InternalTimerServiceImpl.advanceWatermark是正常为currentWatermark赋值。但是
internalTimerService.currentWatermark() 却拿的是-9223372036854775808

// 当  tableEnv.createTemporaryView("test3", tableEnv.sqlQuery("select
msg,rowtime from test group by msg,rowtime"));
语句改为
tableEnv.createTemporaryView("test3", tableEnv.sqlQuery("select msg,rowtime
from test"));

结果就是正确的。
所以这是一个bug吗??








--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: BUG :DataStream 转 Table 后无法 触发窗口计算

Posted by HunterXHunter <13...@qq.com>.
1.12.1



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: BUG :DataStream 转 Table 后无法 触发窗口计算

Posted by HunterXHunter <13...@qq.com>.
有人知道这个bug吗



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: BUG :DataStream 转 Table 后无法 触发窗口计算

Posted by HunterXHunter <13...@qq.com>.
试了 1.12.2,还是一样问题。



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: BUG :DataStream 转 Table 后无法 触发窗口计算

Posted by HunterXHunter <13...@qq.com>.
但是看情况好像是只有在:DataStream发生Keyby或者 setParallelism的时候才会发生



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: BUG :DataStream 转 Table 后无法 触发窗口计算

Posted by Leonard Xu <xb...@gmail.com>.
你好,
你的flink版本是多少?
之前有个bug是Table转datastream 会丢rowtime问题,看起来是这个问题。

我在[1]里修复了,你可以升级对应的版本试下。


祝好,
Leonard
[1]https://issues.apache.org/jira/browse/FLINK-21013 <https://issues.apache.org/jira/browse/FLINK-21013> 



> 在 2021年3月10日,14:34,HunterXHunter <13...@qq.com> 写道:
> 
> 再试了一下:
> 修改并行度也不行
>                .setParallelism(9)
> 
> 
> 
> 
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: BUG :DataStream 转 Table 后无法 触发窗口计算

Posted by HunterXHunter <13...@qq.com>.
再试了一下:
修改并行度也不行
                .setParallelism(9)




--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: BUG :DataStream 转 Table 后无法 触发窗口计算

Posted by HunterXHunter <13...@qq.com>.
经过再一次验证:
即使我做group by rowtime的操作,
我对datastream做keyby(rowtime) 也有这个问题
例如:
tableEnv.createTemporaryView("test3", tableEnv.sqlQuery("select msg,rowtime
from test "));
SingleOutputStreamOperator r =
tableEnv.toRetractStream(tableEnv.from("test3"), Row.class)
.filter(x -> x.f0)
.keyby(_.f1)
.returns(Types.TUPLE(Types.STRING, Types.LONG))
.assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Tuple2&amp;lt;String,
Long>>forBoundedOutOfOrderness(Duration.ofSeconds(1))
                                .withTimestampAssigner(((element,
recordTimestamp) -> element.f1))
                );

结果也是无法触发窗口



--
Sent from: http://apache-flink.147419.n8.nabble.com/