You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by 咖啡泡油条 <93...@qq.com> on 2020/06/12 09:46:22 UTC

回复: flink sql Temporal table join failed

可以参考之前的邮件列表
https://lists.apache.org/thread.html/r951ca3dfa24598b2c90f9d2172d5228c4689b8a710d7dc119055c5d3%40%3Cuser-zh.flink.apache.org%3E




------------------&nbsp;原始邮件&nbsp;------------------
发件人:&nbsp;"Leonard Xu"<xbjtdcq@gmail.com&gt;;
发送时间:&nbsp;2020年6月12日(星期五) 下午5:43
收件人:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;

主题:&nbsp;Re: flink sql Temporal table join failed




你刚好踩到了这个坑,这是flink保留的关键字(time)转义的bug,1.10.1及之后的版本(即将发布的1.11)中修复了的。

祝好
Leonard Xu

&gt; 在 2020年6月12日,17:38,Zhou Zach <wander669@163.com&gt; 写道:
&gt; 
&gt; 
&gt; 
&gt; 
&gt; 是的,1.10.0版本
&gt; 
&gt; 
&gt; 
&gt; 
&gt; 
&gt; 
&gt; 
&gt; 
&gt; 在 2020-06-12 16:28:15,"Benchao Li" <libenchao@apache.org&gt; 写道:
&gt;&gt; 看起来你又踩到了一个坑,你用的是1.10.0吧?可以切换到1.10.1试一下,有两个bug已经在1.10.1中修复了。
&gt;&gt; 
&gt;&gt; Zhou Zach <wander669@163.com&gt; 于2020年6月12日周五 下午3:47写道:
&gt;&gt; 
&gt;&gt;&gt; 还是不行,
&gt;&gt;&gt; SLF4J: Class path contains multiple SLF4J bindings.
&gt;&gt;&gt; SLF4J: Found binding in
&gt;&gt;&gt; [jar:file:/Users/Zach/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
&gt;&gt;&gt; SLF4J: Found binding in
&gt;&gt;&gt; [jar:file:/Users/Zach/.m2/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
&gt;&gt;&gt; SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
&gt;&gt;&gt; explanation.
&gt;&gt;&gt; SLF4J: Actual binding is of type
&gt;&gt;&gt; [org.apache.logging.slf4j.Log4jLoggerFactory]
&gt;&gt;&gt; ERROR StatusLogger No log4j2 configuration file found. Using default
&gt;&gt;&gt; configuration: logging only errors to the console.
&gt;&gt;&gt; Exception in thread "main" org.apache.flink.table.api.SqlParserException:
&gt;&gt;&gt; SQL parse failed. Encountered "time FROM" at line 1, column 44.
&gt;&gt;&gt; Was expecting one of:
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "CURSOR" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "EXISTS" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "NOT" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "ROW" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "(" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "+" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "-" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; <UNSIGNED_INTEGER_LITERAL&gt; ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; <DECIMAL_NUMERIC_LITERAL&gt; ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; <APPROX_NUMERIC_LITERAL&gt; ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; <BINARY_STRING_LITERAL&gt; ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; <PREFIXED_STRING_LITERAL&gt; ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; <QUOTED_STRING&gt; ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; <UNICODE_STRING_LITERAL&gt; ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "TRUE" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "FALSE" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "UNKNOWN" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "NULL" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; <LBRACE_D&gt; ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; <LBRACE_T&gt; ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; <LBRACE_TS&gt; ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "DATE" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "TIME" <QUOTED_STRING&gt; ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "TIMESTAMP" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "INTERVAL" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "?" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "CAST" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "EXTRACT" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "POSITION" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "CONVERT" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "TRANSLATE" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "OVERLAY" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "FLOOR" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "CEIL" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "CEILING" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "SUBSTRING" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "TRIM" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "CLASSIFIER" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "MATCH_NUMBER" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "RUNNING" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "PREV" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "NEXT" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "JSON_EXISTS" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "JSON_VALUE" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "JSON_QUERY" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "JSON_OBJECT" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "JSON_OBJECTAGG" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "JSON_ARRAY" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "JSON_ARRAYAGG" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; <LBRACE_FN&gt; ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "MULTISET" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "ARRAY" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "MAP" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "PERIOD" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "SPECIFIC" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; <IDENTIFIER&gt; ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; <QUOTED_IDENTIFIER&gt; ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; <BACK_QUOTED_IDENTIFIER&gt; ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; <BRACKET_QUOTED_IDENTIFIER&gt; ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; <UNICODE_QUOTED_IDENTIFIER&gt; ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "ABS" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "AVG" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "CARDINALITY" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "CHAR_LENGTH" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "CHARACTER_LENGTH" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "COALESCE" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "COLLECT" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "COVAR_POP" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "COVAR_SAMP" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "CUME_DIST" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "COUNT" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "CURRENT_DATE" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "CURRENT_TIME" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "CURRENT_TIMESTAMP" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "DENSE_RANK" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "ELEMENT" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "EXP" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "FIRST_VALUE" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "FUSION" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "GROUPING" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "HOUR" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "LAG" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "LEAD" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "LEFT" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "LAST_VALUE" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "LN" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "LOCALTIME" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "LOCALTIMESTAMP" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "LOWER" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "MAX" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "MIN" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "MINUTE" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "MOD" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "MONTH" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "NTH_VALUE" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "NTILE" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "NULLIF" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "OCTET_LENGTH" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "PERCENT_RANK" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "POWER" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "RANK" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "REGR_COUNT" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "REGR_SXX" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "REGR_SYY" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "RIGHT" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "ROW_NUMBER" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "SECOND" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "SQRT" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "STDDEV_POP" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "STDDEV_SAMP" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "SUM" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "UPPER" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "TRUNCATE" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "USER" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "VAR_POP" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "VAR_SAMP" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "YEAR" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "CURRENT_CATALOG" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "CURRENT_DEFAULT_TRANSFORM_GROUP" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "CURRENT_PATH" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "CURRENT_ROLE" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "CURRENT_SCHEMA" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "CURRENT_USER" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "SESSION_USER" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "SYSTEM_USER" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "NEW" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "CASE" ...
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "CURRENT" ...
&gt;&gt;&gt; 
&gt;&gt;&gt; at
&gt;&gt;&gt; org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:50)
&gt;&gt;&gt; at
&gt;&gt;&gt; org.apache.flink.table.planner.calcite.SqlExprToRexConverterImpl.convertToRexNodes(SqlExprToRexConverterImpl.java:79)
&gt;&gt;&gt; at
&gt;&gt;&gt; org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:111)
&gt;&gt;&gt; at
&gt;&gt;&gt; org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3328)
&gt;&gt;&gt; at
&gt;&gt;&gt; org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2357)
&gt;&gt;&gt; at
&gt;&gt;&gt; org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
&gt;&gt;&gt; at
&gt;&gt;&gt; org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2005)
&gt;&gt;&gt; at
&gt;&gt;&gt; org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2083)
&gt;&gt;&gt; at
&gt;&gt;&gt; org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:646)
&gt;&gt;&gt; at
&gt;&gt;&gt; org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:627)
&gt;&gt;&gt; at
&gt;&gt;&gt; org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3181)
&gt;&gt;&gt; at
&gt;&gt;&gt; org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:563)
&gt;&gt;&gt; at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
&gt;&gt;&gt; $apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:148)
&gt;&gt;&gt; at
&gt;&gt;&gt; org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:135)
&gt;&gt;&gt; at
&gt;&gt;&gt; org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:522)
&gt;&gt;&gt; at
&gt;&gt;&gt; org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:436)
&gt;&gt;&gt; at
&gt;&gt;&gt; org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:154)
&gt;&gt;&gt; at
&gt;&gt;&gt; org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:342)
&gt;&gt;&gt; at
&gt;&gt;&gt; org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:142)
&gt;&gt;&gt; at
&gt;&gt;&gt; org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
&gt;&gt;&gt; at
&gt;&gt;&gt; org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484)
&gt;&gt;&gt; at
&gt;&gt;&gt; org.rabbit.sql.FromKafkaSinkMysqlForReal$.main(FromKafkaSinkMysqlForReal.scala:63)
&gt;&gt;&gt; at
&gt;&gt;&gt; org.rabbit.sql.FromKafkaSinkMysqlForReal.main(FromKafkaSinkMysqlForReal.scala)
&gt;&gt;&gt; 
&gt;&gt;&gt; 
&gt;&gt;&gt; query:
&gt;&gt;&gt; 
&gt;&gt;&gt; 
&gt;&gt;&gt; streamTableEnv.sqlUpdate(
&gt;&gt;&gt; """
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |CREATE TABLE user_behavior (
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; uid VARCHAR,
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; phoneType VARCHAR,
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; clickCount INT,
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; proctime AS PROCTIME(),
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; `time` TIMESTAMP(3)
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |) WITH (
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; 'connector.type' = 'kafka',
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; 'connector.version' = 'universal',
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; 'connector.topic' = 'user_behavior',
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; 'connector.startup-mode' = 'earliest-offset',
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; 'connector.properties.0.key' = 'zookeeper.connect',
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; 'connector.properties.0.value' = 'cdh1:2181,cdh2:2181,cdh3:2181',
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; 'connector.properties.1.key' = 'bootstrap.servers',
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; 'connector.properties.1.value' = 'cdh1:9092,cdh2:9092,cdh3:9092',
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; 'update-mode' = 'append',
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; 'format.type' = 'json',
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; 'format.derive-schema' = 'true'
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |)
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |""".stripMargin)
&gt;&gt;&gt; streamTableEnv.sqlUpdate(
&gt;&gt;&gt; """
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |insert into&nbsp; user_cnt
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |SELECT
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp; cast(b.`time` as string), u.age
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |FROM
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp; user_behavior AS b
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp; JOIN users FOR SYSTEM_TIME AS OF b.`proctime` AS u
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp; ON b.uid = u.uid
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |""".stripMargin)
&gt;&gt;&gt; 
&gt;&gt;&gt; 
&gt;&gt;&gt; 
&gt;&gt;&gt; 
&gt;&gt;&gt; 
&gt;&gt;&gt; 
&gt;&gt;&gt; 不过,PROCTIME() AS proctime 放在select 后面可以执行成功,proctime AS PROCTIME()
&gt;&gt;&gt; 放在select 后面也不行。
&gt;&gt;&gt; 
&gt;&gt;&gt; 
&gt;&gt;&gt; 
&gt;&gt;&gt; 
&gt;&gt;&gt; 
&gt;&gt;&gt; 
&gt;&gt;&gt; 
&gt;&gt;&gt; 
&gt;&gt;&gt; 在 2020-06-12 15:29:49,"Benchao Li" <libenchao@apache.org&gt; 写道:
&gt;&gt;&gt;&gt; 你写反了,是proctime AS PROCTIME()。
&gt;&gt;&gt;&gt; 计算列跟普通query里面的AS是反着的。
&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt; Zhou Zach <wander669@163.com&gt; 于2020年6月12日周五 下午2:24写道:
&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; flink 1.10.0:
&gt;&gt;&gt;&gt;&gt; 在create table中,加PROCTIME() AS proctime字段报错
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 在 2020-06-12 14:08:11,"Benchao Li" <libenchao@apache.org&gt; 写道:
&gt;&gt;&gt;&gt;&gt;&gt; Hi,
&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt; Temporal Table join的时候需要是处理时间,你现在这个b.`time`是一个普通的时间戳,而不是事件时间。
&gt;&gt;&gt;&gt;&gt;&gt; 可以参考下[1]
&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt; [1]
&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt; https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/time_attributes.html
&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt; Zhou Zach <wander669@163.com&gt; 于2020年6月12日周五 下午1:33写道:
&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; SLF4J: Class path contains multiple SLF4J bindings.
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; SLF4J: Found binding in
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt; [jar:file:/Users/Zach/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; SLF4J: Found binding in
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt; [jar:file:/Users/Zach/.m2/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
&gt;&gt;&gt;&gt;&gt;&gt;&gt; explanation.
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; SLF4J: Actual binding is of type
&gt;&gt;&gt;&gt;&gt;&gt;&gt; [org.apache.logging.slf4j.Log4jLoggerFactory]
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; ERROR StatusLogger No log4j2 configuration file found. Using default
&gt;&gt;&gt;&gt;&gt;&gt;&gt; configuration: logging only errors to the console.
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; Exception in thread "main" org.apache.flink.table.api.TableException:
&gt;&gt;&gt;&gt;&gt;&gt;&gt; Cannot generate a valid execution plan for the given query:
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt; FlinkLogicalSink(name=[`default_catalog`.`default_database`.`user_cnt`],
&gt;&gt;&gt;&gt;&gt;&gt;&gt; fields=[time, sum_age])
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; +- FlinkLogicalCalc(select=[CAST(time) AS EXPR$0, age])
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp; +- FlinkLogicalJoin(condition=[=($0, $2)], joinType=[inner])
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; :- FlinkLogicalCalc(select=[uid, time])
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; :&nbsp; +- FlinkLogicalTableSourceScan(table=[[default_catalog,
&gt;&gt;&gt;&gt;&gt;&gt;&gt; default_database, user_behavior, source: [KafkaTableSource(uid,
&gt;&gt;&gt;&gt;&gt; phoneType,
&gt;&gt;&gt;&gt;&gt;&gt;&gt; clickCount, time)]]], fields=[uid, phoneType, clickCount, time])
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; +- FlinkLogicalSnapshot(period=[$cor0.time])
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; +- FlinkLogicalCalc(select=[uid, age])
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; +- FlinkLogicalTableSourceScan(table=[[default_catalog,
&gt;&gt;&gt;&gt;&gt;&gt;&gt; default_database, users, source: [MysqlAsyncLookupTableSource(uid,
&gt;&gt;&gt; sex,
&gt;&gt;&gt;&gt;&gt;&gt;&gt; age, created_time)]]], fields=[uid, sex, age, created_time])
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; Temporal table join currently only supports 'FOR SYSTEM_TIME AS OF'
&gt;&gt;&gt; left
&gt;&gt;&gt;&gt;&gt;&gt;&gt; table's proctime field, doesn't support 'PROCTIME()'
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; Please check the documentation for the set of currently supported SQL
&gt;&gt;&gt;&gt;&gt;&gt;&gt; features.
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt; org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:78)
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt; org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt; org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt; scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt; scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; at scala.collection.Iterator$class.foreach(Iterator.scala:891)
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt; scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
&gt;&gt;&gt; scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt; org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt; org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:170)
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt; org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:90)
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt; org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt; org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:248)
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt; org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:151)
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt; org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt; org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt; org.rabbit.sql.FromKafkaSinkMysqlForReal$.main(FromKafkaSinkMysqlForReal.scala:90)
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt; org.rabbit.sql.FromKafkaSinkMysqlForReal.main(FromKafkaSinkMysqlForReal.scala)
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; Caused by: org.apache.flink.table.api.TableException: Temporal table
&gt;&gt;&gt;&gt;&gt; join
&gt;&gt;&gt;&gt;&gt;&gt;&gt; currently only supports 'FOR SYSTEM_TIME AS OF' left table's proctime
&gt;&gt;&gt;&gt;&gt;&gt;&gt; field, doesn't support 'PROCTIME()'
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt; org.apache.flink.table.planner.plan.rules.physical.common.CommonLookupJoinRule$class.matches(CommonLookupJoinRule.scala:67)
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt; org.apache.flink.table.planner.plan.rules.physical.common.BaseSnapshotOnCalcTableScanRule.matches(CommonLookupJoinRule.scala:147)
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt; org.apache.flink.table.planner.plan.rules.physical.common.BaseSnapshotOnCalcTableScanRule.matches(CommonLookupJoinRule.scala:161)
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt; org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:263)
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt; org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt; org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt; org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt; org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt; org.apache.calcite.plan.volcano.VolcanoRuleCall.match(VolcanoRuleCall.java:247)
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt; org.apache.calcite.plan.volcano.VolcanoPlanner.fireRules(VolcanoPlanner.java:1534)
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt; org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1807)
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt; org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846)
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt; org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868)
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt; org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:90)
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt; org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:329)
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt; org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1668)
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt; org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846)
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt; org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868)
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt; org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:90)
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt; org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:329)
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt; org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1668)
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt; org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846)
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt; org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868)
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt; org.apache.calcite.plan.volcano.VolcanoPlanner.changeTraits(VolcanoPlanner.java:529)
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
&gt;&gt;&gt;&gt;&gt; org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:324)
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt; org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64)
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; ... 20 more
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; query:
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; val streamExecutionEnv =
&gt;&gt;&gt;&gt;&gt; StreamExecutionEnvironment.getExecutionEnvironment
&gt;&gt;&gt;&gt;&gt;&gt;&gt; val blinkEnvSettings =
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt; EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
&gt;&gt;&gt;&gt;&gt;&gt;&gt; val streamTableEnv =
&gt;&gt;&gt;&gt;&gt;&gt;&gt; StreamTableEnvironment.create(streamExecutionEnv,blinkEnvSettings)
&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;&gt;&gt;&gt;&gt; streamTableEnv.sqlUpdate(
&gt;&gt;&gt;&gt;&gt;&gt;&gt; """
&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |
&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |CREATE TABLE user_behavior (
&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; uid VARCHAR,
&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; phoneType VARCHAR,
&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; clickCount INT,
&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; `time` TIMESTAMP(3)
&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |) WITH (
&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; 'connector.type' = 'kafka',
&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; 'connector.version' = 'universal',
&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; 'connector.topic' = 'user_behavior',
&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; 'connector.startup-mode' = 'earliest-offset',
&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; 'connector.properties.0.key' = 'zookeeper.connect',
&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; 'connector.properties.0.value' =
&gt;&gt;&gt;&gt;&gt; 'cdh1:2181,cdh2:2181,cdh3:2181',
&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; 'connector.properties.1.key' = 'bootstrap.servers',
&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; 'connector.properties.1.value' =
&gt;&gt;&gt;&gt;&gt; 'cdh1:9092,cdh2:9092,cdh3:9092',
&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; 'update-mode' = 'append',
&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; 'format.type' = 'json',
&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; 'format.derive-schema' = 'true'
&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |)
&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |""".stripMargin)
&gt;&gt;&gt;&gt;&gt;&gt;&gt; streamTableEnv.sqlUpdate(
&gt;&gt;&gt;&gt;&gt;&gt;&gt; """
&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |
&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |CREATE TABLE user_cnt (
&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; `time` VARCHAR,
&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; sum_age INT
&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |) WITH (
&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; 'connector.type' = 'jdbc',
&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; 'connector.url' = 'jdbc:mysql://localhost:3306/dashboard',
&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; 'connector.table' = 'user_cnt',
&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; 'connector.username' = 'root',
&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; 'connector.password' = '123456',
&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; 'connector.write.flush.max-rows' = '1'
&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |)
&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |""".stripMargin)
&gt;&gt;&gt;&gt;&gt;&gt;&gt; val userTableSource = new MysqlAsyncLookupTableSource(
&gt;&gt;&gt;&gt;&gt;&gt;&gt; Array("uid", "sex", "age", "created_time"),
&gt;&gt;&gt;&gt;&gt;&gt;&gt; Array(),
&gt;&gt;&gt;&gt;&gt;&gt;&gt; Array(Types.STRING, Types.STRING, Types.INT, Types.STRING))
&gt;&gt;&gt;&gt;&gt;&gt;&gt; streamTableEnv.registerTableSource("users", userTableSource)
&gt;&gt;&gt;&gt;&gt;&gt;&gt; streamTableEnv.sqlUpdate(
&gt;&gt;&gt;&gt;&gt;&gt;&gt; """
&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |
&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |insert into&nbsp; user_cnt
&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |SELECT
&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp; cast(b.`time` as string), u.age
&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |FROM
&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp; user_behavior AS b
&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp; JOIN users FOR SYSTEM_TIME AS OF b.`time` AS u
&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp; ON b.uid = u.uid
&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |
&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |""".stripMargin)
&gt;&gt;&gt;&gt;&gt;&gt;&gt; streamTableEnv.execute("Temporal table join")
&gt;&gt;&gt;&gt;&gt; 
&gt;&gt;&gt;

