You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by Asahi Lee <97...@qq.com> on 2020/11/25 09:23:27 UTC

flink 1.11.2 rowtime和proctime流 Interval Join 问题错误问题

你好!     我需要将事件时间的流同处理时间的流做Interval Join时提示错误,我是用的是flink 1.11.2版本,我的示例程序如下,请问为什么提示我是个常规join,而不是区间join呢?我该如何解决?     我的 l_table.l_rt = r_table.r_pt 可以运行成功,而l_table.l_rt BETWEEN r_table.r_pt - INTERVAL '10' SECOND AND r_table.r_pt + INTERVAL '5' SECOND运行错误!package join;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class Test1 {

    public static void main(String[] args) {
        StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);

        String lTable = "CREATE TABLE l_table (  " +
                " l_a INT,  " +
                " l_b string,  " +
                " l_rt AS localtimestamp,  " +
                " WATERMARK FOR l_rt AS l_rt  " +
                ") WITH (  " +
                " 'connector' = 'datagen',  " +
                " 'rows-per-second'='5',  " +
                " 'fields.l_a.min'='1',  " +
                " 'fields.l_a.max'='5',  " +
                " 'fields.l_b.length'='5'  " +
                ")";
        bsTableEnv.executeSql(lTable);

        String rTable = "CREATE TABLE r_table (  " +
                " r_a INT,  " +
                " r_b string,  " +
                " r_pt AS proctime()  " +
                ") WITH (  " +
                " 'connector' = 'datagen',  " +
                " 'rows-per-second'='5',  " +
                " 'fields.r_a.min'='1',  " +
                " 'fields.r_a.max'='5',  " +
                " 'fields.r_b.length'='5'  " +
                ")";
        bsTableEnv.executeSql(rTable);

        String printTable = "CREATE TABLE print (" +
                "  l_a INT,  " +
                "  l_b string,  " +
                "  l_rt timestamp(3),  " +
                "  r_a INT,  " +
                "  r_b string,  " +
                "  r_pt timestamp(3)  " +
                ") WITH (  " +
                " 'connector' = 'print' " +
                ") ";

        bsTableEnv.executeSql(printTable);

        // 运行成功
//        Table joinTable = bsTableEnv.sqlQuery("select * from l_table join r_table on l_table.l_a = r_table.r_a and l_table.l_rt = r_table.r_pt");

        // 运行错误,提示Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before.
        Table joinTable = bsTableEnv.sqlQuery("select * from l_table join r_table on l_table.l_a = r_table.r_a and l_table.l_rt BETWEEN r_table.r_pt - INTERVAL '10' SECOND AND r_table.r_pt + INTERVAL '5' SECOND");

        bsTableEnv.executeSql("insert into print select * from " + joinTable);

    }

}

Re:回复:flink 1.11.2 rowtime和proctime流 Interval Join 问题错误问题

Posted by hailongwang <18...@163.com>.
另一张表可以这么定义:
 String rTable = "CREATE TABLE r_table (  " +
                " r_a INT,  " +
                " r_b string,  " +
                " r_pt AS now(), " +
                 "WATERMARK FOR r_pt AS r_pt" +
                ") WITH (  " +
                " 'connector' = 'datagen',  " +
                " 'rows-per-second'='5',  " +
                " 'fields.r_a.min'='1',  " +
                " 'fields.r_a.max'='5',  " +
                " 'fields.r_b.length'='5'  " +
                ")";


Best,
Hailong




