You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by Asahi Lee <97...@qq.com> on 2020/11/25 09:23:27 UTC
flink 1.11.2 rowtime和proctime流 Interval Join 问题错误问题
你好! 我需要将事件时间的流同处理时间的流做Interval Join时提示错误,我使用的是flink 1.11.2版本,我的示例程序如下,请问为什么提示我是个常规join,而不是区间join呢?我该如何解决? 我的 l_table.l_rt = r_table.r_pt 可以运行成功,而l_table.l_rt BETWEEN r_table.r_pt - INTERVAL '10' SECOND AND r_table.r_pt + INTERVAL '5' SECOND运行错误!
package join;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
/**
 * Runs a Flink SQL interval join between two {@code datagen} tables and writes the result to a
 * {@code print} sink.
 *
 * <p>An interval join is only planned when both join inputs expose the same kind of time
 * attribute. The original version defined the right-hand time column as {@code proctime()}
 * (processing time) while the left-hand one is an event-time (rowtime) attribute, so the planner
 * did not treat the {@code BETWEEN} predicate as interval bounds, fell back to a regular join, and
 * failed with "Rowtime attributes must not be in the input rows of a regular join". Both tables now
 * declare an event-time attribute with a watermark.
 */
public class Test1 {

    public static void main(String[] args) {
        StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings bsSettings =
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);

        // Left input: l_rt is an event-time attribute; the watermark equals l_rt (no delay).
        String lTable = "CREATE TABLE l_table ( " +
                " l_a INT, " +
                " l_b string, " +
                " l_rt AS localtimestamp, " +
                " WATERMARK FOR l_rt AS l_rt " +
                ") WITH ( " +
                " 'connector' = 'datagen', " +
                " 'rows-per-second'='5', " +
                " 'fields.l_a.min'='1', " +
                " 'fields.l_a.max'='5', " +
                " 'fields.l_b.length'='5' " +
                ")";
        bsTableEnv.executeSql(lTable);

        // Right input: r_rt must also be an event-time attribute so the interval join is
        // recognized. It uses LOCALTIMESTAMP like l_rt so both sides share a time zone; before
        // Flink 1.13, NOW()/CURRENT_TIMESTAMP return a UTC-based timestamp, which would be offset
        // from l_rt by the local zone offset and make a 10-second interval never match.
        String rTable = "CREATE TABLE r_table ( " +
                " r_a INT, " +
                " r_b string, " +
                " r_rt AS localtimestamp, " +
                " WATERMARK FOR r_rt AS r_rt " +
                ") WITH ( " +
                " 'connector' = 'datagen', " +
                " 'rows-per-second'='5', " +
                " 'fields.r_a.min'='1', " +
                " 'fields.r_a.max'='5', " +
                " 'fields.r_b.length'='5' " +
                ")";
        bsTableEnv.executeSql(rTable);

        // Sink columns follow the positional order of "SELECT *" over l_table JOIN r_table.
        String printTable = "CREATE TABLE print (" +
                " l_a INT, " +
                " l_b string, " +
                " l_rt timestamp(3), " +
                " r_a INT, " +
                " r_b string, " +
                " r_rt timestamp(3) " +
                ") WITH ( " +
                " 'connector' = 'print' " +
                ") ";
        bsTableEnv.executeSql(printTable);

        // Interval join: an l_table row matches r_table rows with the same key whose r_rt lies in
        // [l_rt - 5s, l_rt + 10s] (equivalently, l_rt BETWEEN r_rt - 10s AND r_rt + 5s).
        Table joinTable = bsTableEnv.sqlQuery(
                "select * from l_table join r_table on l_table.l_a = r_table.r_a"
                        + " and l_table.l_rt BETWEEN r_table.r_rt - INTERVAL '10' SECOND"
                        + " AND r_table.r_rt + INTERVAL '5' SECOND");