Re:回复: flink sql Temporal table join failed

Posted by Zhou Zach <wa...@163.com>.
好的

















在 2020-06-12 17:46:22,"咖啡泡油条" <93...@qq.com> 写道:
>可以参考之前的邮件列表
>https://lists.apache.org/thread.html/r951ca3dfa24598b2c90f9d2172d5228c4689b8a710d7dc119055c5d3%40%3Cuser-zh.flink.apache.org%3E
>
>
>
>
>------------------&nbsp;原始邮件&nbsp;------------------
>发件人:&nbsp;"Leonard Xu"<xbjtdcq@gmail.com&gt;;
>发送时间:&nbsp;2020年6月12日(星期五) 下午5:43
>收件人:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;
>
>主题:&nbsp;Re: flink sql Temporal table join failed
>
>
>
>
>你刚好踩到了这个坑,这是flink保留的关键字(time)转义的bug,1.10.1及之后的版本(即将发布的1.11)中修复了的。
>
>祝好
>Leonard Xu
>
>&gt; 在 2020年6月12日,17:38,Zhou Zach <wander669@163.com&gt; 写道:
>&gt; 
>&gt; 
>&gt; 
>&gt; 
>&gt; 是的,1.10.0版本
>&gt; 
>&gt; 
>&gt; 
>&gt; 
>&gt; 
>&gt; 
>&gt; 
>&gt; 
>&gt; 在 2020-06-12 16:28:15,"Benchao Li" <libenchao@apache.org&gt; 写道:
>&gt;&gt; 看起来你又踩到了一个坑,你用的是1.10.0吧?可以切换到1.10.1试一下,有两个bug已经在1.10.1中修复了。
>&gt;&gt; 
>&gt;&gt; Zhou Zach <wander669@163.com&gt; 于2020年6月12日周五 下午3:47写道:
>&gt;&gt; 
>&gt;&gt;&gt; 还是不行,
>&gt;&gt;&gt; SLF4J: Class path contains multiple SLF4J bindings.
>&gt;&gt;&gt; SLF4J: Found binding in
>&gt;&gt;&gt; [jar:file:/Users/Zach/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>&gt;&gt;&gt; SLF4J: Found binding in
>&gt;&gt;&gt; [jar:file:/Users/Zach/.m2/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>&gt;&gt;&gt; SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
>&gt;&gt;&gt; explanation.
>&gt;&gt;&gt; SLF4J: Actual binding is of type
>&gt;&gt;&gt; [org.apache.logging.slf4j.Log4jLoggerFactory]
>&gt;&gt;&gt; ERROR StatusLogger No log4j2 configuration file found. Using default
>&gt;&gt;&gt; configuration: logging only errors to the console.
>&gt;&gt;&gt; Exception in thread "main" org.apache.flink.table.api.SqlParserException:
>&gt;&gt;&gt; SQL parse failed. Encountered "time FROM" at line 1, column 44.
>&gt;&gt;&gt; Was expecting one of:
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "CURSOR" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "EXISTS" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "NOT" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "ROW" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "(" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "+" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "-" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; <UNSIGNED_INTEGER_LITERAL&gt; ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; <DECIMAL_NUMERIC_LITERAL&gt; ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; <APPROX_NUMERIC_LITERAL&gt; ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; <BINARY_STRING_LITERAL&gt; ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; <PREFIXED_STRING_LITERAL&gt; ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; <QUOTED_STRING&gt; ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; <UNICODE_STRING_LITERAL&gt; ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "TRUE" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "FALSE" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "UNKNOWN" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "NULL" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; <LBRACE_D&gt; ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; <LBRACE_T&gt; ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; <LBRACE_TS&gt; ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "DATE" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "TIME" <QUOTED_STRING&gt; ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "TIMESTAMP" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "INTERVAL" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "?" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "CAST" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "EXTRACT" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "POSITION" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "CONVERT" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "TRANSLATE" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "OVERLAY" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "FLOOR" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "CEIL" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "CEILING" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "SUBSTRING" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "TRIM" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "CLASSIFIER" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "MATCH_NUMBER" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "RUNNING" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "PREV" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "NEXT" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "JSON_EXISTS" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "JSON_VALUE" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "JSON_QUERY" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "JSON_OBJECT" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "JSON_OBJECTAGG" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "JSON_ARRAY" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "JSON_ARRAYAGG" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; <LBRACE_FN&gt; ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "MULTISET" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "ARRAY" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "MAP" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "PERIOD" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "SPECIFIC" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; <IDENTIFIER&gt; ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; <QUOTED_IDENTIFIER&gt; ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; <BACK_QUOTED_IDENTIFIER&gt; ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; <BRACKET_QUOTED_IDENTIFIER&gt; ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; <UNICODE_QUOTED_IDENTIFIER&gt; ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "ABS" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "AVG" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "CARDINALITY" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "CHAR_LENGTH" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "CHARACTER_LENGTH" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "COALESCE" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "COLLECT" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "COVAR_POP" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "COVAR_SAMP" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "CUME_DIST" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "COUNT" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "CURRENT_DATE" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "CURRENT_TIME" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "CURRENT_TIMESTAMP" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "DENSE_RANK" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "ELEMENT" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "EXP" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "FIRST_VALUE" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "FUSION" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "GROUPING" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "HOUR" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "LAG" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "LEAD" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "LEFT" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "LAST_VALUE" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "LN" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "LOCALTIME" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "LOCALTIMESTAMP" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "LOWER" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "MAX" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "MIN" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "MINUTE" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "MOD" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "MONTH" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "NTH_VALUE" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "NTILE" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "NULLIF" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "OCTET_LENGTH" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "PERCENT_RANK" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "POWER" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "RANK" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "REGR_COUNT" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "REGR_SXX" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "REGR_SYY" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "RIGHT" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "ROW_NUMBER" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "SECOND" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "SQRT" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "STDDEV_POP" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "STDDEV_SAMP" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "SUM" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "UPPER" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "TRUNCATE" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "USER" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "VAR_POP" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "VAR_SAMP" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "YEAR" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "CURRENT_CATALOG" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "CURRENT_DEFAULT_TRANSFORM_GROUP" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "CURRENT_PATH" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "CURRENT_ROLE" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "CURRENT_SCHEMA" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "CURRENT_USER" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "SESSION_USER" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "SYSTEM_USER" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "NEW" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "CASE" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "CURRENT" ...
>&gt;&gt;&gt; 
>&gt;&gt;&gt; at
>&gt;&gt;&gt; org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:50)
>&gt;&gt;&gt; at
>&gt;&gt;&gt; org.apache.flink.table.planner.calcite.SqlExprToRexConverterImpl.convertToRexNodes(SqlExprToRexConverterImpl.java:79)
>&gt;&gt;&gt; at
>&gt;&gt;&gt; org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:111)
>&gt;&gt;&gt; at
>&gt;&gt;&gt; org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3328)
>&gt;&gt;&gt; at
>&gt;&gt;&gt; org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2357)
>&gt;&gt;&gt; at
>&gt;&gt;&gt; org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
>&gt;&gt;&gt; at
>&gt;&gt;&gt; org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2005)
>&gt;&gt;&gt; at
>&gt;&gt;&gt; org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2083)
>&gt;&gt;&gt; at
>&gt;&gt;&gt; org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:646)
>&gt;&gt;&gt; at
>&gt;&gt;&gt; org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:627)
>&gt;&gt;&gt; at
>&gt;&gt;&gt; org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3181)
>&gt;&gt;&gt; at
>&gt;&gt;&gt; org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:563)
>&gt;&gt;&gt; at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
>&gt;&gt;&gt; $apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:148)
>&gt;&gt;&gt; at
>&gt;&gt;&gt; org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:135)
>&gt;&gt;&gt; at
>&gt;&gt;&gt; org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:522)
>&gt;&gt;&gt; at
>&gt;&gt;&gt; org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:436)
>&gt;&gt;&gt; at
>&gt;&gt;&gt; org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:154)
>&gt;&gt;&gt; at
>&gt;&gt;&gt; org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:342)
>&gt;&gt;&gt; at
>&gt;&gt;&gt; org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:142)
>&gt;&gt;&gt; at
>&gt;&gt;&gt; org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
>&gt;&gt;&gt; at
>&gt;&gt;&gt; org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484)
>&gt;&gt;&gt; at
>&gt;&gt;&gt; org.rabbit.sql.FromKafkaSinkMysqlForReal$.main(FromKafkaSinkMysqlForReal.scala:63)
>&gt;&gt;&gt; at
>&gt;&gt;&gt; org.rabbit.sql.FromKafkaSinkMysqlForReal.main(FromKafkaSinkMysqlForReal.scala)
>&gt;&gt;&gt; 
>&gt;&gt;&gt; 
>&gt;&gt;&gt; query:
>&gt;&gt;&gt; 
>&gt;&gt;&gt; 
>&gt;&gt;&gt; streamTableEnv.sqlUpdate(
>&gt;&gt;&gt; """
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |CREATE TABLE user_behavior (
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; uid VARCHAR,
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; phoneType VARCHAR,
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; clickCount INT,
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; proctime AS PROCTIME(),
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; `time` TIMESTAMP(3)
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |) WITH (
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; 'connector.type' = 'kafka',
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; 'connector.version' = 'universal',
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; 'connector.topic' = 'user_behavior',
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; 'connector.startup-mode' = 'earliest-offset',
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; 'connector.properties.0.key' = 'zookeeper.connect',
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; 'connector.properties.0.value' = 'cdh1:2181,cdh2:2181,cdh3:2181',
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; 'connector.properties.1.key' = 'bootstrap.servers',
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; 'connector.properties.1.value' = 'cdh1:9092,cdh2:9092,cdh3:9092',
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; 'update-mode' = 'append',
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; 'format.type' = 'json',
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; 'format.derive-schema' = 'true'
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |)
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |""".stripMargin)
>&gt;&gt;&gt; streamTableEnv.sqlUpdate(
>&gt;&gt;&gt; """
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |insert into&nbsp; user_cnt
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |SELECT
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp; cast(b.`time` as string), u.age
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |FROM
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp; user_behavior AS b
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp; JOIN users FOR SYSTEM_TIME AS OF b.`proctime` AS u
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp; ON b.uid = u.uid
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |""".stripMargin)
>&gt;&gt;&gt; 
>&gt;&gt;&gt; 
>&gt;&gt;&gt; 
>&gt;&gt;&gt; 
>&gt;&gt;&gt; 
>&gt;&gt;&gt; 
>&gt;&gt;&gt; 不过,PROCTIME() AS proctime 放在select 后面可以执行成功,proctime AS PROCTIME()
>&gt;&gt;&gt; 放在select 后面也不行。
>&gt;&gt;&gt; 
>&gt;&gt;&gt; 
>&gt;&gt;&gt; 
>&gt;&gt;&gt; 
>&gt;&gt;&gt; 
>&gt;&gt;&gt; 
>&gt;&gt;&gt; 
>&gt;&gt;&gt; 
>&gt;&gt;&gt; 在 2020-06-12 15:29:49,"Benchao Li" <libenchao@apache.org&gt; 写道:
>&gt;&gt;&gt;&gt; 你写反了,是proctime AS PROCTIME()。
>&gt;&gt;&gt;&gt; 计算列跟普通query里面的AS是反着的。
>&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt; Zhou Zach <wander669@163.com&gt; 于2020年6月12日周五 下午2:24写道:
>&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt; flink 1.10.0:
>&gt;&gt;&gt;&gt;&gt; 在create table中,加PROCTIME() AS proctime字段报错
>&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt; 在 2020-06-12 14:08:11,"Benchao Li" <libenchao@apache.org&gt; 写道:
>&gt;&gt;&gt;&gt;&gt;&gt; Hi,
>&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt;&gt; Temporal Table join的时候需要是处理时间,你现在这个b.`time`是一个普通的时间戳,而不是事件时间。
>&gt;&gt;&gt;&gt;&gt;&gt; 可以参考下[1]
>&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt;&gt; [1]
>&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt; https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/time_attributes.html
>&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt;&gt; Zhou Zach <wander669@163.com&gt; 于2020年6月12日周五 下午1:33写道:
>&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; SLF4J: Class path contains multiple SLF4J bindings.
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; SLF4J: Found binding in
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt; [jar:file:/Users/Zach/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; SLF4J: Found binding in
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt; [jar:file:/Users/Zach/.m2/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; explanation.
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; SLF4J: Actual binding is of type
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; [org.apache.logging.slf4j.Log4jLoggerFactory]
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; ERROR StatusLogger No log4j2 configuration file found. Using default
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; configuration: logging only errors to the console.
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; Exception in thread "main" org.apache.flink.table.api.TableException:
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; Cannot generate a valid execution plan for the given query:
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt; FlinkLogicalSink(name=[`default_catalog`.`default_database`.`user_cnt`],
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; fields=[time, sum_age])
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; +- FlinkLogicalCalc(select=[CAST(time) AS EXPR$0, age])
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp; +- FlinkLogicalJoin(condition=[=($0, $2)], joinType=[inner])
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; :- FlinkLogicalCalc(select=[uid, time])
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; :&nbsp; +- FlinkLogicalTableSourceScan(table=[[default_catalog,
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; default_database, user_behavior, source: [KafkaTableSource(uid,
>&gt;&gt;&gt;&gt;&gt; phoneType,
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; clickCount, time)]]], fields=[uid, phoneType, clickCount, time])
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; +- FlinkLogicalSnapshot(period=[$cor0.time])
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; +- FlinkLogicalCalc(select=[uid, age])
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; +- FlinkLogicalTableSourceScan(table=[[default_catalog,
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; default_database, users, source: [MysqlAsyncLookupTableSource(uid,
>&gt;&gt;&gt; sex,
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; age, created_time)]]], fields=[uid, sex, age, created_time])
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; Temporal table join currently only supports 'FOR SYSTEM_TIME AS OF'
>&gt;&gt;&gt; left
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; table's proctime field, doesn't support 'PROCTIME()'
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; Please check the documentation for the set of currently supported SQL
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; features.
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt; org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:78)
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt; org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt; org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt; scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt; scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt; scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
>&gt;&gt;&gt; scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt; org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt; org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:170)
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt; org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:90)
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt; org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt; org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:248)
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt; org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:151)
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt; org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt; org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt; org.rabbit.sql.FromKafkaSinkMysqlForReal$.main(FromKafkaSinkMysqlForReal.scala:90)
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt; org.rabbit.sql.FromKafkaSinkMysqlForReal.main(FromKafkaSinkMysqlForReal.scala)
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; Caused by: org.apache.flink.table.api.TableException: Temporal table
>&gt;&gt;&gt;&gt;&gt; join
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; currently only supports 'FOR SYSTEM_TIME AS OF' left table's proctime
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; field, doesn't support 'PROCTIME()'
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt; org.apache.flink.table.planner.plan.rules.physical.common.CommonLookupJoinRule$class.matches(CommonLookupJoinRule.scala:67)
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt; org.apache.flink.table.planner.plan.rules.physical.common.BaseSnapshotOnCalcTableScanRule.matches(CommonLookupJoinRule.scala:147)
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt; org.apache.flink.table.planner.plan.rules.physical.common.BaseSnapshotOnCalcTableScanRule.matches(CommonLookupJoinRule.scala:161)
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt; org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:263)
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt; org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt; org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt; org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt; org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt; org.apache.calcite.plan.volcano.VolcanoRuleCall.match(VolcanoRuleCall.java:247)
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt; org.apache.calcite.plan.volcano.VolcanoPlanner.fireRules(VolcanoPlanner.java:1534)
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt; org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1807)
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt; org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846)
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt; org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868)
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt; org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:90)
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt; org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:329)
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt; org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1668)
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt; org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846)
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt; org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868)
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt; org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:90)
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt; org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:329)
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt; org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1668)
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt; org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846)
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt; org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868)
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt; org.apache.calcite.plan.volcano.VolcanoPlanner.changeTraits(VolcanoPlanner.java:529)
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
>&gt;&gt;&gt;&gt;&gt; org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:324)
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; at
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt; org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64)
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; ... 20 more
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; query:
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; val streamExecutionEnv =
>&gt;&gt;&gt;&gt;&gt; StreamExecutionEnvironment.getExecutionEnvironment
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; val blinkEnvSettings =
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt; EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; val streamTableEnv =
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; StreamTableEnvironment.create(streamExecutionEnv,blinkEnvSettings)
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; streamTableEnv.sqlUpdate(
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; """
>&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |
>&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |CREATE TABLE user_behavior (
>&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; uid VARCHAR,
>&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; phoneType VARCHAR,
>&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; clickCount INT,
>&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; `time` TIMESTAMP(3)
>&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |) WITH (
>&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; 'connector.type' = 'kafka',
>&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; 'connector.version' = 'universal',
>&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; 'connector.topic' = 'user_behavior',
>&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; 'connector.startup-mode' = 'earliest-offset',
>&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; 'connector.properties.0.key' = 'zookeeper.connect',
>&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; 'connector.properties.0.value' =
>&gt;&gt;&gt;&gt;&gt; 'cdh1:2181,cdh2:2181,cdh3:2181',
>&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; 'connector.properties.1.key' = 'bootstrap.servers',
>&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; 'connector.properties.1.value' =
>&gt;&gt;&gt;&gt;&gt; 'cdh1:9092,cdh2:9092,cdh3:9092',
>&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; 'update-mode' = 'append',
>&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; 'format.type' = 'json',
>&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; 'format.derive-schema' = 'true'
>&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |)
>&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |""".stripMargin)
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; streamTableEnv.sqlUpdate(
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; """
>&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |
>&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |CREATE TABLE user_cnt (
>&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; `time` VARCHAR,
>&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; sum_age INT
>&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |) WITH (
>&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; 'connector.type' = 'jdbc',
>&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; 'connector.url' = 'jdbc:mysql://localhost:3306/dashboard',
>&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; 'connector.table' = 'user_cnt',
>&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; 'connector.username' = 'root',
>&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; 'connector.password' = '123456',
>&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; 'connector.write.flush.max-rows' = '1'
>&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |)
>&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |""".stripMargin)
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; val userTableSource = new MysqlAsyncLookupTableSource(
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; Array("uid", "sex", "age", "created_time"),
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; Array(),
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; Array(Types.STRING, Types.STRING, Types.INT, Types.STRING))
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; streamTableEnv.registerTableSource("users", userTableSource)
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; streamTableEnv.sqlUpdate(
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; """
>&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |
>&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |insert into&nbsp; user_cnt
>&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |SELECT
>&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp; cast(b.`time` as string), u.age
>&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |FROM
>&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp; user_behavior AS b
>&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp; JOIN users FOR SYSTEM_TIME AS OF b.`time` AS u
>&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp; ON b.uid = u.uid
>&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |
>&gt;&gt;&gt;&gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |""".stripMargin)
>&gt;&gt;&gt;&gt;&gt;&gt;&gt; streamTableEnv.execute("Temporal table join")
>&gt;&gt;&gt;&gt;&gt; 
>&gt;&gt;&gt;