在 2020-11-25 19:05:04,"Asahi Lee" <97...@qq.com> 写道:
>你好!
>&nbsp; &nbsp; &nbsp; 那两条拥有不同时间属性的流如何join呢?或者这样的需求如何处理?
>
>
>
>
>------------------&nbsp;原始邮件&nbsp;------------------
>发件人:                                                                                                                        "user-zh"                                                                                    <18868816710@163.com&gt;;
>发送时间:&nbsp;2020年11月25日(星期三) 晚上7:31
>收件人:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;
>
>主题:&nbsp;Re:flink 1.11.2 rowtime和proctime流 Interval Join 问题错误问题
>
>
>
>Hi,
>&nbsp;&nbsp; 因为你的 2 个流的时间属性不一样,所以就认为不是 interval join。
>&nbsp;&nbsp; 而在 match 到到了regular join 后,又因为 join 条件中有时间属性,故报了这个错。
>&nbsp;&nbsp; Interval join 需要 2 个流的时间属性一样,所以你需要对这 2 条流使用相同的时间属性。
>
>
>Best,
>Hailong
>
>在 2020-11-25 16:23:27,"Asahi Lee" <978466273@qq.com&gt; 写道:
>&gt;你好!&nbsp;&nbsp;&nbsp;&nbsp; 我需要将事件时间的流同处理时间的流做Interval Join时提示错误,我是用的是flink 1.11.2版本,我的示例程序如下,请问为什么提示我是个常规join,而不是区间join呢?我该如何解决?&nbsp;&nbsp;&nbsp;&nbsp; 我的 l_table.l_rt = r_table.r_pt 可以运行成功,而l_table.l_rt BETWEEN r_table.r_pt - INTERVAL '10' SECOND AND r_table.r_pt + INTERVAL '5' SECOND运行错误!package join;
>&gt;
>&gt;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>&gt;import org.apache.flink.table.api.EnvironmentSettings;
>&gt;import org.apache.flink.table.api.Table;
>&gt;import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
>&gt;
>&gt;public class Test1 {
>&gt;
>&gt;&nbsp;&nbsp;&nbsp; public static void main(String[] args) {
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
>&gt;
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; String lTable = "CREATE TABLE l_table (&nbsp; " +
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; " l_a INT,&nbsp; " +
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; " l_b string,&nbsp; " +
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; " l_rt AS localtimestamp,&nbsp; " +
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; " WATERMARK FOR l_rt AS l_rt&nbsp; " +
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; ") WITH (&nbsp; " +
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; " 'connector' = 'datagen',&nbsp; " +
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; " 'rows-per-second'='5',&nbsp; " +
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; " 'fields.l_a.min'='1',&nbsp; " +
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; " 'fields.l_a.max'='5',&nbsp; " +
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; " 'fields.l_b.length'='5'&nbsp; " +
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; ")";
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; bsTableEnv.executeSql(lTable);
>&gt;
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; String rTable = "CREATE TABLE r_table (&nbsp; " +
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; " r_a INT,&nbsp; " +
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; " r_b string,&nbsp; " +
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; " r_pt AS proctime()&nbsp; " +
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; ") WITH (&nbsp; " +
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; " 'connector' = 'datagen',&nbsp; " +
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; " 'rows-per-second'='5',&nbsp; " +
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; " 'fields.r_a.min'='1',&nbsp; " +
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; " 'fields.r_a.max'='5',&nbsp; " +
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; " 'fields.r_b.length'='5'&nbsp; " +
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; ")";
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; bsTableEnv.executeSql(rTable);
>&gt;
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; String printTable = "CREATE TABLE print (" +
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "&nbsp; l_a INT,&nbsp; " +
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "&nbsp; l_b string,&nbsp; " +
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "&nbsp; l_rt timestamp(3),&nbsp; " +
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "&nbsp; r_a INT,&nbsp; " +
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "&nbsp; r_b string,&nbsp; " +
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "&nbsp; r_pt timestamp(3)&nbsp; " +
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; ") WITH (&nbsp; " +
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; " 'connector' = 'print' " +
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; ") ";
>&gt;
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; bsTableEnv.executeSql(printTable);
>&gt;
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; // 运行成功
>&gt;//&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; Table joinTable = bsTableEnv.sqlQuery("select * from l_table join r_table on l_table.l_a = r_table.r_a and l_table.l_rt = r_table.r_pt");
>&gt;
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; // 运行错误,提示Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before.
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; Table joinTable = bsTableEnv.sqlQuery("select * from l_table join r_table on l_table.l_a = r_table.r_a and l_table.l_rt BETWEEN r_table.r_pt - INTERVAL '10' SECOND AND r_table.r_pt + INTERVAL '5' SECOND");
>&gt;
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; bsTableEnv.executeSql("insert into print select * from " + joinTable);
>&gt;
>&gt;&nbsp;&nbsp;&nbsp; }
>&gt;
>&gt;}

