Posted to user-zh@flink.apache.org by Zhou Zach <wa...@163.com> on 2020/06/12 05:33:22 UTC

flink sql Temporal table join failed

SLF4J: Class path contains multiple SLF4J bindings.

SLF4J: Found binding in [jar:file:/Users/Zach/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]

SLF4J: Found binding in [jar:file:/Users/Zach/.m2/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]

SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.

SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]

ERROR StatusLogger No log4j2 configuration file found. Using default configuration: logging only errors to the console.

Exception in thread "main" org.apache.flink.table.api.TableException: Cannot generate a valid execution plan for the given query: 




FlinkLogicalSink(name=[`default_catalog`.`default_database`.`user_cnt`], fields=[time, sum_age])

+- FlinkLogicalCalc(select=[CAST(time) AS EXPR$0, age])

   +- FlinkLogicalJoin(condition=[=($0, $2)], joinType=[inner])

      :- FlinkLogicalCalc(select=[uid, time])

      :  +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, user_behavior, source: [KafkaTableSource(uid, phoneType, clickCount, time)]]], fields=[uid, phoneType, clickCount, time])

      +- FlinkLogicalSnapshot(period=[$cor0.time])

         +- FlinkLogicalCalc(select=[uid, age])

            +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, users, source: [MysqlAsyncLookupTableSource(uid, sex, age, created_time)]]], fields=[uid, sex, age, created_time])




Temporal table join currently only supports 'FOR SYSTEM_TIME AS OF' left table's proctime field, doesn't support 'PROCTIME()'

Please check the documentation for the set of currently supported SQL features.

at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:78)

at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)

at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)

at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)

at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)

at scala.collection.Iterator$class.foreach(Iterator.scala:891)

at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)

at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)

at scala.collection.AbstractIterable.foreach(Iterable.scala:54)

at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)

at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)

at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)

at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:170)

at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:90)

at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)

at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:248)

at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:151)

at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)

at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)

at org.rabbit.sql.FromKafkaSinkMysqlForReal$.main(FromKafkaSinkMysqlForReal.scala:90)

at org.rabbit.sql.FromKafkaSinkMysqlForReal.main(FromKafkaSinkMysqlForReal.scala)

Caused by: org.apache.flink.table.api.TableException: Temporal table join currently only supports 'FOR SYSTEM_TIME AS OF' left table's proctime field, doesn't support 'PROCTIME()'

at org.apache.flink.table.planner.plan.rules.physical.common.CommonLookupJoinRule$class.matches(CommonLookupJoinRule.scala:67)

at org.apache.flink.table.planner.plan.rules.physical.common.BaseSnapshotOnCalcTableScanRule.matches(CommonLookupJoinRule.scala:147)

at org.apache.flink.table.planner.plan.rules.physical.common.BaseSnapshotOnCalcTableScanRule.matches(CommonLookupJoinRule.scala:161)

at org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:263)

at org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)

at org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)

at org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)

at org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)

at org.apache.calcite.plan.volcano.VolcanoRuleCall.match(VolcanoRuleCall.java:247)

at org.apache.calcite.plan.volcano.VolcanoPlanner.fireRules(VolcanoPlanner.java:1534)

at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1807)

at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846)

at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868)

at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:90)

at org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:329)

at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1668)

at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846)

at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868)

at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:90)

at org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:329)

at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1668)

at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846)

at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868)

at org.apache.calcite.plan.volcano.VolcanoPlanner.changeTraits(VolcanoPlanner.java:529)

at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:324)

at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64)

... 20 more




query:


val streamExecutionEnv = StreamExecutionEnvironment.getExecutionEnvironment
val blinkEnvSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val streamTableEnv = StreamTableEnvironment.create(streamExecutionEnv,blinkEnvSettings)

streamTableEnv.sqlUpdate(
"""
    |
    |CREATE TABLE user_behavior (
    |    uid VARCHAR,
    |    phoneType VARCHAR,
    |    clickCount INT,
    |    `time` TIMESTAMP(3)
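    |    -- note: `time` is a plain TIMESTAMP(3) here, not a time attribute,
    |    -- which is what the temporal join error above complains about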
    |) WITH (
    |    'connector.type' = 'kafka',
    |    'connector.version' = 'universal',
    |    'connector.topic' = 'user_behavior',
    |    'connector.startup-mode' = 'earliest-offset',
    |    'connector.properties.0.key' = 'zookeeper.connect',
    |    'connector.properties.0.value' = 'cdh1:2181,cdh2:2181,cdh3:2181',
    |    'connector.properties.1.key' = 'bootstrap.servers',
    |    'connector.properties.1.value' = 'cdh1:9092,cdh2:9092,cdh3:9092',
    |    'update-mode' = 'append',
    |    'format.type' = 'json',
    |    'format.derive-schema' = 'true'
    |)
    |""".stripMargin)
streamTableEnv.sqlUpdate(
"""
    |
    |CREATE TABLE user_cnt (
    |    `time` VARCHAR,
    |    sum_age INT
    |) WITH (
    |    'connector.type' = 'jdbc',
    |    'connector.url' = 'jdbc:mysql://localhost:3306/dashboard',
    |    'connector.table' = 'user_cnt',
    |    'connector.username' = 'root',
    |    'connector.password' = '123456',
    |    'connector.write.flush.max-rows' = '1'
    |)
    |""".stripMargin)
val userTableSource = new MysqlAsyncLookupTableSource(
  Array("uid", "sex", "age", "created_time"),
  Array(),
  Array(Types.STRING, Types.STRING, Types.INT, Types.STRING))
streamTableEnv.registerTableSource("users", userTableSource)
streamTableEnv.sqlUpdate(
"""
    |
    |insert into  user_cnt
    |SELECT
    |  cast(b.`time` as string), u.age
    |FROM
    |  user_behavior AS b
    |  JOIN users FOR SYSTEM_TIME AS OF b.`time` AS u
    |  ON b.uid = u.uid
    |
    |""".stripMargin)
streamTableEnv.execute("Temporal table join")

Re: flink sql Temporal table join failed

Posted by 李奇 <35...@qq.com>.
You need to use a proctime attribute for the join to work; see: https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/create.html 
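
For reference, here is a minimal sketch of what the DDL and join could look like with a processing-time attribute declared as a computed column. It reuses the Kafka options from the original post; as clarified later in this thread, the computed column is written proctime AS PROCTIME() (column name before AS), and 1.10.1 or later is needed because 1.10.0 has a bug escaping the reserved column name time:

streamTableEnv.sqlUpdate(
  """
    |CREATE TABLE user_behavior (
    |    uid VARCHAR,
    |    phoneType VARCHAR,
    |    clickCount INT,
    |    `time` TIMESTAMP(3),
    |    -- computed column declaring a processing-time attribute
    |    proctime AS PROCTIME()
    |) WITH (
    |    'connector.type' = 'kafka',
    |    'connector.version' = 'universal',
    |    'connector.topic' = 'user_behavior',
    |    'connector.startup-mode' = 'earliest-offset',
    |    'connector.properties.0.key' = 'zookeeper.connect',
    |    'connector.properties.0.value' = 'cdh1:2181,cdh2:2181,cdh3:2181',
    |    'connector.properties.1.key' = 'bootstrap.servers',
    |    'connector.properties.1.value' = 'cdh1:9092,cdh2:9092,cdh3:9092',
    |    'update-mode' = 'append',
    |    'format.type' = 'json',
    |    'format.derive-schema' = 'true'
    |)
    |""".stripMargin)

streamTableEnv.sqlUpdate(
  """
    |INSERT INTO user_cnt
    |SELECT
    |  cast(b.`time` as string), u.age
    |FROM
    |  user_behavior AS b
    |  -- the temporal join must reference the left table's proctime attribute
    |  JOIN users FOR SYSTEM_TIME AS OF b.proctime AS u
    |  ON b.uid = u.uid
    |""".stripMargin)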

> On Jun 12, 2020, at 2:24 PM, Zhou Zach <wa...@163.com> wrote:
> 
> flink 1.10.0:
> Adding a PROCTIME() AS proctime column in CREATE TABLE raises an error
> 
>> On 2020-06-12 14:08:11, "Benchao Li" <li...@apache.org> wrote:
>> Hi,
>> 
>> A Temporal Table join requires processing time; your b.`time` here is an ordinary timestamp, not an event-time attribute.
>> See [1] for reference.
>> 
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/time_attributes.html
>> 
>> Zhou Zach <wa...@163.com> wrote on Fri, Jun 12, 2020 at 1:33 PM:
>> 
>>> [original post quoted in full; trimmed here — see the first message in this thread]


Re:Re: flink sql Temporal table join failed

Posted by Zhou Zach <wa...@163.com>.
Thanks for the reminder.

On 2020-06-12 17:43:20, "Leonard Xu" <xb...@gmail.com> wrote:
>
>You've hit exactly this pit: it's a bug in escaping flink's reserved keyword (time), fixed in 1.10.1 and later versions (including the soon-to-be-released 1.11).
>
>Best regards,
>Leonard Xu
>
>> On Jun 12, 2020, at 17:38, Zhou Zach <wa...@163.com> wrote:
>> 
>> Yes, version 1.10.0
>> 
>> On 2020-06-12 16:28:15, "Benchao Li" <li...@apache.org> wrote:
>>> Looks like you've stepped into another pit. You're on 1.10.0, right? You can switch to 1.10.1 and try again; two bugs have already been fixed in 1.10.1.
>>> 
>>> Zhou Zach <wa...@163.com> wrote on Fri, Jun 12, 2020 at 3:47 PM:
>>> 
>>>> Still doesn't work:
>>>> SLF4J: Class path contains multiple SLF4J bindings.
>>>> SLF4J: Found binding in
>>>> [jar:file:/Users/Zach/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>>>> SLF4J: Found binding in
>>>> [jar:file:/Users/Zach/.m2/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>>>> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
>>>> explanation.
>>>> SLF4J: Actual binding is of type
>>>> [org.apache.logging.slf4j.Log4jLoggerFactory]
>>>> ERROR StatusLogger No log4j2 configuration file found. Using default
>>>> configuration: logging only errors to the console.
>>>> Exception in thread "main" org.apache.flink.table.api.SqlParserException:
>>>> SQL parse failed. Encountered "time FROM" at line 1, column 44.
>>>> Was expecting one of:
>>>>    "CURSOR" ...
>>>>    "EXISTS" ...
>>>>    "NOT" ...
>>>>    "ROW" ...
>>>>    "(" ...
>>>>    "+" ...
>>>>    "-" ...
>>>>    <UNSIGNED_INTEGER_LITERAL> ...
>>>>    <DECIMAL_NUMERIC_LITERAL> ...
>>>>    <APPROX_NUMERIC_LITERAL> ...
>>>>    <BINARY_STRING_LITERAL> ...
>>>>    <PREFIXED_STRING_LITERAL> ...
>>>>    <QUOTED_STRING> ...
>>>>    <UNICODE_STRING_LITERAL> ...
>>>>    "TRUE" ...
>>>>    "FALSE" ...
>>>>    "UNKNOWN" ...
>>>>    "NULL" ...
>>>>    <LBRACE_D> ...
>>>>    <LBRACE_T> ...
>>>>    <LBRACE_TS> ...
>>>>    "DATE" ...
>>>>    "TIME" <QUOTED_STRING> ...
>>>>    "TIMESTAMP" ...
>>>>    "INTERVAL" ...
>>>>    "?" ...
>>>>    "CAST" ...
>>>>    "EXTRACT" ...
>>>>    "POSITION" ...
>>>>    "CONVERT" ...
>>>>    "TRANSLATE" ...
>>>>    "OVERLAY" ...
>>>>    "FLOOR" ...
>>>>    "CEIL" ...
>>>>    "CEILING" ...
>>>>    "SUBSTRING" ...
>>>>    "TRIM" ...
>>>>    "CLASSIFIER" ...
>>>>    "MATCH_NUMBER" ...
>>>>    "RUNNING" ...
>>>>    "PREV" ...
>>>>    "NEXT" ...
>>>>    "JSON_EXISTS" ...
>>>>    "JSON_VALUE" ...
>>>>    "JSON_QUERY" ...
>>>>    "JSON_OBJECT" ...
>>>>    "JSON_OBJECTAGG" ...
>>>>    "JSON_ARRAY" ...
>>>>    "JSON_ARRAYAGG" ...
>>>>    <LBRACE_FN> ...
>>>>    "MULTISET" ...
>>>>    "ARRAY" ...
>>>>    "MAP" ...
>>>>    "PERIOD" ...
>>>>    "SPECIFIC" ...
>>>>    <IDENTIFIER> ...
>>>>    <QUOTED_IDENTIFIER> ...
>>>>    <BACK_QUOTED_IDENTIFIER> ...
>>>>    <BRACKET_QUOTED_IDENTIFIER> ...
>>>>    <UNICODE_QUOTED_IDENTIFIER> ...
>>>>    "ABS" ...
>>>>    "AVG" ...
>>>>    "CARDINALITY" ...
>>>>    "CHAR_LENGTH" ...
>>>>    "CHARACTER_LENGTH" ...
>>>>    "COALESCE" ...
>>>>    "COLLECT" ...
>>>>    "COVAR_POP" ...
>>>>    "COVAR_SAMP" ...
>>>>    "CUME_DIST" ...
>>>>    "COUNT" ...
>>>>    "CURRENT_DATE" ...
>>>>    "CURRENT_TIME" ...
>>>>    "CURRENT_TIMESTAMP" ...
>>>>    "DENSE_RANK" ...
>>>>    "ELEMENT" ...
>>>>    "EXP" ...
>>>>    "FIRST_VALUE" ...
>>>>    "FUSION" ...
>>>>    "GROUPING" ...
>>>>    "HOUR" ...
>>>>    "LAG" ...
>>>>    "LEAD" ...
>>>>    "LEFT" ...
>>>>    "LAST_VALUE" ...
>>>>    "LN" ...
>>>>    "LOCALTIME" ...
>>>>    "LOCALTIMESTAMP" ...
>>>>    "LOWER" ...
>>>>    "MAX" ...
>>>>    "MIN" ...
>>>>    "MINUTE" ...
>>>>    "MOD" ...
>>>>    "MONTH" ...
>>>>    "NTH_VALUE" ...
>>>>    "NTILE" ...
>>>>    "NULLIF" ...
>>>>    "OCTET_LENGTH" ...
>>>>    "PERCENT_RANK" ...
>>>>    "POWER" ...
>>>>    "RANK" ...
>>>>    "REGR_COUNT" ...
>>>>    "REGR_SXX" ...
>>>>    "REGR_SYY" ...
>>>>    "RIGHT" ...
>>>>    "ROW_NUMBER" ...
>>>>    "SECOND" ...
>>>>    "SQRT" ...
>>>>    "STDDEV_POP" ...
>>>>    "STDDEV_SAMP" ...
>>>>    "SUM" ...
>>>>    "UPPER" ...
>>>>    "TRUNCATE" ...
>>>>    "USER" ...
>>>>    "VAR_POP" ...
>>>>    "VAR_SAMP" ...
>>>>    "YEAR" ...
>>>>    "CURRENT_CATALOG" ...
>>>>    "CURRENT_DEFAULT_TRANSFORM_GROUP" ...
>>>>    "CURRENT_PATH" ...
>>>>    "CURRENT_ROLE" ...
>>>>    "CURRENT_SCHEMA" ...
>>>>    "CURRENT_USER" ...
>>>>    "SESSION_USER" ...
>>>>    "SYSTEM_USER" ...
>>>>    "NEW" ...
>>>>    "CASE" ...
>>>>    "CURRENT" ...
>>>> 
>>>> at
>>>> org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:50)
>>>> at
>>>> org.apache.flink.table.planner.calcite.SqlExprToRexConverterImpl.convertToRexNodes(SqlExprToRexConverterImpl.java:79)
>>>> at
>>>> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:111)
>>>> at
>>>> org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3328)
>>>> at
>>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2357)
>>>> at
>>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
>>>> at
>>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2005)
>>>> at
>>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2083)
>>>> at
>>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:646)
>>>> at
>>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:627)
>>>> at
>>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3181)
>>>> at
>>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:563)
>>>> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
>>>> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:148)
>>>> at
>>>> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:135)
>>>> at
>>>> org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:522)
>>>> at
>>>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:436)
>>>> at
>>>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:154)
>>>> at
>>>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:342)
>>>> at
>>>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:142)
>>>> at
>>>> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
>>>> at
>>>> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484)
>>>> at
>>>> org.rabbit.sql.FromKafkaSinkMysqlForReal$.main(FromKafkaSinkMysqlForReal.scala:63)
>>>> at
>>>> org.rabbit.sql.FromKafkaSinkMysqlForReal.main(FromKafkaSinkMysqlForReal.scala)
>>>> 
>>>> 
>>>> query:
>>>> 
>>>> 
>>>> streamTableEnv.sqlUpdate(
>>>> """
>>>>    |
>>>>    |CREATE TABLE user_behavior (
>>>>    |    uid VARCHAR,
>>>>    |    phoneType VARCHAR,
>>>>    |    clickCount INT,
>>>>    |    proctime AS PROCTIME(),
>>>>    |    `time` TIMESTAMP(3)
>>>>    |) WITH (
>>>>    |    'connector.type' = 'kafka',
>>>>    |    'connector.version' = 'universal',
>>>>    |    'connector.topic' = 'user_behavior',
>>>>    |    'connector.startup-mode' = 'earliest-offset',
>>>>    |    'connector.properties.0.key' = 'zookeeper.connect',
>>>>    |    'connector.properties.0.value' = 'cdh1:2181,cdh2:2181,cdh3:2181',
>>>>    |    'connector.properties.1.key' = 'bootstrap.servers',
>>>>    |    'connector.properties.1.value' = 'cdh1:9092,cdh2:9092,cdh3:9092',
>>>>    |    'update-mode' = 'append',
>>>>    |    'format.type' = 'json',
>>>>    |    'format.derive-schema' = 'true'
>>>>    |)
>>>>    |""".stripMargin)
>>>> streamTableEnv.sqlUpdate(
>>>> """
>>>>    |
>>>>    |insert into  user_cnt
>>>>    |SELECT
>>>>    |  cast(b.`time` as string), u.age
>>>>    |FROM
>>>>    |  user_behavior AS b
>>>>    |  JOIN users FOR SYSTEM_TIME AS OF b.`proctime` AS u
>>>>    |  ON b.uid = u.uid
>>>>    |
>>>>    |""".stripMargin)
>>>> 
>>>> That said, PROCTIME() AS proctime placed after SELECT does execute successfully; proctime AS PROCTIME() placed after SELECT does not work either.
>>>> 
>>>> On 2020-06-12 15:29:49, "Benchao Li" <li...@apache.org> wrote:
>>>>> You wrote it the wrong way round: it should be proctime AS PROCTIME().
>>>>> For computed columns the AS order is the reverse of an ordinary query's (column name first, then the expression after AS).
>>>>> 
>>>>> Zhou Zach <wa...@163.com> wrote on Fri, Jun 12, 2020 at 2:24 PM:
>>>>> 
>>>>>> flink 1.10.0:
>>>>>> Adding a PROCTIME() AS proctime column in CREATE TABLE raises an error
>>>>>> 
>>>>>> [rest of the quoted thread trimmed; it duplicates the messages above]

Re: Re: flink sql Temporal table join failed

Posted by Zhou Zach <wa...@163.com>.
Got it.

On 2020-06-12 17:46:22, "咖啡泡油条" <93...@qq.com> wrote:
>You can refer to this earlier mailing-list thread:
>https://lists.apache.org/thread.html/r951ca3dfa24598b2c90f9d2172d5228c4689b8a710d7dc119055c5d3%40%3Cuser-zh.flink.apache.org%3E
>
>
>------------------ Original Message ------------------
>From: "Leonard Xu" <xbjtdcq@gmail.com>;
>Sent: Friday, June 12, 2020, 5:43 PM
>To: "user-zh" <user-zh@flink.apache.org>;
>
>Subject: Re: flink sql Temporal table join failed
>
>
>
>
>你刚好踩到了这个坑,这是flink保留的关键字(time)转义的bug,1.10.1及之后的版本(即将发布的1.11)中修复了的。
>
>祝好
>Leonard Xu
>
>&gt; 在 2020年6月12日,17:38,Zhou Zach <wander669@163.com&gt; 写道:
>&gt; 
>&gt; 
>&gt; 
>&gt; 
>&gt; 是的,1.10.0版本
>&gt; 
>&gt; 
>&gt; 
>&gt; 
>&gt; 
>&gt; 
>&gt; 
>&gt; 
>&gt; 在 2020-06-12 16:28:15,"Benchao Li" <libenchao@apache.org&gt; 写道:
>&gt;&gt; 看起来你又踩到了一个坑,你用的是1.10.0吧?可以切换到1.10.1试一下,有两个bug已经在1.10.1中修复了。
>&gt;&gt; 
>&gt;&gt; Zhou Zach <wander669@163.com&gt; 于2020年6月12日周五 下午3:47写道:
>&gt;&gt; 
>&gt;&gt;&gt; 还是不行,
>&gt;&gt;&gt; SLF4J: Class path contains multiple SLF4J bindings.
>&gt;&gt;&gt; SLF4J: Found binding in
>&gt;&gt;&gt; [jar:file:/Users/Zach/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>&gt;&gt;&gt; SLF4J: Found binding in
>&gt;&gt;&gt; [jar:file:/Users/Zach/.m2/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>&gt;&gt;&gt; SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
>&gt;&gt;&gt; explanation.
>&gt;&gt;&gt; SLF4J: Actual binding is of type
>&gt;&gt;&gt; [org.apache.logging.slf4j.Log4jLoggerFactory]
>&gt;&gt;&gt; ERROR StatusLogger No log4j2 configuration file found. Using default
>&gt;&gt;&gt; configuration: logging only errors to the console.
>&gt;&gt;&gt; Exception in thread "main" org.apache.flink.table.api.SqlParserException:
>&gt;&gt;&gt; SQL parse failed. Encountered "time FROM" at line 1, column 44.
>&gt;&gt;&gt; Was expecting one of:
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "CURSOR" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "EXISTS" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "NOT" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "ROW" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "(" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "+" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "-" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; <UNSIGNED_INTEGER_LITERAL&gt; ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; <DECIMAL_NUMERIC_LITERAL&gt; ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; <APPROX_NUMERIC_LITERAL&gt; ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; <BINARY_STRING_LITERAL&gt; ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; <PREFIXED_STRING_LITERAL&gt; ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; <QUOTED_STRING&gt; ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; <UNICODE_STRING_LITERAL&gt; ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "TRUE" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "FALSE" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "UNKNOWN" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "NULL" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; <LBRACE_D&gt; ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; <LBRACE_T&gt; ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; <LBRACE_TS&gt; ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "DATE" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "TIME" <QUOTED_STRING&gt; ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "TIMESTAMP" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "INTERVAL" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "?" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "CAST" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "EXTRACT" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "POSITION" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "CONVERT" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "TRANSLATE" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "OVERLAY" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "FLOOR" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "CEIL" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "CEILING" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "SUBSTRING" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "TRIM" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "CLASSIFIER" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "MATCH_NUMBER" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "RUNNING" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "PREV" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "NEXT" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "JSON_EXISTS" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "JSON_VALUE" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "JSON_QUERY" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "JSON_OBJECT" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "JSON_OBJECTAGG" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "JSON_ARRAY" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "JSON_ARRAYAGG" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; <LBRACE_FN&gt; ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "MULTISET" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "ARRAY" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "MAP" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "PERIOD" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "SPECIFIC" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; <IDENTIFIER&gt; ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; <QUOTED_IDENTIFIER&gt; ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; <BACK_QUOTED_IDENTIFIER&gt; ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; <BRACKET_QUOTED_IDENTIFIER&gt; ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; <UNICODE_QUOTED_IDENTIFIER&gt; ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "ABS" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "AVG" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "CARDINALITY" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "CHAR_LENGTH" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "CHARACTER_LENGTH" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "COALESCE" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "COLLECT" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "COVAR_POP" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "COVAR_SAMP" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "CUME_DIST" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "COUNT" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "CURRENT_DATE" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "CURRENT_TIME" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "CURRENT_TIMESTAMP" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "DENSE_RANK" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "ELEMENT" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "EXP" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "FIRST_VALUE" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "FUSION" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "GROUPING" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "HOUR" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "LAG" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "LEAD" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "LEFT" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "LAST_VALUE" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "LN" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "LOCALTIME" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "LOCALTIMESTAMP" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "LOWER" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "MAX" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "MIN" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "MINUTE" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "MOD" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "MONTH" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "NTH_VALUE" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "NTILE" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "NULLIF" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "OCTET_LENGTH" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "PERCENT_RANK" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "POWER" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "RANK" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "REGR_COUNT" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "REGR_SXX" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "REGR_SYY" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "RIGHT" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "ROW_NUMBER" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "SECOND" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "SQRT" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "STDDEV_POP" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "STDDEV_SAMP" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "SUM" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "UPPER" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "TRUNCATE" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "USER" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "VAR_POP" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "VAR_SAMP" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "YEAR" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "CURRENT_CATALOG" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "CURRENT_DEFAULT_TRANSFORM_GROUP" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "CURRENT_PATH" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "CURRENT_ROLE" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "CURRENT_SCHEMA" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "CURRENT_USER" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "SESSION_USER" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "SYSTEM_USER" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "NEW" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "CASE" ...
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; "CURRENT" ...
>&gt;&gt;&gt; 
>&gt;&gt;&gt; at
>&gt;&gt;&gt; org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:50)
>&gt;&gt;&gt; at
>&gt;&gt;&gt; org.apache.flink.table.planner.calcite.SqlExprToRexConverterImpl.convertToRexNodes(SqlExprToRexConverterImpl.java:79)
>&gt;&gt;&gt; at
>&gt;&gt;&gt; org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:111)
>&gt;&gt;&gt; at
>&gt;&gt;&gt; org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3328)
>&gt;&gt;&gt; at
>&gt;&gt;&gt; org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2357)
>&gt;&gt;&gt; at
>&gt;&gt;&gt; org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
>&gt;&gt;&gt; at
>&gt;&gt;&gt; org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2005)
>&gt;&gt;&gt; at
>&gt;&gt;&gt; org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2083)
>&gt;&gt;&gt; at
>&gt;&gt;&gt; org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:646)
>&gt;&gt;&gt; at
>&gt;&gt;&gt; org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:627)
>&gt;&gt;&gt; at
>&gt;&gt;&gt; org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3181)
>&gt;&gt;&gt; at
>&gt;&gt;&gt; org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:563)
>&gt;&gt;&gt; at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
>&gt;&gt;&gt; $apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:148)
>&gt;&gt;&gt; at
>&gt;&gt;&gt; org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:135)
>&gt;&gt;&gt; at
>&gt;&gt;&gt; org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:522)
>&gt;&gt;&gt; at
>&gt;&gt;&gt; org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:436)
>&gt;&gt;&gt; at
>&gt;&gt;&gt; org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:154)
>&gt;&gt;&gt; at
>&gt;&gt;&gt; org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:342)
>&gt;&gt;&gt; at
>&gt;&gt;&gt; org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:142)
>&gt;&gt;&gt; at
>&gt;&gt;&gt; org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
>&gt;&gt;&gt; at
>&gt;&gt;&gt; org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484)
>&gt;&gt;&gt; at
>&gt;&gt;&gt; org.rabbit.sql.FromKafkaSinkMysqlForReal$.main(FromKafkaSinkMysqlForReal.scala:63)
>&gt;&gt;&gt; at
>&gt;&gt;&gt; org.rabbit.sql.FromKafkaSinkMysqlForReal.main(FromKafkaSinkMysqlForReal.scala)
>&gt;&gt;&gt; 
>&gt;&gt;&gt; 
>&gt;&gt;&gt; query:
>&gt;&gt;&gt; 
>&gt;&gt;&gt; 
>&gt;&gt;&gt; streamTableEnv.sqlUpdate(
>&gt;&gt;&gt; """
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |CREATE TABLE user_behavior (
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; uid VARCHAR,
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; phoneType VARCHAR,
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; clickCount INT,
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; proctime AS PROCTIME(),
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; `time` TIMESTAMP(3)
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |) WITH (
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; 'connector.type' = 'kafka',
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; 'connector.version' = 'universal',
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; 'connector.topic' = 'user_behavior',
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; 'connector.startup-mode' = 'earliest-offset',
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; 'connector.properties.0.key' = 'zookeeper.connect',
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; 'connector.properties.0.value' = 'cdh1:2181,cdh2:2181,cdh3:2181',
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; 'connector.properties.1.key' = 'bootstrap.servers',
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; 'connector.properties.1.value' = 'cdh1:9092,cdh2:9092,cdh3:9092',
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; 'update-mode' = 'append',
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; 'format.type' = 'json',
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp;&nbsp;&nbsp; 'format.derive-schema' = 'true'
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |)
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |""".stripMargin)
>&gt;&gt;&gt; streamTableEnv.sqlUpdate(
>&gt;&gt;&gt; """
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |insert into&nbsp; user_cnt
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |SELECT
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp; cast(b.`time` as string), u.age
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |FROM
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp; user_behavior AS b
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp; JOIN users FOR SYSTEM_TIME AS OF b.`proctime` AS u
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |&nbsp; ON b.uid = u.uid
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |
>&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; |""".stripMargin)
>&gt;&gt;&gt; 
>&gt;&gt;&gt; 
>&gt;&gt;&gt; 
>&gt;&gt;&gt; 
>&gt;&gt;&gt; 
>&gt;&gt;&gt; 
>&gt;&gt;&gt; 不过,PROCTIME() AS proctime 放在select 后面可以执行成功,proctime AS PROCTIME()
>&gt;&gt;&gt; 放在select 后面也不行。

Re: flink sql Temporal table join failed

Posted by 咖啡泡油条 <93...@qq.com>.
You can refer to this earlier thread on the mailing list:
https://lists.apache.org/thread.html/r951ca3dfa24598b2c90f9d2172d5228c4689b8a710d7dc119055c5d3%40%3Cuser-zh.flink.apache.org%3E




------------------ Original Message ------------------
From: "Leonard Xu" <xbjtdcq@gmail.com>;
Sent: Friday, June 12, 2020, 5:43 PM
To: "user-zh" <user-zh@flink.apache.org>;
Subject: Re: flink sql Temporal table join failed




You have hit exactly this known pitfall: it is a bug in escaping the Flink reserved keyword (time), and it is fixed in 1.10.1 and later versions (including the upcoming 1.11).

Best,
Leonard Xu


Re: flink sql Temporal table join failed

Posted by Leonard Xu <xb...@gmail.com>.
You have hit exactly this known pitfall: it is a bug in escaping the Flink reserved keyword (time), and it is fixed in 1.10.1 and later versions (including the upcoming 1.11).

Best,
Leonard Xu
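
For anyone landing on the same error: the SqlParserException quoted below fails inside CatalogSourceTable.toRel, where the 1.10.0 planner re-parses the table's columns as generated SQL, apparently without re-escaping reserved keywords, so the backticks around `time` from the DDL are lost. A minimal sketch of the affected DDL, using the table and column names from this thread (connector properties elided; the same statement is accepted on 1.10.1 and later):

    CREATE TABLE user_behavior (
        uid VARCHAR,
        phoneType VARCHAR,
        clickCount INT,
        proctime AS PROCTIME(),  -- the computed column triggers the expression re-parse
        `time` TIMESTAMP(3)      -- reserved keyword, escaped in the DDL but not in the re-parse
    ) WITH (
        'connector.type' = 'kafka'
        -- remaining Kafka properties as in the messages below
    );

If upgrading is not an option, renaming the column (for example, `time` to ts) sidesteps the reserved keyword entirely.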

> On Jun 12, 2020, at 17:38, Zhou Zach <wa...@163.com> wrote:
> 
> 
> 
> 
> Yes, version 1.10.0
> 
> 
> 
> 
> 
> 
> 
> 
> On 2020-06-12 16:28:15, "Benchao Li" <li...@apache.org> wrote:
>> It looks like you have stepped on another bug. You are using 1.10.0, right? Try switching to 1.10.1; two relevant bugs have already been fixed in 1.10.1.
>> 
>> Zhou Zach <wa...@163.com> wrote on Fri, Jun 12, 2020 at 3:47 PM:
>> 
>>> Still not working:
>>> SLF4J: Class path contains multiple SLF4J bindings.
>>> SLF4J: Found binding in
>>> [jar:file:/Users/Zach/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>>> SLF4J: Found binding in
>>> [jar:file:/Users/Zach/.m2/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>>> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
>>> explanation.
>>> SLF4J: Actual binding is of type
>>> [org.apache.logging.slf4j.Log4jLoggerFactory]
>>> ERROR StatusLogger No log4j2 configuration file found. Using default
>>> configuration: logging only errors to the console.
>>> Exception in thread "main" org.apache.flink.table.api.SqlParserException:
>>> SQL parse failed. Encountered "time FROM" at line 1, column 44.
>>> Was expecting one of:
>>>    "CURSOR" ...
>>>    "EXISTS" ...
>>>    "NOT" ...
>>>    "ROW" ...
>>>    "(" ...
>>>    "+" ...
>>>    "-" ...
>>>    <UNSIGNED_INTEGER_LITERAL> ...
>>>    <DECIMAL_NUMERIC_LITERAL> ...
>>>    <APPROX_NUMERIC_LITERAL> ...
>>>    <BINARY_STRING_LITERAL> ...
>>>    <PREFIXED_STRING_LITERAL> ...
>>>    <QUOTED_STRING> ...
>>>    <UNICODE_STRING_LITERAL> ...
>>>    "TRUE" ...
>>>    "FALSE" ...
>>>    "UNKNOWN" ...
>>>    "NULL" ...
>>>    <LBRACE_D> ...
>>>    <LBRACE_T> ...
>>>    <LBRACE_TS> ...
>>>    "DATE" ...
>>>    "TIME" <QUOTED_STRING> ...
>>>    "TIMESTAMP" ...
>>>    "INTERVAL" ...
>>>    "?" ...
>>>    "CAST" ...
>>>    "EXTRACT" ...
>>>    "POSITION" ...
>>>    "CONVERT" ...
>>>    "TRANSLATE" ...
>>>    "OVERLAY" ...
>>>    "FLOOR" ...
>>>    "CEIL" ...
>>>    "CEILING" ...
>>>    "SUBSTRING" ...
>>>    "TRIM" ...
>>>    "CLASSIFIER" ...
>>>    "MATCH_NUMBER" ...
>>>    "RUNNING" ...
>>>    "PREV" ...
>>>    "NEXT" ...
>>>    "JSON_EXISTS" ...
>>>    "JSON_VALUE" ...
>>>    "JSON_QUERY" ...
>>>    "JSON_OBJECT" ...
>>>    "JSON_OBJECTAGG" ...
>>>    "JSON_ARRAY" ...
>>>    "JSON_ARRAYAGG" ...
>>>    <LBRACE_FN> ...
>>>    "MULTISET" ...
>>>    "ARRAY" ...
>>>    "MAP" ...
>>>    "PERIOD" ...
>>>    "SPECIFIC" ...
>>>    <IDENTIFIER> ...
>>>    <QUOTED_IDENTIFIER> ...
>>>    <BACK_QUOTED_IDENTIFIER> ...
>>>    <BRACKET_QUOTED_IDENTIFIER> ...
>>>    <UNICODE_QUOTED_IDENTIFIER> ...
>>>    "ABS" ...
>>>    "AVG" ...
>>>    "CARDINALITY" ...
>>>    "CHAR_LENGTH" ...
>>>    "CHARACTER_LENGTH" ...
>>>    "COALESCE" ...
>>>    "COLLECT" ...
>>>    "COVAR_POP" ...
>>>    "COVAR_SAMP" ...
>>>    "CUME_DIST" ...
>>>    "COUNT" ...
>>>    "CURRENT_DATE" ...
>>>    "CURRENT_TIME" ...
>>>    "CURRENT_TIMESTAMP" ...
>>>    "DENSE_RANK" ...
>>>    "ELEMENT" ...
>>>    "EXP" ...
>>>    "FIRST_VALUE" ...
>>>    "FUSION" ...
>>>    "GROUPING" ...
>>>    "HOUR" ...
>>>    "LAG" ...
>>>    "LEAD" ...
>>>    "LEFT" ...
>>>    "LAST_VALUE" ...
>>>    "LN" ...
>>>    "LOCALTIME" ...
>>>    "LOCALTIMESTAMP" ...
>>>    "LOWER" ...
>>>    "MAX" ...
>>>    "MIN" ...
>>>    "MINUTE" ...
>>>    "MOD" ...
>>>    "MONTH" ...
>>>    "NTH_VALUE" ...
>>>    "NTILE" ...
>>>    "NULLIF" ...
>>>    "OCTET_LENGTH" ...
>>>    "PERCENT_RANK" ...
>>>    "POWER" ...
>>>    "RANK" ...
>>>    "REGR_COUNT" ...
>>>    "REGR_SXX" ...
>>>    "REGR_SYY" ...
>>>    "RIGHT" ...
>>>    "ROW_NUMBER" ...
>>>    "SECOND" ...
>>>    "SQRT" ...
>>>    "STDDEV_POP" ...
>>>    "STDDEV_SAMP" ...
>>>    "SUM" ...
>>>    "UPPER" ...
>>>    "TRUNCATE" ...
>>>    "USER" ...
>>>    "VAR_POP" ...
>>>    "VAR_SAMP" ...
>>>    "YEAR" ...
>>>    "CURRENT_CATALOG" ...
>>>    "CURRENT_DEFAULT_TRANSFORM_GROUP" ...
>>>    "CURRENT_PATH" ...
>>>    "CURRENT_ROLE" ...
>>>    "CURRENT_SCHEMA" ...
>>>    "CURRENT_USER" ...
>>>    "SESSION_USER" ...
>>>    "SYSTEM_USER" ...
>>>    "NEW" ...
>>>    "CASE" ...
>>>    "CURRENT" ...
>>> 
>>> at org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:50)
>>> at org.apache.flink.table.planner.calcite.SqlExprToRexConverterImpl.convertToRexNodes(SqlExprToRexConverterImpl.java:79)
>>> at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:111)
>>> at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3328)
>>> at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2357)
>>> at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
>>> at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2005)
>>> at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2083)
>>> at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:646)
>>> at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:627)
>>> at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3181)
>>> at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:563)
>>> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:148)
>>> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:135)
>>> at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:522)
>>> at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:436)
>>> at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:154)
>>> at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:342)
>>> at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:142)
>>> at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
>>> at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484)
>>> at org.rabbit.sql.FromKafkaSinkMysqlForReal$.main(FromKafkaSinkMysqlForReal.scala:63)
>>> at org.rabbit.sql.FromKafkaSinkMysqlForReal.main(FromKafkaSinkMysqlForReal.scala)
>>> 
>>> 
>>> query:
>>> 
>>> 
>>> streamTableEnv.sqlUpdate(
>>> """
>>>    |
>>>    |CREATE TABLE user_behavior (
>>>    |    uid VARCHAR,
>>>    |    phoneType VARCHAR,
>>>    |    clickCount INT,
>>>    |    proctime AS PROCTIME(),
>>>    |    `time` TIMESTAMP(3)
>>>    |) WITH (
>>>    |    'connector.type' = 'kafka',
>>>    |    'connector.version' = 'universal',
>>>    |    'connector.topic' = 'user_behavior',
>>>    |    'connector.startup-mode' = 'earliest-offset',
>>>    |    'connector.properties.0.key' = 'zookeeper.connect',
>>>    |    'connector.properties.0.value' = 'cdh1:2181,cdh2:2181,cdh3:2181',
>>>    |    'connector.properties.1.key' = 'bootstrap.servers',
>>>    |    'connector.properties.1.value' = 'cdh1:9092,cdh2:9092,cdh3:9092',
>>>    |    'update-mode' = 'append',
>>>    |    'format.type' = 'json',
>>>    |    'format.derive-schema' = 'true'
>>>    |)
>>>    |""".stripMargin)
>>> streamTableEnv.sqlUpdate(
>>> """
>>>    |
>>>    |insert into  user_cnt
>>>    |SELECT
>>>    |  cast(b.`time` as string), u.age
>>>    |FROM
>>>    |  user_behavior AS b
>>>    |  JOIN users FOR SYSTEM_TIME AS OF b.`proctime` AS u
>>>    |  ON b.uid = u.uid
>>>    |
>>>    |""".stripMargin)
>>>
>>> However, PROCTIME() AS proctime placed in the SELECT clause executes
>>> successfully; proctime AS PROCTIME() in the SELECT clause does not work either.
>>>
>>> On 2020-06-12 15:29:49, "Benchao Li" <li...@apache.org> wrote:
>>>> You wrote it the wrong way around: it should be proctime AS PROCTIME().
>>>> A computed column is the reverse of AS in an ordinary query.
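To make the direction concrete, here is a minimal sketch of the corrected DDL, reusing the connector options quoted above (an illustrative sketch, not code taken verbatim from the thread):

streamTableEnv.sqlUpdate(
  """
    |CREATE TABLE user_behavior (
    |    uid VARCHAR,
    |    phoneType VARCHAR,
    |    clickCount INT,
    |    `time` TIMESTAMP(3),
    |    proctime AS PROCTIME()  -- computed column: <column name> AS <expression>
    |) WITH (
    |    'connector.type' = 'kafka',
    |    'connector.version' = 'universal',
    |    'connector.topic' = 'user_behavior',
    |    'connector.startup-mode' = 'earliest-offset',
    |    'connector.properties.0.key' = 'zookeeper.connect',
    |    'connector.properties.0.value' = 'cdh1:2181,cdh2:2181,cdh3:2181',
    |    'connector.properties.1.key' = 'bootstrap.servers',
    |    'connector.properties.1.value' = 'cdh1:9092,cdh2:9092,cdh3:9092',
    |    'update-mode' = 'append',
    |    'format.type' = 'json',
    |    'format.derive-schema' = 'true'
    |)
    |""".stripMargin)

As the rest of the thread shows, Flink 1.10.0 can still fail to plan this table (the parser appears to trip over the back-quoted time column), which is what the 1.10.1 upgrade advice further down addresses.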


Re:Re: Re: Re: flink sql Temporal table join failed

Posted by Zhou Zach <wa...@163.com>.


Yes, version 1.10.0.

On 2020-06-12 16:28:15, "Benchao Li" <li...@apache.org> wrote:
>Looks like you have hit another pitfall. You are on 1.10.0, right? Try
>switching to 1.10.1; two of the relevant bugs are already fixed in 1.10.1.

Re: Re: Re: flink sql Temporal table join failed

Posted by Benchao Li <li...@apache.org>.
Looks like you have hit another pitfall. You are on 1.10.0, right? Try switching to 1.10.1; two of the relevant bugs are already fixed in 1.10.1.
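A quick way to confirm which Flink version actually ends up on the classpath after the dependency bump; a minimal sketch, assuming flink-runtime is available (the planner jars in the stack traces above pull it in transitively):

import org.apache.flink.runtime.util.EnvironmentInformation

object PrintFlinkVersion {
  def main(args: Array[String]): Unit = {
    // Prints the version of the flink-runtime jar that was actually resolved,
    // e.g. "1.10.0" or "1.10.1", useful when ~/.m2 holds several versions.
    println(EnvironmentInformation.getVersion)
  }
}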


Re:Re: Re: flink sql Temporal table join failed

Posted by Zhou Zach <wa...@163.com>.
Still not working:
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/Users/Zach/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/Users/Zach/.m2/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
ERROR StatusLogger No log4j2 configuration file found. Using default configuration: logging only errors to the console.
Exception in thread "main" org.apache.flink.table.api.SqlParserException: SQL parse failed. Encountered "time FROM" at line 1, column 44.
Was expecting one of:
    "CURSOR" ...
    "EXISTS" ...
    "NOT" ...
    "ROW" ...
    "(" ...
    "+" ...
    "-" ...
    <UNSIGNED_INTEGER_LITERAL> ...
    <DECIMAL_NUMERIC_LITERAL> ...
    <APPROX_NUMERIC_LITERAL> ...
    <BINARY_STRING_LITERAL> ...
    <PREFIXED_STRING_LITERAL> ...
    <QUOTED_STRING> ...
    <UNICODE_STRING_LITERAL> ...
    "TRUE" ...
    "FALSE" ...
    "UNKNOWN" ...
    "NULL" ...
    <LBRACE_D> ...
    <LBRACE_T> ...
    <LBRACE_TS> ...
    "DATE" ...
    "TIME" <QUOTED_STRING> ...
    "TIMESTAMP" ...
    "INTERVAL" ...
    "?" ...
    "CAST" ...
    "EXTRACT" ...
    "POSITION" ...
    "CONVERT" ...
    "TRANSLATE" ...
    "OVERLAY" ...
    "FLOOR" ...
    "CEIL" ...
    "CEILING" ...
    "SUBSTRING" ...
    "TRIM" ...
    "CLASSIFIER" ...
    "MATCH_NUMBER" ...
    "RUNNING" ...
    "PREV" ...
    "NEXT" ...
    "JSON_EXISTS" ...
    "JSON_VALUE" ...
    "JSON_QUERY" ...
    "JSON_OBJECT" ...
    "JSON_OBJECTAGG" ...
    "JSON_ARRAY" ...
    "JSON_ARRAYAGG" ...
    <LBRACE_FN> ...
    "MULTISET" ...
    "ARRAY" ...
    "MAP" ...
    "PERIOD" ...
    "SPECIFIC" ...
    <IDENTIFIER> ...
    <QUOTED_IDENTIFIER> ...
    <BACK_QUOTED_IDENTIFIER> ...
    <BRACKET_QUOTED_IDENTIFIER> ...
    <UNICODE_QUOTED_IDENTIFIER> ...
    "ABS" ...
    "AVG" ...
    "CARDINALITY" ...
    "CHAR_LENGTH" ...
    "CHARACTER_LENGTH" ...
    "COALESCE" ...
    "COLLECT" ...
    "COVAR_POP" ...
    "COVAR_SAMP" ...
    "CUME_DIST" ...
    "COUNT" ...
    "CURRENT_DATE" ...
    "CURRENT_TIME" ...
    "CURRENT_TIMESTAMP" ...
    "DENSE_RANK" ...
    "ELEMENT" ...
    "EXP" ...
    "FIRST_VALUE" ...
    "FUSION" ...
    "GROUPING" ...
    "HOUR" ...
    "LAG" ...
    "LEAD" ...
    "LEFT" ...
    "LAST_VALUE" ...
    "LN" ...
    "LOCALTIME" ...
    "LOCALTIMESTAMP" ...
    "LOWER" ...
    "MAX" ...
    "MIN" ...
    "MINUTE" ...
    "MOD" ...
    "MONTH" ...
    "NTH_VALUE" ...
    "NTILE" ...
    "NULLIF" ...
    "OCTET_LENGTH" ...
    "PERCENT_RANK" ...
    "POWER" ...
    "RANK" ...
    "REGR_COUNT" ...
    "REGR_SXX" ...
    "REGR_SYY" ...
    "RIGHT" ...
    "ROW_NUMBER" ...
    "SECOND" ...
    "SQRT" ...
    "STDDEV_POP" ...
    "STDDEV_SAMP" ...
    "SUM" ...
    "UPPER" ...
    "TRUNCATE" ...
    "USER" ...
    "VAR_POP" ...
    "VAR_SAMP" ...
    "YEAR" ...
    "CURRENT_CATALOG" ...
    "CURRENT_DEFAULT_TRANSFORM_GROUP" ...
    "CURRENT_PATH" ...
    "CURRENT_ROLE" ...
    "CURRENT_SCHEMA" ...
    "CURRENT_USER" ...
    "SESSION_USER" ...
    "SYSTEM_USER" ...
    "NEW" ...
    "CASE" ...
    "CURRENT" ...
    
at org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:50)
at org.apache.flink.table.planner.calcite.SqlExprToRexConverterImpl.convertToRexNodes(SqlExprToRexConverterImpl.java:79)
at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:111)
at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3328)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2357)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2005)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2083)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:646)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:627)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3181)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:563)
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:148)
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:135)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:522)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:436)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:154)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:342)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:142)
at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484)
at org.rabbit.sql.FromKafkaSinkMysqlForReal$.main(FromKafkaSinkMysqlForReal.scala:63)
at org.rabbit.sql.FromKafkaSinkMysqlForReal.main(FromKafkaSinkMysqlForReal.scala)


query:


streamTableEnv.sqlUpdate(
"""
    |
    |CREATE TABLE user_behavior (
    |    uid VARCHAR,
    |    phoneType VARCHAR,
    |    clickCount INT,
    |    proctime AS PROCTIME(),
    |    `time` TIMESTAMP(3)
    |) WITH (
    |    'connector.type' = 'kafka',
    |    'connector.version' = 'universal',
    |    'connector.topic' = 'user_behavior',
    |    'connector.startup-mode' = 'earliest-offset',
    |    'connector.properties.0.key' = 'zookeeper.connect',
    |    'connector.properties.0.value' = 'cdh1:2181,cdh2:2181,cdh3:2181',
    |    'connector.properties.1.key' = 'bootstrap.servers',
    |    'connector.properties.1.value' = 'cdh1:9092,cdh2:9092,cdh3:9092',
    |    'update-mode' = 'append',
    |    'format.type' = 'json',
    |    'format.derive-schema' = 'true'
    |)
    |""".stripMargin)
streamTableEnv.sqlUpdate(
"""
    |
    |insert into  user_cnt
    |SELECT
    |  cast(b.`time` as string), u.age
    |FROM
    |  user_behavior AS b
    |  JOIN users FOR SYSTEM_TIME AS OF b.`proctime` AS u
    |  ON b.uid = u.uid
    |
    |""".stripMargin)

However, PROCTIME() AS proctime placed after SELECT executes successfully; proctime AS PROCTIME() placed after SELECT does not work either.
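For what it's worth, the stack trace above dies inside SqlExprToRexConverterImpl while the planner re-parses the computed-column expression list, and the parser stops exactly at "time FROM". One unconfirmed guess: once a computed column is present, the reserved keyword `time` loses its backticks in that internal re-parse. A quick way to test the guess is to rename the column; in this purely hypothetical sketch, `ts` stands in for `time`:

streamTableEnv.sqlUpdate(
"""
    |
    |CREATE TABLE user_behavior (
    |    uid VARCHAR,
    |    phoneType VARCHAR,
    |    clickCount INT,
    |    proctime AS PROCTIME(),
    |    ts TIMESTAMP(3)  -- hypothetical rename of `time`, avoiding the reserved keyword
    |) WITH (
    |    'connector.type' = 'kafka',
    |    'connector.version' = 'universal',
    |    'connector.topic' = 'user_behavior',
    |    'connector.startup-mode' = 'earliest-offset',
    |    'connector.properties.0.key' = 'zookeeper.connect',
    |    'connector.properties.0.value' = 'cdh1:2181,cdh2:2181,cdh3:2181',
    |    'connector.properties.1.key' = 'bootstrap.servers',
    |    'connector.properties.1.value' = 'cdh1:9092,cdh2:9092,cdh3:9092',
    |    'update-mode' = 'append',
    |    'format.type' = 'json',
    |    'format.derive-schema' = 'true'
    |)
    |""".stripMargin)

Note that with 'format.derive-schema' = 'true' the JSON field would also have to be named ts for this to line up.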

On 2020-06-12 15:29:49, "Benchao Li" <li...@apache.org> wrote:
>You have it backwards: it is proctime AS PROCTIME().
>For a computed column, the AS direction is the reverse of an alias in an ordinary query.

Re: Re: flink sql Temporal table join failed

Posted by Benchao Li <li...@apache.org>.
You have it backwards: it is proctime AS PROCTIME().
For a computed column, the AS direction is the reverse of an alias in an ordinary query.
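To put the rule in code form (a minimal sketch, assuming the user_behavior table from this thread is already registered):

// In a query, the expression comes first and the alias follows AS:
val withProctime = streamTableEnv.sqlQuery(
"""
    |SELECT uid, PROCTIME() AS proctime
    |FROM user_behavior
    |""".stripMargin)

// In a DDL computed column it is the other way around: the new column
// name comes first and the expression follows AS, i.e.
//   CREATE TABLE user_behavior ( ..., proctime AS PROCTIME() ) WITH ( ... )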


Re:Re:Re: flink sql Temporal table join failed

Posted by 陈邵瑾 <15...@163.com>.
Take a look at the SQL documentation on time; judging from your description, the usage is wrong: https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/time_attributes.html

Re:Re: flink sql Temporal table join failed

Posted by Zhou Zach <wa...@163.com>.
flink 1.10.0:
Adding a PROCTIME() AS proctime column in CREATE TABLE throws an error.

Re: flink sql Temporal table join failed

Posted by Benchao Li <li...@apache.org>.
Hi,

A temporal table join needs processing time; your b.`time` here is an ordinary timestamp, not an event-time attribute.
See [1].

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/time_attributes.html
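Concretely, that means declaring a processing-time attribute on the left table (for example with a computed column, proctime AS PROCTIME(), as shown in [1]) and referencing that column in the join. A minimal sketch of the rewritten insert statement, assuming user_behavior carries such a proctime column:

streamTableEnv.sqlUpdate(
"""
    |
    |insert into user_cnt
    |SELECT
    |  cast(b.`time` as string), u.age
    |FROM
    |  user_behavior AS b
    |  JOIN users FOR SYSTEM_TIME AS OF b.proctime AS u  -- the proctime column, not PROCTIME()
    |  ON b.uid = u.uid
    |
    |""".stripMargin)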