        // Submit the INSERT through the Table API instead of concatenating the Table object into
        // SQL text (which depends on Table#toString() implicitly registering an inline view).
        joinTable.executeInsert("print");
    }
}
Re:回复:flink 1.11.2 rowtime和proctime流 Interval Join 问题错误问题
Posted by hailongwang <18...@163.com>.
另一张表可以这么定义:
// Declare r_pt as an event-time attribute (with watermark) so it matches l_table's rowtime and the
// BETWEEN predicate is planned as an interval join. Use localtimestamp (same as l_rt): before
// Flink 1.13, now() returns a UTC-based timestamp, which would be offset from l_rt by the local
// time-zone offset so a 10-second interval would never match.
String rTable = "CREATE TABLE r_table ( " +
" r_a INT, " +
" r_b string, " +
" r_pt AS localtimestamp, " +
" WATERMARK FOR r_pt AS r_pt " +
") WITH ( " +
" 'connector' = 'datagen', " +
" 'rows-per-second'='5', " +
" 'fields.r_a.min'='1', " +
" 'fields.r_a.max'='5', " +
" 'fields.r_b.length'='5' " +
")";
Best,
Hailong
在 2020-11-25 19:05:04,"Asahi Lee" <97...@qq.com> 写道:
>你好!
> 那两条拥有不同时间属性的流如何join呢?或者这样的需求如何处理?
>
>
>
>
>------------------ 原始邮件 ------------------
>发件人: "user-zh" <18868816710@163.com>;
>发送时间: 2020年11月25日(星期三) 晚上7:31
>收件人: "user-zh"<user-zh@flink.apache.org>;
>
>主题: Re:flink 1.11.2 rowtime和proctime流 Interval Join 问题错误问题
>
>
>
>Hi,
> 因为你的 2 个流的时间属性不一样,所以就认为不是 interval join。
> 而在 match 到了 regular join 后,又因为 join 条件中有时间属性,故报了这个错。
> Interval join 需要 2 个流的时间属性一样,所以你需要对这 2 条流使用相同的时间属性。
>
>
>Best,
>Hailong
>
>在 2020-11-25 16:23:27,"Asahi Lee" <978466273@qq.com> 写道:
>>你好! 我需要将事件时间的流同处理时间的流做Interval Join时提示错误,我使用的是flink 1.11.2版本,我的示例程序如下,请问为什么提示我是个常规join,而不是区间join呢?我该如何解决? 我的 l_table.l_rt = r_table.r_pt 可以运行成功,而l_table.l_rt BETWEEN r_table.r_pt - INTERVAL '10' SECOND AND r_table.r_pt + INTERVAL '5' SECOND运行错误!
>>package join;
>>
>>import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>import org.apache.flink.table.api.EnvironmentSettings;
>>import org.apache.flink.table.api.Table;
>>import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
>>
>>public class Test1 {
>>
>> public static void main(String[] args) {
>> StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
>> EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>> StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
>>
>> String lTable = "CREATE TABLE l_table ( " +
>> " l_a INT, " +
>> " l_b string, " +
>> " l_rt AS localtimestamp, " +
>> " WATERMARK FOR l_rt AS l_rt " +
>> ") WITH ( " +
>> " 'connector' = 'datagen', " +
>> " 'rows-per-second'='5', " +
>> " 'fields.l_a.min'='1', " +
>> " 'fields.l_a.max'='5', " +
>> " 'fields.l_b.length'='5' " +
>> ")";
>> bsTableEnv.executeSql(lTable);
>>
>> String rTable = "CREATE TABLE r_table ( " +
>> " r_a INT, " +
>> " r_b string, " +
>> " r_pt AS proctime() " +
>> ") WITH ( " +
>> " 'connector' = 'datagen', " +
>> " 'rows-per-second'='5', " +
>> " 'fields.r_a.min'='1', " +
>> " 'fields.r_a.max'='5', " +
>> " 'fields.r_b.length'='5' " +
>> ")";
>> bsTableEnv.executeSql(rTable);
>>
>> String printTable = "CREATE TABLE print (" +
>> " l_a INT, " +
>> " l_b string, " +
>> " l_rt timestamp(3), " +
>> " r_a INT, " +
>> " r_b string, " +
>> " r_pt timestamp(3) " +
>> ") WITH ( " +
>> " 'connector' = 'print' " +
>> ") ";
>>
>> bsTableEnv.executeSql(printTable);
>>
>> // 运行成功
>>// Table joinTable = bsTableEnv.sqlQuery("select * from l_table join r_table on l_table.l_a = r_table.r_a and l_table.l_rt = r_table.r_pt");
>>
>> // 运行错误,提示Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before.
>> Table joinTable = bsTableEnv.sqlQuery("select * from l_table join r_table on l_table.l_a = r_table.r_a and l_table.l_rt BETWEEN r_table.r_pt - INTERVAL '10' SECOND AND r_table.r_pt + INTERVAL '5' SECOND");
>>
>> bsTableEnv.executeSql("insert into print select * from " + joinTable);
>>
>> }
>>
>>}
回复:flink 1.11.2 rowtime和proctime流 Interval Join 问题错误问题
Posted by Asahi Lee <97...@qq.com>.
你好!
那两条拥有不同时间属性的流如何join呢?或者这样的需求如何处理?
------------------ 原始邮件 ------------------
发件人: "user-zh" <18868816710@163.com>;
发送时间: 2020年11月25日(星期三) 晚上7:31
收件人: "user-zh"<user-zh@flink.apache.org>;
主题: Re:flink 1.11.2 rowtime和proctime流 Interval Join 问题错误问题
Hi,
因为你的 2 个流的时间属性不一样,所以就认为不是 interval join。
而在 match 到了 regular join 后,又因为 join 条件中有时间属性,故报了这个错。
Interval join 需要 2 个流的时间属性一样,所以你需要对这 2 条流使用相同的时间属性。
Best,
Hailong
在 2020-11-25 16:23:27,"Asahi Lee" <978466273@qq.com> 写道:
>你好! 我需要将事件时间的流同处理时间的流做Interval Join时提示错误,我使用的是flink 1.11.2版本,我的示例程序如下,请问为什么提示我是个常规join,而不是区间join呢?我该如何解决? 我的 l_table.l_rt = r_table.r_pt 可以运行成功,而l_table.l_rt BETWEEN r_table.r_pt - INTERVAL '10' SECOND AND r_table.r_pt + INTERVAL '5' SECOND运行错误!
>package join;
>
>import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>import org.apache.flink.table.api.EnvironmentSettings;
>import org.apache.flink.table.api.Table;
>import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
>
>public class Test1 {
>
> public static void main(String[] args) {
> StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
> EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
>
> String lTable = "CREATE TABLE l_table ( " +
> " l_a INT, " +
> " l_b string, " +
> " l_rt AS localtimestamp, " +
> " WATERMARK FOR l_rt AS l_rt " +
> ") WITH ( " +
> " 'connector' = 'datagen', " +
> " 'rows-per-second'='5', " +
> " 'fields.l_a.min'='1', " +
> " 'fields.l_a.max'='5', " +
> " 'fields.l_b.length'='5' " +
> ")";
> bsTableEnv.executeSql(lTable);
>
> String rTable = "CREATE TABLE r_table ( " +
> " r_a INT, " +
> " r_b string, " +
> " r_pt AS proctime() " +
> ") WITH ( " +
> " 'connector' = 'datagen', " +
> " 'rows-per-second'='5', " +
> " 'fields.r_a.min'='1', " +
> " 'fields.r_a.max'='5', " +
> " 'fields.r_b.length'='5' " +
> ")";
> bsTableEnv.executeSql(rTable);
>
> String printTable = "CREATE TABLE print (" +
> " l_a INT, " +
> " l_b string, " +
> " l_rt timestamp(3), " +
> " r_a INT, " +
> " r_b string, " +
> " r_pt timestamp(3) " +
> ") WITH ( " +
> " 'connector' = 'print' " +
> ") ";
>
> bsTableEnv.executeSql(printTable);
>
> // 运行成功
>// Table joinTable = bsTableEnv.sqlQuery("select * from l_table join r_table on l_table.l_a = r_table.r_a and l_table.l_rt = r_table.r_pt");
>
> // 运行错误,提示Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before.
> Table joinTable = bsTableEnv.sqlQuery("select * from l_table join r_table on l_table.l_a = r_table.r_a and l_table.l_rt BETWEEN r_table.r_pt - INTERVAL '10' SECOND AND r_table.r_pt + INTERVAL '5' SECOND");
>
> bsTableEnv.executeSql("insert into print select * from " + joinTable);
>
> }
>
>}
Re:flink 1.11.2 rowtime和proctime流 Interval Join 问题错误问题
Posted by hailongwang <18...@163.com>.
Hi,
因为你的 2 个流的时间属性不一样,所以就认为不是 interval join。
而在 match 到了 regular join 后,又因为 join 条件中有时间属性,故报了这个错。
Interval join 需要 2 个流的时间属性一样,所以你需要对这 2 条流使用相同的时间属性。
Best,
Hailong
在 2020-11-25 16:23:27,"Asahi Lee" <97...@qq.com> 写道:
>你好! 我需要将事件时间的流同处理时间的流做Interval Join时提示错误,我使用的是flink 1.11.2版本,我的示例程序如下,请问为什么提示我是个常规join,而不是区间join呢?我该如何解决? 我的 l_table.l_rt = r_table.r_pt 可以运行成功,而l_table.l_rt BETWEEN r_table.r_pt - INTERVAL '10' SECOND AND r_table.r_pt + INTERVAL '5' SECOND运行错误!
>package join;
>
>import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>import org.apache.flink.table.api.EnvironmentSettings;
>import org.apache.flink.table.api.Table;
>import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
>
>public class Test1 {
>
> public static void main(String[] args) {
> StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
> EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
>
> String lTable = "CREATE TABLE l_table ( " +
> " l_a INT, " +
> " l_b string, " +
> " l_rt AS localtimestamp, " +
> " WATERMARK FOR l_rt AS l_rt " +
> ") WITH ( " +
> " 'connector' = 'datagen', " +
> " 'rows-per-second'='5', " +
> " 'fields.l_a.min'='1', " +
> " 'fields.l_a.max'='5', " +
> " 'fields.l_b.length'='5' " +
> ")";
> bsTableEnv.executeSql(lTable);
>
> String rTable = "CREATE TABLE r_table ( " +
> " r_a INT, " +
> " r_b string, " +
> " r_pt AS proctime() " +
> ") WITH ( " +
> " 'connector' = 'datagen', " +
> " 'rows-per-second'='5', " +
> " 'fields.r_a.min'='1', " +
> " 'fields.r_a.max'='5', " +
> " 'fields.r_b.length'='5' " +
> ")";
> bsTableEnv.executeSql(rTable);
>
> String printTable = "CREATE TABLE print (" +
> " l_a INT, " +
> " l_b string, " +
> " l_rt timestamp(3), " +
> " r_a INT, " +
> " r_b string, " +
> " r_pt timestamp(3) " +
> ") WITH ( " +
> " 'connector' = 'print' " +
> ") ";
>
> bsTableEnv.executeSql(printTable);
>
> // 运行成功
>// Table joinTable = bsTableEnv.sqlQuery("select * from l_table join r_table on l_table.l_a = r_table.r_a and l_table.l_rt = r_table.r_pt");
>
> // 运行错误,提示Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before.
> Table joinTable = bsTableEnv.sqlQuery("select * from l_table join r_table on l_table.l_a = r_table.r_a and l_table.l_rt BETWEEN r_table.r_pt - INTERVAL '10' SECOND AND r_table.r_pt + INTERVAL '5' SECOND");
>
> bsTableEnv.executeSql("insert into print select * from " + joinTable);
>
> }
>
>}