回复:flink 1.11.2 rowtime和proctime流 Interval Join 问题错误问题

Posted by Asahi Lee <97...@qq.com>.
你好!
&nbsp; &nbsp; &nbsp; 那两条拥有不同时间属性的流如何join呢?或者这样的需求如何处理?




------------------&nbsp;原始邮件&nbsp;------------------
发件人:                                                                                                                        "user-zh"                                                                                    <18868816710@163.com&gt;;
发送时间:&nbsp;2020年11月25日(星期三) 晚上7:31
收件人:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;

主题:&nbsp;Re:flink 1.11.2 rowtime和proctime流 Interval Join 问题错误问题



Hi,
&nbsp;&nbsp; 因为你的 2 个流的时间属性不一样,所以就认为不是 interval join。
&nbsp;&nbsp; 而在 match 到到了regular join 后,又因为 join 条件中有时间属性,故报了这个错。
&nbsp;&nbsp; Interval join 需要 2 个流的时间属性一样,所以你需要对这 2 条流使用相同的时间属性。


Best,
Hailong

在 2020-11-25 16:23:27,"Asahi Lee" <978466273@qq.com&gt; 写道:
&gt;你好!&nbsp;&nbsp;&nbsp;&nbsp; 我需要将事件时间的流同处理时间的流做Interval Join时提示错误,我是用的是flink 1.11.2版本,我的示例程序如下,请问为什么提示我是个常规join,而不是区间join呢?我该如何解决?&nbsp;&nbsp;&nbsp;&nbsp; 我的 l_table.l_rt = r_table.r_pt 可以运行成功,而l_table.l_rt BETWEEN r_table.r_pt - INTERVAL '10' SECOND AND r_table.r_pt + INTERVAL '5' SECOND运行错误!package join;
&gt;
&gt;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
&gt;import org.apache.flink.table.api.EnvironmentSettings;
&gt;import org.apache.flink.table.api.Table;
&gt;import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
&gt;
&gt;public class Test1 {
&gt;
&gt;&nbsp;&nbsp;&nbsp; public static void main(String[] args) {
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
&gt;
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; String lTable = "CREATE TABLE l_table (&nbsp; " +
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; " l_a INT,&nbsp; " +
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; " l_b string,&nbsp; " +
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; " l_rt AS localtimestamp,&nbsp; " +
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; " WATERMARK FOR l_rt AS l_rt&nbsp; " +
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; ") WITH (&nbsp; " +
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; " 'connector' = 'datagen',&nbsp; " +
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; " 'rows-per-second'='5',&nbsp; " +
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; " 'fields.l_a.min'='1',&nbsp; " +
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; " 'fields.l_a.max'='5',&nbsp; " +
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; " 'fields.l_b.length'='5'&nbsp; " +
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; ")";
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; bsTableEnv.executeSql(lTable);
&gt;
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; String rTable = "CREATE TABLE r_table (&nbsp; " +
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; " r_a INT,&nbsp; " +
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; " r_b string,&nbsp; " +
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; " r_pt AS proctime()&nbsp; " +
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; ") WITH (&nbsp; " +
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; " 'connector' = 'datagen',&nbsp; " +
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; " 'rows-per-second'='5',&nbsp; " +
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; " 'fields.r_a.min'='1',&nbsp; " +
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; " 'fields.r_a.max'='5',&nbsp; " +
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; " 'fields.r_b.length'='5'&nbsp; " +
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; ")";
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; bsTableEnv.executeSql(rTable);
&gt;
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; String printTable = "CREATE TABLE print (" +
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "&nbsp; l_a INT,&nbsp; " +
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "&nbsp; l_b string,&nbsp; " +
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "&nbsp; l_rt timestamp(3),&nbsp; " +
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "&nbsp; r_a INT,&nbsp; " +
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "&nbsp; r_b string,&nbsp; " +
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "&nbsp; r_pt timestamp(3)&nbsp; " +
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; ") WITH (&nbsp; " +
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; " 'connector' = 'print' " +
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; ") ";
&gt;
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; bsTableEnv.executeSql(printTable);
&gt;
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; // 运行成功
&gt;//&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; Table joinTable = bsTableEnv.sqlQuery("select * from l_table join r_table on l_table.l_a = r_table.r_a and l_table.l_rt = r_table.r_pt");
&gt;
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; // 运行错误,提示Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before.
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; Table joinTable = bsTableEnv.sqlQuery("select * from l_table join r_table on l_table.l_a = r_table.r_a and l_table.l_rt BETWEEN r_table.r_pt - INTERVAL '10' SECOND AND r_table.r_pt + INTERVAL '5' SECOND");
&gt;
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; bsTableEnv.executeSql("insert into print select * from " + joinTable);
&gt;
&gt;&nbsp;&nbsp;&nbsp; }
&gt;
&gt;}

Re:flink 1.11.2 rowtime和proctime流 Interval Join 问题错误问题

Posted by hailongwang <18...@163.com>.
Hi,
   因为你的 2 个流的时间属性不一样,所以就认为不是 interval join。
   而在 match 到到了regular join 后,又因为 join 条件中有时间属性,故报了这个错。
   Interval join 需要 2 个流的时间属性一样,所以你需要对这 2 条流使用相同的时间属性。


Best,
Hailong

在 2020-11-25 16:23:27,"Asahi Lee" <97...@qq.com> 写道:
>你好!     我需要将事件时间的流同处理时间的流做Interval Join时提示错误,我是用的是flink 1.11.2版本,我的示例程序如下,请问为什么提示我是个常规join,而不是区间join呢?我该如何解决?     我的 l_table.l_rt = r_table.r_pt 可以运行成功,而l_table.l_rt BETWEEN r_table.r_pt - INTERVAL '10' SECOND AND r_table.r_pt + INTERVAL '5' SECOND运行错误!package join;
>
>import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>import org.apache.flink.table.api.EnvironmentSettings;
>import org.apache.flink.table.api.Table;
>import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
>
>public class Test1 {
>
>    public static void main(String[] args) {
>        StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
>        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>        StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
>
>        String lTable = "CREATE TABLE l_table (  " +
>                " l_a INT,  " +
>                " l_b string,  " +
>                " l_rt AS localtimestamp,  " +
>                " WATERMARK FOR l_rt AS l_rt  " +
>                ") WITH (  " +
>                " 'connector' = 'datagen',  " +
>                " 'rows-per-second'='5',  " +
>                " 'fields.l_a.min'='1',  " +
>                " 'fields.l_a.max'='5',  " +
>                " 'fields.l_b.length'='5'  " +
>                ")";
>        bsTableEnv.executeSql(lTable);
>
>        String rTable = "CREATE TABLE r_table (  " +
>                " r_a INT,  " +
>                " r_b string,  " +
>                " r_pt AS proctime()  " +
>                ") WITH (  " +
>                " 'connector' = 'datagen',  " +
>                " 'rows-per-second'='5',  " +
>                " 'fields.r_a.min'='1',  " +
>                " 'fields.r_a.max'='5',  " +
>                " 'fields.r_b.length'='5'  " +
>                ")";
>        bsTableEnv.executeSql(rTable);
>
>        String printTable = "CREATE TABLE print (" +
>                "  l_a INT,  " +
>                "  l_b string,  " +
>                "  l_rt timestamp(3),  " +
>                "  r_a INT,  " +
>                "  r_b string,  " +
>                "  r_pt timestamp(3)  " +
>                ") WITH (  " +
>                " 'connector' = 'print' " +
>                ") ";
>
>        bsTableEnv.executeSql(printTable);
>
>        // 运行成功
>//        Table joinTable = bsTableEnv.sqlQuery("select * from l_table join r_table on l_table.l_a = r_table.r_a and l_table.l_rt = r_table.r_pt");
>
>        // 运行错误,提示Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before.
>        Table joinTable = bsTableEnv.sqlQuery("select * from l_table join r_table on l_table.l_a = r_table.r_a and l_table.l_rt BETWEEN r_table.r_pt - INTERVAL '10' SECOND AND r_table.r_pt + INTERVAL '5' SECOND");
>
>        bsTableEnv.executeSql("insert into print select * from " + joinTable);
>
>    }
>
>}