You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by Zhou Zach <wa...@163.com> on 2020/06/12 05:33:22 UTC
flink sql Temporal table join failed
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/Users/Zach/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/Users/Zach/.m2/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
ERROR StatusLogger No log4j2 configuration file found. Using default configuration: logging only errors to the console.
Exception in thread "main" org.apache.flink.table.api.TableException: Cannot generate a valid execution plan for the given query:
FlinkLogicalSink(name=[`default_catalog`.`default_database`.`user_cnt`], fields=[time, sum_age])
+- FlinkLogicalCalc(select=[CAST(time) AS EXPR$0, age])
+- FlinkLogicalJoin(condition=[=($0, $2)], joinType=[inner])
:- FlinkLogicalCalc(select=[uid, time])
: +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, user_behavior, source: [KafkaTableSource(uid, phoneType, clickCount, time)]]], fields=[uid, phoneType, clickCount, time])
+- FlinkLogicalSnapshot(period=[$cor0.time])
+- FlinkLogicalCalc(select=[uid, age])
+- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, users, source: [MysqlAsyncLookupTableSource(uid, sex, age, created_time)]]], fields=[uid, sex, age, created_time])
Temporal table join currently only supports 'FOR SYSTEM_TIME AS OF' left table's proctime field, doesn't support 'PROCTIME()'
Please check the documentation for the set of currently supported SQL features.
at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:78)
at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:170)
at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:90)
at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:248)
at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:151)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
at org.rabbit.sql.FromKafkaSinkMysqlForReal$.main(FromKafkaSinkMysqlForReal.scala:90)
at org.rabbit.sql.FromKafkaSinkMysqlForReal.main(FromKafkaSinkMysqlForReal.scala)
Caused by: org.apache.flink.table.api.TableException: Temporal table join currently only supports 'FOR SYSTEM_TIME AS OF' left table's proctime field, doesn't support 'PROCTIME()'
at org.apache.flink.table.planner.plan.rules.physical.common.CommonLookupJoinRule$class.matches(CommonLookupJoinRule.scala:67)
at org.apache.flink.table.planner.plan.rules.physical.common.BaseSnapshotOnCalcTableScanRule.matches(CommonLookupJoinRule.scala:147)
at org.apache.flink.table.planner.plan.rules.physical.common.BaseSnapshotOnCalcTableScanRule.matches(CommonLookupJoinRule.scala:161)
at org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:263)
at org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)
at org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)
at org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)
at org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)
at org.apache.calcite.plan.volcano.VolcanoRuleCall.match(VolcanoRuleCall.java:247)
at org.apache.calcite.plan.volcano.VolcanoPlanner.fireRules(VolcanoPlanner.java:1534)
at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1807)
at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846)
at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868)
at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:90)
at org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:329)
at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1668)
at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846)
at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868)
at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:90)
at org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:329)
at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1668)
at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846)
at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868)
at org.apache.calcite.plan.volcano.VolcanoPlanner.changeTraits(VolcanoPlanner.java:529)
at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:324)
at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64)
... 20 more
query:
val streamExecutionEnv = StreamExecutionEnvironment.getExecutionEnvironment
val blinkEnvSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val streamTableEnv = StreamTableEnvironment.create(streamExecutionEnv,blinkEnvSettings)
streamTableEnv.sqlUpdate(
"""
|
|CREATE TABLE user_behavior (
| uid VARCHAR,
| phoneType VARCHAR,
| clickCount INT,
| `time` TIMESTAMP(3)
|) WITH (
| 'connector.type' = 'kafka',
| 'connector.version' = 'universal',
| 'connector.topic' = 'user_behavior',
| 'connector.startup-mode' = 'earliest-offset',
| 'connector.properties.0.key' = 'zookeeper.connect',
| 'connector.properties.0.value' = 'cdh1:2181,cdh2:2181,cdh3:2181',
| 'connector.properties.1.key' = 'bootstrap.servers',
| 'connector.properties.1.value' = 'cdh1:9092,cdh2:9092,cdh3:9092',
| 'update-mode' = 'append',
| 'format.type' = 'json',
| 'format.derive-schema' = 'true'
|)
|""".stripMargin)
streamTableEnv.sqlUpdate(
"""
|
|CREATE TABLE user_cnt (
| `time` VARCHAR,
| sum_age INT
|) WITH (
| 'connector.type' = 'jdbc',
| 'connector.url' = 'jdbc:mysql://localhost:3306/dashboard',
| 'connector.table' = 'user_cnt',
| 'connector.username' = 'root',
| 'connector.password' = '123456',
| 'connector.write.flush.max-rows' = '1'
|)
|""".stripMargin)
val userTableSource = new MysqlAsyncLookupTableSource(
Array("uid", "sex", "age", "created_time"),
Array(),
Array(Types.STRING, Types.STRING, Types.INT, Types.STRING))
streamTableEnv.registerTableSource("users", userTableSource)
streamTableEnv.sqlUpdate(
"""
|
|insert into user_cnt
|SELECT
| cast(b.`time` as string), u.age
|FROM
| user_behavior AS b
| JOIN users FOR SYSTEM_TIME AS OF b.`time` AS u
| ON b.uid = u.uid
|
|""".stripMargin)
streamTableEnv.execute("Temporal table join")
Re: flink sql Temporal table join failed
Posted by 李奇 <35...@qq.com>.
需要使用Proctime才可以关联,参考:https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/create.html
> 在 2020年6月12日,下午2:24,Zhou Zach <wa...@163.com> 写道:
>
> flink 1.10.0:
> 在create table中,加PROCTIME() AS proctime字段报错
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>> 在 2020-06-12 14:08:11,"Benchao Li" <li...@apache.org> 写道:
>> Hi,
>>
>> Temporal Table join的时候需要是处理时间,你现在这个b.`time`是一个普通的时间戳,而不是处理时间(proctime)属性。
>> 可以参考下[1]
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/time_attributes.html
>>
>> Zhou Zach <wa...@163.com> 于2020年6月12日周五 下午1:33写道:
>>
>>> SLF4J: Class path contains multiple SLF4J bindings.
>>>
>>> SLF4J: Found binding in
>>> [jar:file:/Users/Zach/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>>>
>>> SLF4J: Found binding in
>>> [jar:file:/Users/Zach/.m2/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>>>
>>> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
>>> explanation.
>>>
>>> SLF4J: Actual binding is of type
>>> [org.apache.logging.slf4j.Log4jLoggerFactory]
>>>
>>> ERROR StatusLogger No log4j2 configuration file found. Using default
>>> configuration: logging only errors to the console.
>>>
>>> Exception in thread "main" org.apache.flink.table.api.TableException:
>>> Cannot generate a valid execution plan for the given query:
>>>
>>>
>>>
>>>
>>> FlinkLogicalSink(name=[`default_catalog`.`default_database`.`user_cnt`],
>>> fields=[time, sum_age])
>>>
>>> +- FlinkLogicalCalc(select=[CAST(time) AS EXPR$0, age])
>>>
>>> +- FlinkLogicalJoin(condition=[=($0, $2)], joinType=[inner])
>>>
>>> :- FlinkLogicalCalc(select=[uid, time])
>>>
>>> : +- FlinkLogicalTableSourceScan(table=[[default_catalog,
>>> default_database, user_behavior, source: [KafkaTableSource(uid, phoneType,
>>> clickCount, time)]]], fields=[uid, phoneType, clickCount, time])
>>>
>>> +- FlinkLogicalSnapshot(period=[$cor0.time])
>>>
>>> +- FlinkLogicalCalc(select=[uid, age])
>>>
>>> +- FlinkLogicalTableSourceScan(table=[[default_catalog,
>>> default_database, users, source: [MysqlAsyncLookupTableSource(uid, sex,
>>> age, created_time)]]], fields=[uid, sex, age, created_time])
>>>
>>>
>>>
>>>
>>> Temporal table join currently only supports 'FOR SYSTEM_TIME AS OF' left
>>> table's proctime field, doesn't support 'PROCTIME()'
>>>
>>> Please check the documentation for the set of currently supported SQL
>>> features.
>>>
>>> at
>>> org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:78)
>>>
>>> at
>>> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
>>>
>>> at
>>> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
>>>
>>> at
>>> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>>>
>>> at
>>> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>>>
>>> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>>>
>>> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>>>
>>> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>>>
>>> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>>>
>>> at
>>> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
>>>
>>> at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>>>
>>> at
>>> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
>>>
>>> at
>>> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:170)
>>>
>>> at
>>> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:90)
>>>
>>> at
>>> org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
>>>
>>> at
>>> org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:248)
>>>
>>> at
>>> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:151)
>>>
>>> at
>>> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)
>>>
>>> at
>>> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
>>>
>>> at
>>> org.rabbit.sql.FromKafkaSinkMysqlForReal$.main(FromKafkaSinkMysqlForReal.scala:90)
>>>
>>> at
>>> org.rabbit.sql.FromKafkaSinkMysqlForReal.main(FromKafkaSinkMysqlForReal.scala)
>>>
>>> Caused by: org.apache.flink.table.api.TableException: Temporal table join
>>> currently only supports 'FOR SYSTEM_TIME AS OF' left table's proctime
>>> field, doesn't support 'PROCTIME()'
>>>
>>> at
>>> org.apache.flink.table.planner.plan.rules.physical.common.CommonLookupJoinRule$class.matches(CommonLookupJoinRule.scala:67)
>>>
>>> at
>>> org.apache.flink.table.planner.plan.rules.physical.common.BaseSnapshotOnCalcTableScanRule.matches(CommonLookupJoinRule.scala:147)
>>>
>>> at
>>> org.apache.flink.table.planner.plan.rules.physical.common.BaseSnapshotOnCalcTableScanRule.matches(CommonLookupJoinRule.scala:161)
>>>
>>> at
>>> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:263)
>>>
>>> at
>>> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)
>>>
>>> at
>>> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)
>>>
>>> at
>>> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)
>>>
>>> at
>>> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)
>>>
>>> at
>>> org.apache.calcite.plan.volcano.VolcanoRuleCall.match(VolcanoRuleCall.java:247)
>>>
>>> at
>>> org.apache.calcite.plan.volcano.VolcanoPlanner.fireRules(VolcanoPlanner.java:1534)
>>>
>>> at
>>> org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1807)
>>>
>>> at
>>> org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846)
>>>
>>> at
>>> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868)
>>>
>>> at
>>> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:90)
>>>
>>> at
>>> org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:329)
>>>
>>> at
>>> org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1668)
>>>
>>> at
>>> org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846)
>>>
>>> at
>>> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868)
>>>
>>> at
>>> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:90)
>>>
>>> at
>>> org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:329)
>>>
>>> at
>>> org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1668)
>>>
>>> at
>>> org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846)
>>>
>>> at
>>> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868)
>>>
>>> at
>>> org.apache.calcite.plan.volcano.VolcanoPlanner.changeTraits(VolcanoPlanner.java:529)
>>>
>>> at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:324)
>>>
>>> at
>>> org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64)
>>>
>>> ... 20 more
>>>
>>>
>>>
>>>
>>> query:
>>>
>>>
>>> val streamExecutionEnv = StreamExecutionEnvironment.getExecutionEnvironment
>>> val blinkEnvSettings =
>>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
>>> val streamTableEnv =
>>> StreamTableEnvironment.create(streamExecutionEnv,blinkEnvSettings)
>>>
>>> streamTableEnv.sqlUpdate(
>>> """
>>> |
>>> |CREATE TABLE user_behavior (
>>> | uid VARCHAR,
>>> | phoneType VARCHAR,
>>> | clickCount INT,
>>> | `time` TIMESTAMP(3)
>>> |) WITH (
>>> | 'connector.type' = 'kafka',
>>> | 'connector.version' = 'universal',
>>> | 'connector.topic' = 'user_behavior',
>>> | 'connector.startup-mode' = 'earliest-offset',
>>> | 'connector.properties.0.key' = 'zookeeper.connect',
>>> | 'connector.properties.0.value' = 'cdh1:2181,cdh2:2181,cdh3:2181',
>>> | 'connector.properties.1.key' = 'bootstrap.servers',
>>> | 'connector.properties.1.value' = 'cdh1:9092,cdh2:9092,cdh3:9092',
>>> | 'update-mode' = 'append',
>>> | 'format.type' = 'json',
>>> | 'format.derive-schema' = 'true'
>>> |)
>>> |""".stripMargin)
>>> streamTableEnv.sqlUpdate(
>>> """
>>> |
>>> |CREATE TABLE user_cnt (
>>> | `time` VARCHAR,
>>> | sum_age INT
>>> |) WITH (
>>> | 'connector.type' = 'jdbc',
>>> | 'connector.url' = 'jdbc:mysql://localhost:3306/dashboard',
>>> | 'connector.table' = 'user_cnt',
>>> | 'connector.username' = 'root',
>>> | 'connector.password' = '123456',
>>> | 'connector.write.flush.max-rows' = '1'
>>> |)
>>> |""".stripMargin)
>>> val userTableSource = new MysqlAsyncLookupTableSource(
>>> Array("uid", "sex", "age", "created_time"),
>>> Array(),
>>> Array(Types.STRING, Types.STRING, Types.INT, Types.STRING))
>>> streamTableEnv.registerTableSource("users", userTableSource)
>>> streamTableEnv.sqlUpdate(
>>> """
>>> |
>>> |insert into user_cnt
>>> |SELECT
>>> | cast(b.`time` as string), u.age
>>> |FROM
>>> | user_behavior AS b
>>> | JOIN users FOR SYSTEM_TIME AS OF b.`time` AS u
>>> | ON b.uid = u.uid
>>> |
>>> |""".stripMargin)
>>> streamTableEnv.execute("Temporal table join")
Re:Re: flink sql Temporal table join failed
Posted by Zhou Zach <wa...@163.com>.
感谢提醒
在 2020-06-12 17:43:20,"Leonard Xu" <xb...@gmail.com> 写道:
>
>你刚好踩到了这个坑,这是flink保留的关键字(time)转义的bug,1.10.1及之后的版本(即将发布的1.11)中修复了的。
>
>祝好
>Leonard Xu
>
>> 在 2020年6月12日,17:38,Zhou Zach <wa...@163.com> 写道:
>>
>>
>>
>>
>> 是的,1.10.0版本
>>
>>
>>
>>
>>
>>
>>
>>
>> 在 2020-06-12 16:28:15,"Benchao Li" <li...@apache.org> 写道:
>>> 看起来你又踩到了一个坑,你用的是1.10.0吧?可以切换到1.10.1试一下,有两个bug已经在1.10.1中修复了。
>>>
>>> Zhou Zach <wa...@163.com> 于2020年6月12日周五 下午3:47写道:
>>>
>>>> 还是不行,
>>>> SLF4J: Class path contains multiple SLF4J bindings.
>>>> SLF4J: Found binding in
>>>> [jar:file:/Users/Zach/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>>>> SLF4J: Found binding in
>>>> [jar:file:/Users/Zach/.m2/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>>>> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
>>>> explanation.
>>>> SLF4J: Actual binding is of type
>>>> [org.apache.logging.slf4j.Log4jLoggerFactory]
>>>> ERROR StatusLogger No log4j2 configuration file found. Using default
>>>> configuration: logging only errors to the console.
>>>> Exception in thread "main" org.apache.flink.table.api.SqlParserException:
>>>> SQL parse failed. Encountered "time FROM" at line 1, column 44.
>>>> Was expecting one of:
>>>> "CURSOR" ...
>>>> "EXISTS" ...
>>>> "NOT" ...
>>>> "ROW" ...
>>>> "(" ...
>>>> "+" ...
>>>> "-" ...
>>>> <UNSIGNED_INTEGER_LITERAL> ...
>>>> <DECIMAL_NUMERIC_LITERAL> ...
>>>> <APPROX_NUMERIC_LITERAL> ...
>>>> <BINARY_STRING_LITERAL> ...
>>>> <PREFIXED_STRING_LITERAL> ...
>>>> <QUOTED_STRING> ...
>>>> <UNICODE_STRING_LITERAL> ...
>>>> "TRUE" ...
>>>> "FALSE" ...
>>>> "UNKNOWN" ...
>>>> "NULL" ...
>>>> <LBRACE_D> ...
>>>> <LBRACE_T> ...
>>>> <LBRACE_TS> ...
>>>> "DATE" ...
>>>> "TIME" <QUOTED_STRING> ...
>>>> "TIMESTAMP" ...
>>>> "INTERVAL" ...
>>>> "?" ...
>>>> "CAST" ...
>>>> "EXTRACT" ...
>>>> "POSITION" ...
>>>> "CONVERT" ...
>>>> "TRANSLATE" ...
>>>> "OVERLAY" ...
>>>> "FLOOR" ...
>>>> "CEIL" ...
>>>> "CEILING" ...
>>>> "SUBSTRING" ...
>>>> "TRIM" ...
>>>> "CLASSIFIER" ...
>>>> "MATCH_NUMBER" ...
>>>> "RUNNING" ...
>>>> "PREV" ...
>>>> "NEXT" ...
>>>> "JSON_EXISTS" ...
>>>> "JSON_VALUE" ...
>>>> "JSON_QUERY" ...
>>>> "JSON_OBJECT" ...
>>>> "JSON_OBJECTAGG" ...
>>>> "JSON_ARRAY" ...
>>>> "JSON_ARRAYAGG" ...
>>>> <LBRACE_FN> ...
>>>> "MULTISET" ...
>>>> "ARRAY" ...
>>>> "MAP" ...
>>>> "PERIOD" ...
>>>> "SPECIFIC" ...
>>>> <IDENTIFIER> ...
>>>> <QUOTED_IDENTIFIER> ...
>>>> <BACK_QUOTED_IDENTIFIER> ...
>>>> <BRACKET_QUOTED_IDENTIFIER> ...
>>>> <UNICODE_QUOTED_IDENTIFIER> ...
>>>> "ABS" ...
>>>> "AVG" ...
>>>> "CARDINALITY" ...
>>>> "CHAR_LENGTH" ...
>>>> "CHARACTER_LENGTH" ...
>>>> "COALESCE" ...
>>>> "COLLECT" ...
>>>> "COVAR_POP" ...
>>>> "COVAR_SAMP" ...
>>>> "CUME_DIST" ...
>>>> "COUNT" ...
>>>> "CURRENT_DATE" ...
>>>> "CURRENT_TIME" ...
>>>> "CURRENT_TIMESTAMP" ...
>>>> "DENSE_RANK" ...
>>>> "ELEMENT" ...
>>>> "EXP" ...
>>>> "FIRST_VALUE" ...
>>>> "FUSION" ...
>>>> "GROUPING" ...
>>>> "HOUR" ...
>>>> "LAG" ...
>>>> "LEAD" ...
>>>> "LEFT" ...
>>>> "LAST_VALUE" ...
>>>> "LN" ...
>>>> "LOCALTIME" ...
>>>> "LOCALTIMESTAMP" ...
>>>> "LOWER" ...
>>>> "MAX" ...
>>>> "MIN" ...
>>>> "MINUTE" ...
>>>> "MOD" ...
>>>> "MONTH" ...
>>>> "NTH_VALUE" ...
>>>> "NTILE" ...
>>>> "NULLIF" ...
>>>> "OCTET_LENGTH" ...
>>>> "PERCENT_RANK" ...
>>>> "POWER" ...
>>>> "RANK" ...
>>>> "REGR_COUNT" ...
>>>> "REGR_SXX" ...
>>>> "REGR_SYY" ...
>>>> "RIGHT" ...
>>>> "ROW_NUMBER" ...
>>>> "SECOND" ...
>>>> "SQRT" ...
>>>> "STDDEV_POP" ...
>>>> "STDDEV_SAMP" ...
>>>> "SUM" ...
>>>> "UPPER" ...
>>>> "TRUNCATE" ...
>>>> "USER" ...
>>>> "VAR_POP" ...
>>>> "VAR_SAMP" ...
>>>> "YEAR" ...
>>>> "CURRENT_CATALOG" ...
>>>> "CURRENT_DEFAULT_TRANSFORM_GROUP" ...
>>>> "CURRENT_PATH" ...
>>>> "CURRENT_ROLE" ...
>>>> "CURRENT_SCHEMA" ...
>>>> "CURRENT_USER" ...
>>>> "SESSION_USER" ...
>>>> "SYSTEM_USER" ...
>>>> "NEW" ...
>>>> "CASE" ...
>>>> "CURRENT" ...
>>>>
>>>> at
>>>> org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:50)
>>>> at
>>>> org.apache.flink.table.planner.calcite.SqlExprToRexConverterImpl.convertToRexNodes(SqlExprToRexConverterImpl.java:79)
>>>> at
>>>> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:111)
>>>> at
>>>> org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3328)
>>>> at
>>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2357)
>>>> at
>>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
>>>> at
>>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2005)
>>>> at
>>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2083)
>>>> at
>>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:646)
>>>> at
>>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:627)
>>>> at
>>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3181)
>>>> at
>>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:563)
>>>> at
>>>> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:148)
>>>> at
>>>> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:135)
>>>> at
>>>> org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:522)
>>>> at
>>>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:436)
>>>> at
>>>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:154)
>>>> at
>>>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:342)
>>>> at
>>>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:142)
>>>> at
>>>> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
>>>> at
>>>> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484)
>>>> at
>>>> org.rabbit.sql.FromKafkaSinkMysqlForReal$.main(FromKafkaSinkMysqlForReal.scala:63)
>>>> at
>>>> org.rabbit.sql.FromKafkaSinkMysqlForReal.main(FromKafkaSinkMysqlForReal.scala)
>>>>
>>>>
>>>> query:
>>>>
>>>>
>>>> streamTableEnv.sqlUpdate(
>>>> """
>>>> |
>>>> |CREATE TABLE user_behavior (
>>>> | uid VARCHAR,
>>>> | phoneType VARCHAR,
>>>> | clickCount INT,
>>>> | proctime AS PROCTIME(),
>>>> | `time` TIMESTAMP(3)
>>>> |) WITH (
>>>> | 'connector.type' = 'kafka',
>>>> | 'connector.version' = 'universal',
>>>> | 'connector.topic' = 'user_behavior',
>>>> | 'connector.startup-mode' = 'earliest-offset',
>>>> | 'connector.properties.0.key' = 'zookeeper.connect',
>>>> | 'connector.properties.0.value' = 'cdh1:2181,cdh2:2181,cdh3:2181',
>>>> | 'connector.properties.1.key' = 'bootstrap.servers',
>>>> | 'connector.properties.1.value' = 'cdh1:9092,cdh2:9092,cdh3:9092',
>>>> | 'update-mode' = 'append',
>>>> | 'format.type' = 'json',
>>>> | 'format.derive-schema' = 'true'
>>>> |)
>>>> |""".stripMargin)
>>>> streamTableEnv.sqlUpdate(
>>>> """
>>>> |
>>>> |insert into user_cnt
>>>> |SELECT
>>>> | cast(b.`time` as string), u.age
>>>> |FROM
>>>> | user_behavior AS b
>>>> | JOIN users FOR SYSTEM_TIME AS OF b.`proctime` AS u
>>>> | ON b.uid = u.uid
>>>> |
>>>> |""".stripMargin)
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> 不过,PROCTIME() AS proctime 放在select 后面可以执行成功,proctime AS PROCTIME()
>>>> 放在select 后面也不行。
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> 在 2020-06-12 15:29:49,"Benchao Li" <li...@apache.org> 写道:
>>>>> 你写反了,是proctime AS PROCTIME()。
>>>>> 计算列跟普通query里面的AS是反着的。
>>>>>
>>>>> Zhou Zach <wa...@163.com> 于2020年6月12日周五 下午2:24写道:
>>>>>
>>>>>> flink 1.10.0:
>>>>>> 在create table中,加PROCTIME() AS proctime字段报错
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> 在 2020-06-12 14:08:11,"Benchao Li" <li...@apache.org> 写道:
>>>>>>> Hi,
>>>>>>>
>>>>>>> Temporal Table join的时候需要是处理时间,你现在这个b.`time`是一个普通的时间戳,而不是处理时间(proctime)属性。
>>>>>>> 可以参考下[1]
>>>>>>>
>>>>>>> [1]
>>>>>>>
>>>>>>
>>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/time_attributes.html
>>>>>>>
>>>>>>> Zhou Zach <wa...@163.com> 于2020年6月12日周五 下午1:33写道:
>>>>>>>
>>>>>>>> SLF4J: Class path contains multiple SLF4J bindings.
>>>>>>>>
>>>>>>>> SLF4J: Found binding in
>>>>>>>>
>>>>>>
>>>> [jar:file:/Users/Zach/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>>>>>>>>
>>>>>>>> SLF4J: Found binding in
>>>>>>>>
>>>>>>
>>>> [jar:file:/Users/Zach/.m2/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>>>>>>>>
>>>>>>>> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
>>>>>>>> explanation.
>>>>>>>>
>>>>>>>> SLF4J: Actual binding is of type
>>>>>>>> [org.apache.logging.slf4j.Log4jLoggerFactory]
>>>>>>>>
>>>>>>>> ERROR StatusLogger No log4j2 configuration file found. Using default
>>>>>>>> configuration: logging only errors to the console.
>>>>>>>>
>>>>>>>> Exception in thread "main" org.apache.flink.table.api.TableException:
>>>>>>>> Cannot generate a valid execution plan for the given query:
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>> FlinkLogicalSink(name=[`default_catalog`.`default_database`.`user_cnt`],
>>>>>>>> fields=[time, sum_age])
>>>>>>>>
>>>>>>>> +- FlinkLogicalCalc(select=[CAST(time) AS EXPR$0, age])
>>>>>>>>
>>>>>>>> +- FlinkLogicalJoin(condition=[=($0, $2)], joinType=[inner])
>>>>>>>>
>>>>>>>> :- FlinkLogicalCalc(select=[uid, time])
>>>>>>>>
>>>>>>>> : +- FlinkLogicalTableSourceScan(table=[[default_catalog,
>>>>>>>> default_database, user_behavior, source: [KafkaTableSource(uid,
>>>>>> phoneType,
>>>>>>>> clickCount, time)]]], fields=[uid, phoneType, clickCount, time])
>>>>>>>>
>>>>>>>> +- FlinkLogicalSnapshot(period=[$cor0.time])
>>>>>>>>
>>>>>>>> +- FlinkLogicalCalc(select=[uid, age])
>>>>>>>>
>>>>>>>> +- FlinkLogicalTableSourceScan(table=[[default_catalog,
>>>>>>>> default_database, users, source: [MysqlAsyncLookupTableSource(uid,
>>>> sex,
>>>>>>>> age, created_time)]]], fields=[uid, sex, age, created_time])
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> Temporal table join currently only supports 'FOR SYSTEM_TIME AS OF'
>>>> left
>>>>>>>> table's proctime field, doesn't support 'PROCTIME()'
>>>>>>>>
>>>>>>>> Please check the documentation for the set of currently supported SQL
>>>>>>>> features.
>>>>>>>>
>>>>>>>> at
>>>>>>>>
>>>>>>
>>>> org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:78)
>>>>>>>>
>>>>>>>> at
>>>>>>>>
>>>>>>
>>>> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
>>>>>>>>
>>>>>>>> at
>>>>>>>>
>>>>>>
>>>> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
>>>>>>>>
>>>>>>>> at
>>>>>>>>
>>>>>>
>>>> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>>>>>>>>
>>>>>>>> at
>>>>>>>>
>>>>>>
>>>> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>>>>>>>>
>>>>>>>> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>>>>>>>>
>>>>>>>> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>>>>>>>>
>>>>>>>> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>>>>>>>>
>>>>>>>> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>>>>>>>>
>>>>>>>> at
>>>>>>>>
>>>>>>
>>>> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
>>>>>>>>
>>>>>>>> at
>>>> scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>>>>>>>>
>>>>>>>> at
>>>>>>>>
>>>>>>
>>>> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
>>>>>>>>
>>>>>>>> at
>>>>>>>>
>>>>>>
>>>> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:170)
>>>>>>>>
>>>>>>>> at
>>>>>>>>
>>>>>>
>>>> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:90)
>>>>>>>>
>>>>>>>> at
>>>>>>>>
>>>>>>
>>>> org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
>>>>>>>>
>>>>>>>> at
>>>>>>>>
>>>>>>
>>>> org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:248)
>>>>>>>>
>>>>>>>> at
>>>>>>>>
>>>>>>
>>>> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:151)
>>>>>>>>
>>>>>>>> at
>>>>>>>>
>>>>>>
>>>> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)
>>>>>>>>
>>>>>>>> at
>>>>>>>>
>>>>>>
>>>> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
>>>>>>>>
>>>>>>>> at
>>>>>>>>
>>>>>>
>>>> org.rabbit.sql.FromKafkaSinkMysqlForReal$.main(FromKafkaSinkMysqlForReal.scala:90)
>>>>>>>>
>>>>>>>> at
>>>>>>>>
>>>>>>
>>>> org.rabbit.sql.FromKafkaSinkMysqlForReal.main(FromKafkaSinkMysqlForReal.scala)
>>>>>>>>
>>>>>>>> Caused by: org.apache.flink.table.api.TableException: Temporal table
>>>>>> join
>>>>>>>> currently only supports 'FOR SYSTEM_TIME AS OF' left table's proctime
>>>>>>>> field, doesn't support 'PROCTIME()'
>>>>>>>>
>>>>>>>> at
>>>>>>>>
>>>>>>
>>>> org.apache.flink.table.planner.plan.rules.physical.common.CommonLookupJoinRule$class.matches(CommonLookupJoinRule.scala:67)
>>>>>>>>
>>>>>>>> at
>>>>>>>>
>>>>>>
>>>> org.apache.flink.table.planner.plan.rules.physical.common.BaseSnapshotOnCalcTableScanRule.matches(CommonLookupJoinRule.scala:147)
>>>>>>>>
>>>>>>>> at
>>>>>>>>
>>>>>>
>>>> org.apache.flink.table.planner.plan.rules.physical.common.BaseSnapshotOnCalcTableScanRule.matches(CommonLookupJoinRule.scala:161)
>>>>>>>>
>>>>>>>> at
>>>>>>>>
>>>>>>
>>>> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:263)
>>>>>>>>
>>>>>>>> at
>>>>>>>>
>>>>>>
>>>> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)
>>>>>>>>
>>>>>>>> at
>>>>>>>>
>>>>>>
>>>> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)
>>>>>>>>
>>>>>>>> at
>>>>>>>>
>>>>>>
>>>> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)
>>>>>>>>
>>>>>>>> at
>>>>>>>>
>>>>>>
>>>> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)
>>>>>>>>
>>>>>>>> at
>>>>>>>>
>>>>>>
>>>> org.apache.calcite.plan.volcano.VolcanoRuleCall.match(VolcanoRuleCall.java:247)
>>>>>>>>
>>>>>>>> at
>>>>>>>>
>>>>>>
>>>> org.apache.calcite.plan.volcano.VolcanoPlanner.fireRules(VolcanoPlanner.java:1534)
>>>>>>>>
>>>>>>>> at
>>>>>>>>
>>>>>>
>>>> org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1807)
>>>>>>>>
>>>>>>>> at
>>>>>>>>
>>>>>>
>>>> org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846)
>>>>>>>>
>>>>>>>> at
>>>>>>>>
>>>>>>
>>>> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868)
>>>>>>>>
>>>>>>>> at
>>>>>>>>
>>>>>>
>>>> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:90)
>>>>>>>>
>>>>>>>> at
>>>>>>>>
>>>>>>
>>>> org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:329)
>>>>>>>>
>>>>>>>> at
>>>>>>>>
>>>>>>
>>>> org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1668)
>>>>>>>>
>>>>>>>> at
>>>>>>>>
>>>>>>
>>>> org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846)
>>>>>>>>
>>>>>>>> at
>>>>>>>>
>>>>>>
>>>> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868)
>>>>>>>>
>>>>>>>> at
>>>>>>>>
>>>>>>
>>>> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:90)
>>>>>>>>
>>>>>>>> at
>>>>>>>>
>>>>>>
>>>> org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:329)
>>>>>>>>
>>>>>>>> at
>>>>>>>>
>>>>>>
>>>> org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1668)
>>>>>>>>
>>>>>>>> at
>>>>>>>>
>>>>>>
>>>> org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846)
>>>>>>>>
>>>>>>>> at
>>>>>>>>
>>>>>>
>>>> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868)
>>>>>>>>
>>>>>>>> at
>>>>>>>>
>>>>>>
>>>> org.apache.calcite.plan.volcano.VolcanoPlanner.changeTraits(VolcanoPlanner.java:529)
>>>>>>>>
>>>>>>>> at
>>>>>> org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:324)
>>>>>>>>
>>>>>>>> at
>>>>>>>>
>>>>>>
>>>> org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64)
>>>>>>>>
>>>>>>>> ... 20 more
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> query:
>>>>>>>>
>>>>>>>>
>>>>>>>> val streamExecutionEnv =
>>>>>> StreamExecutionEnvironment.getExecutionEnvironment
>>>>>>>> val blinkEnvSettings =
>>>>>>>>
>>>>>>
>>>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
>>>>>>>> val streamTableEnv =
>>>>>>>> StreamTableEnvironment.create(streamExecutionEnv,blinkEnvSettings)
>>>>>>>>
>>>>>>>> streamTableEnv.sqlUpdate(
>>>>>>>> """
>>>>>>>> |
>>>>>>>> |CREATE TABLE user_behavior (
>>>>>>>> | uid VARCHAR,
>>>>>>>> | phoneType VARCHAR,
>>>>>>>> | clickCount INT,
>>>>>>>> | `time` TIMESTAMP(3)
>>>>>>>> |) WITH (
>>>>>>>> | 'connector.type' = 'kafka',
>>>>>>>> | 'connector.version' = 'universal',
>>>>>>>> | 'connector.topic' = 'user_behavior',
>>>>>>>> | 'connector.startup-mode' = 'earliest-offset',
>>>>>>>> | 'connector.properties.0.key' = 'zookeeper.connect',
>>>>>>>> | 'connector.properties.0.value' =
>>>>>> 'cdh1:2181,cdh2:2181,cdh3:2181',
>>>>>>>> | 'connector.properties.1.key' = 'bootstrap.servers',
>>>>>>>> | 'connector.properties.1.value' =
>>>>>> 'cdh1:9092,cdh2:9092,cdh3:9092',
>>>>>>>> | 'update-mode' = 'append',
>>>>>>>> | 'format.type' = 'json',
>>>>>>>> | 'format.derive-schema' = 'true'
>>>>>>>> |)
>>>>>>>> |""".stripMargin)
>>>>>>>> streamTableEnv.sqlUpdate(
>>>>>>>> """
>>>>>>>> |
>>>>>>>> |CREATE TABLE user_cnt (
>>>>>>>> | `time` VARCHAR,
>>>>>>>> | sum_age INT
>>>>>>>> |) WITH (
>>>>>>>> | 'connector.type' = 'jdbc',
>>>>>>>> | 'connector.url' = 'jdbc:mysql://localhost:3306/dashboard',
>>>>>>>> | 'connector.table' = 'user_cnt',
>>>>>>>> | 'connector.username' = 'root',
>>>>>>>> | 'connector.password' = '123456',
>>>>>>>> | 'connector.write.flush.max-rows' = '1'
>>>>>>>> |)
>>>>>>>> |""".stripMargin)
>>>>>>>> val userTableSource = new MysqlAsyncLookupTableSource(
>>>>>>>> Array("uid", "sex", "age", "created_time"),
>>>>>>>> Array(),
>>>>>>>> Array(Types.STRING, Types.STRING, Types.INT, Types.STRING))
>>>>>>>> streamTableEnv.registerTableSource("users", userTableSource)
>>>>>>>> streamTableEnv.sqlUpdate(
>>>>>>>> """
>>>>>>>> |
>>>>>>>> |insert into user_cnt
>>>>>>>> |SELECT
>>>>>>>> | cast(b.`time` as string), u.age
>>>>>>>> |FROM
>>>>>>>> | user_behavior AS b
>>>>>>>> | JOIN users FOR SYSTEM_TIME AS OF b.`time` AS u
>>>>>>>> | ON b.uid = u.uid
>>>>>>>> |
>>>>>>>> |""".stripMargin)
>>>>>>>> streamTableEnv.execute("Temporal table join")
>>>>>>
>>>>
Re:回复: flink sql Temporal table join failed
Posted by Zhou Zach <wa...@163.com>.
好的
在 2020-06-12 17:46:22,"咖啡泡油条" <93...@qq.com> 写道:
>可以参考之前的邮件列表
>https://lists.apache.org/thread.html/r951ca3dfa24598b2c90f9d2172d5228c4689b8a710d7dc119055c5d3%40%3Cuser-zh.flink.apache.org%3E
>
>
>
>
>------------------ 原始邮件 ------------------
>发件人: "Leonard Xu"<xbjtdcq@gmail.com>;
>发送时间: 2020年6月12日(星期五) 下午5:43
>收件人: "user-zh"<user-zh@flink.apache.org>;
>
>主题: Re: flink sql Temporal table join failed
>
>
>
>
>你刚好踩到了这个坑,这是Flink保留关键字(time)转义的bug,已在1.10.1及之后的版本(包括即将发布的1.11)中修复。
>
>祝好
>Leonard Xu
>
>> 在 2020年6月12日,17:38,Zhou Zach <wander669@163.com> 写道:
>>
>>
>>
>>
>> 是的,1.10.0版本
>>
>>
>>
>>
>>
>>
>>
>>
>> 在 2020-06-12 16:28:15,"Benchao Li" <libenchao@apache.org> 写道:
>>> 看起来你又踩到了一个坑,你用的是1.10.0吧?可以切换到1.10.1试一下,有两个bug已经在1.10.1中修复了。
>>>
>>> Zhou Zach <wander669@163.com> 于2020年6月12日周五 下午3:47写道:
>>>
>>>> 还是不行,
>>>> SLF4J: Class path contains multiple SLF4J bindings.
>>>> SLF4J: Found binding in
>>>> [jar:file:/Users/Zach/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>>>> SLF4J: Found binding in
>>>> [jar:file:/Users/Zach/.m2/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>>>> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
>>>> explanation.
>>>> SLF4J: Actual binding is of type
>>>> [org.apache.logging.slf4j.Log4jLoggerFactory]
>>>> ERROR StatusLogger No log4j2 configuration file found. Using default
>>>> configuration: logging only errors to the console.
>>>> Exception in thread "main" org.apache.flink.table.api.SqlParserException:
>>>> SQL parse failed. Encountered "time FROM" at line 1, column 44.
>>>> Was expecting one of:
>>>> "CURSOR" ...
>>>> "EXISTS" ...
>>>> "NOT" ...
>>>> "ROW" ...
>>>> "(" ...
>>>> "+" ...
>>>> "-" ...
>>>> <UNSIGNED_INTEGER_LITERAL> ...
>>>> <DECIMAL_NUMERIC_LITERAL> ...
>>>> <APPROX_NUMERIC_LITERAL> ...
>>>> <BINARY_STRING_LITERAL> ...
>>>> <PREFIXED_STRING_LITERAL> ...
>>>> <QUOTED_STRING> ...
>>>> <UNICODE_STRING_LITERAL> ...
>>>> "TRUE" ...
>>>> "FALSE" ...
>>>> "UNKNOWN" ...
>>>> "NULL" ...
>>>> <LBRACE_D> ...
>>>> <LBRACE_T> ...
>>>> <LBRACE_TS> ...
>>>> "DATE" ...
>>>> "TIME" <QUOTED_STRING> ...
>>>> "TIMESTAMP" ...
>>>> "INTERVAL" ...
>>>> "?" ...
>>>> "CAST" ...
>>>> "EXTRACT" ...
>>>> "POSITION" ...
>>>> "CONVERT" ...
>>>> "TRANSLATE" ...
>>>> "OVERLAY" ...
>>>> "FLOOR" ...
>>>> "CEIL" ...
>>>> "CEILING" ...
>>>> "SUBSTRING" ...
>>>> "TRIM" ...
>>>> "CLASSIFIER" ...
>>>> "MATCH_NUMBER" ...
>>>> "RUNNING" ...
>>>> "PREV" ...
>>>> "NEXT" ...
>>>> "JSON_EXISTS" ...
>>>> "JSON_VALUE" ...
>>>> "JSON_QUERY" ...
>>>> "JSON_OBJECT" ...
>>>> "JSON_OBJECTAGG" ...
>>>> "JSON_ARRAY" ...
>>>> "JSON_ARRAYAGG" ...
>>>> <LBRACE_FN> ...
>>>> "MULTISET" ...
>>>> "ARRAY" ...
>>>> "MAP" ...
>>>> "PERIOD" ...
>>>> "SPECIFIC" ...
>>>> <IDENTIFIER> ...
>>>> <QUOTED_IDENTIFIER> ...
>>>> <BACK_QUOTED_IDENTIFIER> ...
>>>> <BRACKET_QUOTED_IDENTIFIER> ...
>>>> <UNICODE_QUOTED_IDENTIFIER> ...
>>>> "ABS" ...
>>>> "AVG" ...
>>>> "CARDINALITY" ...
>>>> "CHAR_LENGTH" ...
>>>> "CHARACTER_LENGTH" ...
>>>> "COALESCE" ...
>>>> "COLLECT" ...
>>>> "COVAR_POP" ...
>>>> "COVAR_SAMP" ...
>>>> "CUME_DIST" ...
>>>> "COUNT" ...
>>>> "CURRENT_DATE" ...
>>>> "CURRENT_TIME" ...
>>>> "CURRENT_TIMESTAMP" ...
>>>> "DENSE_RANK" ...
>>>> "ELEMENT" ...
>>>> "EXP" ...
>>>> "FIRST_VALUE" ...
>>>> "FUSION" ...
>>>> "GROUPING" ...
>>>> "HOUR" ...
>>>> "LAG" ...
>>>> "LEAD" ...
>>>> "LEFT" ...
>>>> "LAST_VALUE" ...
>>>> "LN" ...
>>>> "LOCALTIME" ...
>>>> "LOCALTIMESTAMP" ...
>>>> "LOWER" ...
>>>> "MAX" ...
>>>> "MIN" ...
>>>> "MINUTE" ...
>>>> "MOD" ...
>>>> "MONTH" ...
>>>> "NTH_VALUE" ...
>>>> "NTILE" ...
>>>> "NULLIF" ...
>>>> "OCTET_LENGTH" ...
>>>> "PERCENT_RANK" ...
>>>> "POWER" ...
>>>> "RANK" ...
>>>> "REGR_COUNT" ...
>>>> "REGR_SXX" ...
>>>> "REGR_SYY" ...
>>>> "RIGHT" ...
>>>> "ROW_NUMBER" ...
>>>> "SECOND" ...
>>>> "SQRT" ...
>>>> "STDDEV_POP" ...
>>>> "STDDEV_SAMP" ...
>>>> "SUM" ...
>>>> "UPPER" ...
>>>> "TRUNCATE" ...
>>>> "USER" ...
>>>> "VAR_POP" ...
>>>> "VAR_SAMP" ...
>>>> "YEAR" ...
>>>> "CURRENT_CATALOG" ...
>>>> "CURRENT_DEFAULT_TRANSFORM_GROUP" ...
>>>> "CURRENT_PATH" ...
>>>> "CURRENT_ROLE" ...
>>>> "CURRENT_SCHEMA" ...
>>>> "CURRENT_USER" ...
>>>> "SESSION_USER" ...
>>>> "SYSTEM_USER" ...
>>>> "NEW" ...
>>>> "CASE" ...
>>>> "CURRENT" ...
>>>>
>>>> at
>>>> org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:50)
>>>> at
>>>> org.apache.flink.table.planner.calcite.SqlExprToRexConverterImpl.convertToRexNodes(SqlExprToRexConverterImpl.java:79)
>>>> at
>>>> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:111)
>>>> at
>>>> org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3328)
>>>> at
>>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2357)
>>>> at
>>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
>>>> at
>>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2005)
>>>> at
>>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2083)
>>>> at
>>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:646)
>>>> at
>>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:627)
>>>> at
>>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3181)
>>>> at
>>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:563)
>>>> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
>>>> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:148)
>>>> at
>>>> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:135)
>>>> at
>>>> org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:522)
>>>> at
>>>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:436)
>>>> at
>>>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:154)
>>>> at
>>>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:342)
>>>> at
>>>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:142)
>>>> at
>>>> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
>>>> at
>>>> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484)
>>>> at
>>>> org.rabbit.sql.FromKafkaSinkMysqlForReal$.main(FromKafkaSinkMysqlForReal.scala:63)
>>>> at
>>>> org.rabbit.sql.FromKafkaSinkMysqlForReal.main(FromKafkaSinkMysqlForReal.scala)
>>>>
>>>>
>>>> query:
>>>>
>>>>
>>>> streamTableEnv.sqlUpdate(
>>>> """
>>>> |
>>>> |CREATE TABLE user_behavior (
>>>> | uid VARCHAR,
>>>> | phoneType VARCHAR,
>>>> | clickCount INT,
>>>> | proctime AS PROCTIME(),
>>>> | `time` TIMESTAMP(3)
>>>> |) WITH (
>>>> | 'connector.type' = 'kafka',
>>>> | 'connector.version' = 'universal',
>>>> | 'connector.topic' = 'user_behavior',
>>>> | 'connector.startup-mode' = 'earliest-offset',
>>>> | 'connector.properties.0.key' = 'zookeeper.connect',
>>>> | 'connector.properties.0.value' = 'cdh1:2181,cdh2:2181,cdh3:2181',
>>>> | 'connector.properties.1.key' = 'bootstrap.servers',
>>>> | 'connector.properties.1.value' = 'cdh1:9092,cdh2:9092,cdh3:9092',
>>>> | 'update-mode' = 'append',
>>>> | 'format.type' = 'json',
>>>> | 'format.derive-schema' = 'true'
>>>> |)
>>>> |""".stripMargin)
>>>> streamTableEnv.sqlUpdate(
>>>> """
>>>> |
>>>> |insert into user_cnt
>>>> |SELECT
>>>> | cast(b.`time` as string), u.age
>>>> |FROM
>>>> | user_behavior AS b
>>>> | JOIN users FOR SYSTEM_TIME AS OF b.`proctime` AS u
>>>> | ON b.uid = u.uid
>>>> |
>>>> |""".stripMargin)
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> 不过,PROCTIME() AS proctime 放在select 后面可以执行成功,proctime AS PROCTIME()
>>>> 放在select 后面也不行。
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> 在 2020-06-12 15:29:49,"Benchao Li" <libenchao@apache.org> 写道:
>>>>> 你写反了,是proctime AS PROCTIME()。
>>>>> 计算列跟普通query里面的AS是反着的。
>>>>>
>>>>> Zhou Zach <wander669@163.com> 于2020年6月12日周五 下午2:24写道:
>>>>>
>>>>>> flink 1.10.0:
>>>>>> 在create table中,加PROCTIME() AS proctime字段报错
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> 在 2020-06-12 14:08:11,"Benchao Li" <libenchao@apache.org> 写道:
>>>>>>> Hi,
>>>>>>>
>>>>>>> Temporal Table join的时候需要是处理时间,你现在这个b.`time`是一个普通的时间戳,而不是处理时间(proctime)属性。
>>>>>>> 可以参考下[1]
>>>>>>>
>>>>>>> [1]
>>>>>>>
>>>>>>
>>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/time_attributes.html
>>>>>>>
>>>>>>> Zhou Zach <wander669@163.com> 于2020年6月12日周五 下午1:33写道:
>>>>>>>
>>>>>>>> SLF4J: Class path contains multiple SLF4J bindings.
>>>>>>>>
>>>>>>>> SLF4J: Found binding in
>>>>>>>>
>>>>>>
>>>> [jar:file:/Users/Zach/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>>>>>>>>
>>>>>>>> SLF4J: Found binding in
>>>>>>>>
>>>>>>
>>>> [jar:file:/Users/Zach/.m2/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>>>>>>>>
>>>>>>>> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
>>>>>>>> explanation.
>>>>>>>>
>>>>>>>> SLF4J: Actual binding is of type
>>>>>>>> [org.apache.logging.slf4j.Log4jLoggerFactory]
>>>>>>>>
>>>>>>>> ERROR StatusLogger No log4j2 configuration file found. Using default
>>>>>>>> configuration: logging only errors to the console.
>>>>>>>>
>>>>>>>> Exception in thread "main" org.apache.flink.table.api.TableException:
>>>>>>>> Cannot generate a valid execution plan for the given query:
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>> FlinkLogicalSink(name=[`default_catalog`.`default_database`.`user_cnt`],
>>>>>>>> fields=[time, sum_age])
>>>>>>>>
>>>>>>>> +- FlinkLogicalCalc(select=[CAST(time) AS EXPR$0, age])
>>>>>>>>
>>>>>>>> +- FlinkLogicalJoin(condition=[=($0, $2)], joinType=[inner])
>>>>>>>>
>>>>>>>> :- FlinkLogicalCalc(select=[uid, time])
>>>>>>>>
>>>>>>>> : +- FlinkLogicalTableSourceScan(table=[[default_catalog,
>>>>>>>> default_database, user_behavior, source: [KafkaTableSource(uid,
>>>>>> phoneType,
>>>>>>>> clickCount, time)]]], fields=[uid, phoneType, clickCount, time])
>>>>>>>>
>>>>>>>> +- FlinkLogicalSnapshot(period=[$cor0.time])
>>>>>>>>
>>>>>>>> +- FlinkLogicalCalc(select=[uid, age])
>>>>>>>>
>>>>>>>> +- FlinkLogicalTableSourceScan(table=[[default_catalog,
>>>>>>>> default_database, users, source: [MysqlAsyncLookupTableSource(uid,
>>>> sex,
>>>>>>>> age, created_time)]]], fields=[uid, sex, age, created_time])
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> Temporal table join currently only supports 'FOR SYSTEM_TIME AS OF'
>>>> left
>>>>>>>> table's proctime field, doesn't support 'PROCTIME()'
>>>>>>>>
>>>>>>>> Please check the documentation for the set of currently supported SQL
>>>>>>>> features.
>>>>>>>>
>>>>>>>> at
>>>>>>>>
>>>>>>
>>>> org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:78)
>>>>>>>>
>>>>>>>> at
>>>>>>>>
>>>>>>
>>>> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
>>>>>>>>
>>>>>>>> at
>>>>>>>>
>>>>>>
>>>> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
>>>>>>>>
>>>>>>>> at
>>>>>>>>
>>>>>>
>>>> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>>>>>>>>
>>>>>>>> at
>>>>>>>>
>>>>>>
>>>> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>>>>>>>>
>>>>>>>> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>>>>>>>>
>>>>>>>> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>>>>>>>>
>>>>>>>> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>>>>>>>>
>>>>>>>> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>>>>>>>>
>>>>>>>> at
>>>>>>>>
>>>>>>
>>>> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
>>>>>>>>
>>>>>>>> at
>>>> scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>>>>>>>>
>>>>>>>> at
>>>>>>>>
>>>>>>
>>>> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
>>>>>>>>
>>>>>>>> at
>>>>>>>>
>>>>>>
>>>> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:170)
>>>>>>>>
>>>>>>>> at
>>>>>>>>
>>>>>>
>>>> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:90)
>>>>>>>>
>>>>>>>> at
>>>>>>>>
>>>>>>
>>>> org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
>>>>>>>>
>>>>>>>> at
>>>>>>>>
>>>>>>
>>>> org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:248)
>>>>>>>>
>>>>>>>> at
>>>>>>>>
>>>>>>
>>>> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:151)
>>>>>>>>
>>>>>>>> at
>>>>>>>>
>>>>>>
>>>> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)
>>>>>>>>
>>>>>>>> at
>>>>>>>>
>>>>>>
>>>> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
>>>>>>>>
>>>>>>>> at
>>>>>>>>
>>>>>>
>>>> org.rabbit.sql.FromKafkaSinkMysqlForReal$.main(FromKafkaSinkMysqlForReal.scala:90)
>>>>>>>>
>>>>>>>> at
>>>>>>>>
>>>>>>
>>>> org.rabbit.sql.FromKafkaSinkMysqlForReal.main(FromKafkaSinkMysqlForReal.scala)
>>>>>>>>
>>>>>>>> Caused by: org.apache.flink.table.api.TableException: Temporal table
>>>>>> join
>>>>>>>> currently only supports 'FOR SYSTEM_TIME AS OF' left table's proctime
>>>>>>>> field, doesn't support 'PROCTIME()'
>>>>>>>>
>>>>>>>> at
>>>>>>>>
>>>>>>
>>>> org.apache.flink.table.planner.plan.rules.physical.common.CommonLookupJoinRule$class.matches(CommonLookupJoinRule.scala:67)
>>>>>>>>
>>>>>>>> at
>>>>>>>>
>>>>>>
>>>> org.apache.flink.table.planner.plan.rules.physical.common.BaseSnapshotOnCalcTableScanRule.matches(CommonLookupJoinRule.scala:147)
>>>>>>>>
>>>>>>>> at
>>>>>>>>
>>>>>>
>>>> org.apache.flink.table.planner.plan.rules.physical.common.BaseSnapshotOnCalcTableScanRule.matches(CommonLookupJoinRule.scala:161)
>>>>>>>>
>>>>>>>> at
>>>>>>>>
>>>>>>
>>>> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:263)
>>>>>>>>
>>>>>>>> at
>>>>>>>>
>>>>>>
>>>> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)
>>>>>>>>
>>>>>>>> at
>>>>>>>>
>>>>>>
>>>> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)
>>>>>>>>
>>>>>>>> at
>>>>>>>>
>>>>>>
>>>> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)
>>>>>>>>
>>>>>>>> at
>>>>>>>>
>>>>>>
>>>> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)
>>>>>>>>
>>>>>>>> at
>>>>>>>>
>>>>>>
>>>> org.apache.calcite.plan.volcano.VolcanoRuleCall.match(VolcanoRuleCall.java:247)
>>>>>>>>
>>>>>>>> at
>>>>>>>>
>>>>>>
>>>> org.apache.calcite.plan.volcano.VolcanoPlanner.fireRules(VolcanoPlanner.java:1534)
>>>>>>>>
>>>>>>>> at
>>>>>>>>
>>>>>>
>>>> org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1807)
>>>>>>>>
>>>>>>>> at
>>>>>>>>
>>>>>>
>>>> org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846)
>>>>>>>>
>>>>>>>> at
>>>>>>>>
>>>>>>
>>>> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868)
>>>>>>>>
>>>>>>>> at
>>>>>>>>
>>>>>>
>>>> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:90)
>>>>>>>>
>>>>>>>> at
>>>>>>>>
>>>>>>
>>>> org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:329)
>>>>>>>>
>>>>>>>> at
>>>>>>>>
>>>>>>
>>>> org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1668)
>>>>>>>>
>>>>>>>> at
>>>>>>>>
>>>>>>
>>>> org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846)
>>>>>>>>
>>>>>>>> at
>>>>>>>>
>>>>>>
>>>> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868)
>>>>>>>>
>>>>>>>> at
>>>>>>>>
>>>>>>
>>>> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:90)
>>>>>>>>
>>>>>>>> at
>>>>>>>>
>>>>>>
>>>> org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:329)
>>>>>>>>
>>>>>>>> at
>>>>>>>>
>>>>>>
>>>> org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1668)
>>>>>>>>
>>>>>>>> at
>>>>>>>>
>>>>>>
>>>> org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846)
>>>>>>>>
>>>>>>>> at
>>>>>>>>
>>>>>>
>>>> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868)
>>>>>>>>
>>>>>>>> at
>>>>>>>>
>>>>>>
>>>> org.apache.calcite.plan.volcano.VolcanoPlanner.changeTraits(VolcanoPlanner.java:529)
>>>>>>>>
>>>>>>>> at
>>>>>> org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:324)
>>>>>>>>
>>>>>>>> at
>>>>>>>>
>>>>>>
>>>> org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64)
>>>>>>>>
>>>>>>>> ... 20 more
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> query:
>>>>>>>>
>>>>>>>>
>>>>>>>> val streamExecutionEnv =
>>>>>> StreamExecutionEnvironment.getExecutionEnvironment
>>>>>>>> val blinkEnvSettings =
>>>>>>>>
>>>>>>
>>>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
>>>>>>>> val streamTableEnv =
>>>>>>>> StreamTableEnvironment.create(streamExecutionEnv,blinkEnvSettings)
>>>>>>>>
>>>>>>>> streamTableEnv.sqlUpdate(
>>>>>>>> """
>>>>>>>> |
>>>>>>>> |CREATE TABLE user_behavior (
>>>>>>>> | uid VARCHAR,
>>>>>>>> | phoneType VARCHAR,
>>>>>>>> | clickCount INT,
>>>>>>>> | `time` TIMESTAMP(3)
>>>>>>>> |) WITH (
>>>>>>>> | 'connector.type' = 'kafka',
>>>>>>>> | 'connector.version' = 'universal',
>>>>>>>> | 'connector.topic' = 'user_behavior',
>>>>>>>> | 'connector.startup-mode' = 'earliest-offset',
>>>>>>>> | 'connector.properties.0.key' = 'zookeeper.connect',
>>>>>>>> | 'connector.properties.0.value' =
>>>>>> 'cdh1:2181,cdh2:2181,cdh3:2181',
>>>>>>>> | 'connector.properties.1.key' = 'bootstrap.servers',
>>>>>>>> | 'connector.properties.1.value' =
>>>>>> 'cdh1:9092,cdh2:9092,cdh3:9092',
>>>>>>>> | 'update-mode' = 'append',
>>>>>>>> | 'format.type' = 'json',
>>>>>>>> | 'format.derive-schema' = 'true'
>>>>>>>> |)
>>>>>>>> |""".stripMargin)
>>>>>>>> streamTableEnv.sqlUpdate(
>>>>>>>> """
>>>>>>>> |
>>>>>>>> |CREATE TABLE user_cnt (
>>>>>>>> | `time` VARCHAR,
>>>>>>>> | sum_age INT
>>>>>>>> |) WITH (
>>>>>>>> | 'connector.type' = 'jdbc',
>>>>>>>> | 'connector.url' = 'jdbc:mysql://localhost:3306/dashboard',
>>>>>>>> | 'connector.table' = 'user_cnt',
>>>>>>>> | 'connector.username' = 'root',
>>>>>>>> | 'connector.password' = '123456',
>>>>>>>> | 'connector.write.flush.max-rows' = '1'
>>>>>>>> |)
>>>>>>>> |""".stripMargin)
>>>>>>>> val userTableSource = new MysqlAsyncLookupTableSource(
>>>>>>>> Array("uid", "sex", "age", "created_time"),
>>>>>>>> Array(),
>>>>>>>> Array(Types.STRING, Types.STRING, Types.INT, Types.STRING))
>>>>>>>> streamTableEnv.registerTableSource("users", userTableSource)
>>>>>>>> streamTableEnv.sqlUpdate(
>>>>>>>> """
>>>>>>>> |
>>>>>>>> |insert into user_cnt
>>>>>>>> |SELECT
>>>>>>>> | cast(b.`time` as string), u.age
>>>>>>>> |FROM
>>>>>>>> | user_behavior AS b
>>>>>>>> | JOIN users FOR SYSTEM_TIME AS OF b.`time` AS u
>>>>>>>> | ON b.uid = u.uid
>>>>>>>> |
>>>>>>>> |""".stripMargin)
>>>>>>>> streamTableEnv.execute("Temporal table join")
>>>>>>
>>>>
回复: flink sql Temporal table join failed
Posted by 咖啡泡油条 <93...@qq.com>.
可以参考之前的邮件列表
https://lists.apache.org/thread.html/r951ca3dfa24598b2c90f9d2172d5228c4689b8a710d7dc119055c5d3%40%3Cuser-zh.flink.apache.org%3E
------------------ 原始邮件 ------------------
发件人: "Leonard Xu"<xbjtdcq@gmail.com>;
发送时间: 2020年6月12日(星期五) 下午5:43
收件人: "user-zh"<user-zh@flink.apache.org>;
主题: Re: flink sql Temporal table join failed
你刚好踩到了这个坑,这是Flink保留关键字(time)转义的bug,已在1.10.1及之后的版本(包括即将发布的1.11)中修复。
祝好
Leonard Xu
> 在 2020年6月12日,17:38,Zhou Zach <wander669@163.com> 写道:
>
>
>
>
> 是的,1.10.0版本
>
>
>
>
>
>
>
>
> 在 2020-06-12 16:28:15,"Benchao Li" <libenchao@apache.org> 写道:
>> 看起来你又踩到了一个坑,你用的是1.10.0吧?可以切换到1.10.1试一下,有两个bug已经在1.10.1中修复了。
>>
>> Zhou Zach <wander669@163.com> 于2020年6月12日周五 下午3:47写道:
>>
>>> 还是不行,
>>> SLF4J: Class path contains multiple SLF4J bindings.
>>> SLF4J: Found binding in
>>> [jar:file:/Users/Zach/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>>> SLF4J: Found binding in
>>> [jar:file:/Users/Zach/.m2/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>>> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
>>> explanation.
>>> SLF4J: Actual binding is of type
>>> [org.apache.logging.slf4j.Log4jLoggerFactory]
>>> ERROR StatusLogger No log4j2 configuration file found. Using default
>>> configuration: logging only errors to the console.
>>> Exception in thread "main" org.apache.flink.table.api.SqlParserException:
>>> SQL parse failed. Encountered "time FROM" at line 1, column 44.
>>> Was expecting one of:
>>> "CURSOR" ...
>>> "EXISTS" ...
>>> "NOT" ...
>>> "ROW" ...
>>> "(" ...
>>> "+" ...
>>> "-" ...
>>> <UNSIGNED_INTEGER_LITERAL> ...
>>> <DECIMAL_NUMERIC_LITERAL> ...
>>> <APPROX_NUMERIC_LITERAL> ...
>>> <BINARY_STRING_LITERAL> ...
>>> <PREFIXED_STRING_LITERAL> ...
>>> <QUOTED_STRING> ...
>>> <UNICODE_STRING_LITERAL> ...
>>> "TRUE" ...
>>> "FALSE" ...
>>> "UNKNOWN" ...
>>> "NULL" ...
>>> <LBRACE_D> ...
>>> <LBRACE_T> ...
>>> <LBRACE_TS> ...
>>> "DATE" ...
>>> "TIME" <QUOTED_STRING> ...
>>> "TIMESTAMP" ...
>>> "INTERVAL" ...
>>> "?" ...
>>> "CAST" ...
>>> "EXTRACT" ...
>>> "POSITION" ...
>>> "CONVERT" ...
>>> "TRANSLATE" ...
>>> "OVERLAY" ...
>>> "FLOOR" ...
>>> "CEIL" ...
>>> "CEILING" ...
>>> "SUBSTRING" ...
>>> "TRIM" ...
>>> "CLASSIFIER" ...
>>> "MATCH_NUMBER" ...
>>> "RUNNING" ...
>>> "PREV" ...
>>> "NEXT" ...
>>> "JSON_EXISTS" ...
>>> "JSON_VALUE" ...
>>> "JSON_QUERY" ...
>>> "JSON_OBJECT" ...
>>> "JSON_OBJECTAGG" ...
>>> "JSON_ARRAY" ...
>>> "JSON_ARRAYAGG" ...
>>> <LBRACE_FN> ...
>>> "MULTISET" ...
>>> "ARRAY" ...
>>> "MAP" ...
>>> "PERIOD" ...
>>> "SPECIFIC" ...
>>> <IDENTIFIER> ...
>>> <QUOTED_IDENTIFIER> ...
>>> <BACK_QUOTED_IDENTIFIER> ...
>>> <BRACKET_QUOTED_IDENTIFIER> ...
>>> <UNICODE_QUOTED_IDENTIFIER> ...
>>> "ABS" ...
>>> "AVG" ...
>>> "CARDINALITY" ...
>>> "CHAR_LENGTH" ...
>>> "CHARACTER_LENGTH" ...
>>> "COALESCE" ...
>>> "COLLECT" ...
>>> "COVAR_POP" ...
>>> "COVAR_SAMP" ...
>>> "CUME_DIST" ...
>>> "COUNT" ...
>>> "CURRENT_DATE" ...
>>> "CURRENT_TIME" ...
>>> "CURRENT_TIMESTAMP" ...
>>> "DENSE_RANK" ...
>>> "ELEMENT" ...
>>> "EXP" ...
>>> "FIRST_VALUE" ...
>>> "FUSION" ...
>>> "GROUPING" ...
>>> "HOUR" ...
>>> "LAG" ...
>>> "LEAD" ...
>>> "LEFT" ...
>>> "LAST_VALUE" ...
>>> "LN" ...
>>> "LOCALTIME" ...
>>> "LOCALTIMESTAMP" ...
>>> "LOWER" ...
>>> "MAX" ...
>>> "MIN" ...
>>> "MINUTE" ...
>>> "MOD" ...
>>> "MONTH" ...
>>> "NTH_VALUE" ...
>>> "NTILE" ...
>>> "NULLIF" ...
>>> "OCTET_LENGTH" ...
>>> "PERCENT_RANK" ...
>>> "POWER" ...
>>> "RANK" ...
>>> "REGR_COUNT" ...
>>> "REGR_SXX" ...
>>> "REGR_SYY" ...
>>> "RIGHT" ...
>>> "ROW_NUMBER" ...
>>> "SECOND" ...
>>> "SQRT" ...
>>> "STDDEV_POP" ...
>>> "STDDEV_SAMP" ...
>>> "SUM" ...
>>> "UPPER" ...
>>> "TRUNCATE" ...
>>> "USER" ...
>>> "VAR_POP" ...
>>> "VAR_SAMP" ...
>>> "YEAR" ...
>>> "CURRENT_CATALOG" ...
>>> "CURRENT_DEFAULT_TRANSFORM_GROUP" ...
>>> "CURRENT_PATH" ...
>>> "CURRENT_ROLE" ...
>>> "CURRENT_SCHEMA" ...
>>> "CURRENT_USER" ...
>>> "SESSION_USER" ...
>>> "SYSTEM_USER" ...
>>> "NEW" ...
>>> "CASE" ...
>>> "CURRENT" ...
>>>
>>> at
>>> org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:50)
>>> at
>>> org.apache.flink.table.planner.calcite.SqlExprToRexConverterImpl.convertToRexNodes(SqlExprToRexConverterImpl.java:79)
>>> at
>>> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:111)
>>> at
>>> org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3328)
>>> at
>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2357)
>>> at
>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
>>> at
>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2005)
>>> at
>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2083)
>>> at
>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:646)
>>> at
>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:627)
>>> at
>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3181)
>>> at
>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:563)
>>> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
>>> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:148)
>>> at
>>> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:135)
>>> at
>>> org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:522)
>>> at
>>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:436)
>>> at
>>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:154)
>>> at
>>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:342)
>>> at
>>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:142)
>>> at
>>> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
>>> at
>>> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484)
>>> at
>>> org.rabbit.sql.FromKafkaSinkMysqlForReal$.main(FromKafkaSinkMysqlForReal.scala:63)
>>> at
>>> org.rabbit.sql.FromKafkaSinkMysqlForReal.main(FromKafkaSinkMysqlForReal.scala)
>>>
>>>
>>> query:
>>>
>>>
>>> streamTableEnv.sqlUpdate(
>>> """
>>> |
>>> |CREATE TABLE user_behavior (
>>> | uid VARCHAR,
>>> | phoneType VARCHAR,
>>> | clickCount INT,
>>> | proctime AS PROCTIME(),
>>> | `time` TIMESTAMP(3)
>>> |) WITH (
>>> | 'connector.type' = 'kafka',
>>> | 'connector.version' = 'universal',
>>> | 'connector.topic' = 'user_behavior',
>>> | 'connector.startup-mode' = 'earliest-offset',
>>> | 'connector.properties.0.key' = 'zookeeper.connect',
>>> | 'connector.properties.0.value' = 'cdh1:2181,cdh2:2181,cdh3:2181',
>>> | 'connector.properties.1.key' = 'bootstrap.servers',
>>> | 'connector.properties.1.value' = 'cdh1:9092,cdh2:9092,cdh3:9092',
>>> | 'update-mode' = 'append',
>>> | 'format.type' = 'json',
>>> | 'format.derive-schema' = 'true'
>>> |)
>>> |""".stripMargin)
>>> streamTableEnv.sqlUpdate(
>>> """
>>> |
>>> |insert into user_cnt
>>> |SELECT
>>> | cast(b.`time` as string), u.age
>>> |FROM
>>> | user_behavior AS b
>>> | JOIN users FOR SYSTEM_TIME AS OF b.`proctime` AS u
>>> | ON b.uid = u.uid
>>> |
>>> |""".stripMargin)
>>>
>>>
>>>
>>>
>>>
>>>
>>> 不过,PROCTIME() AS proctime 放在select 后面可以执行成功,proctime AS PROCTIME()
>>> 放在select 后面也不行。
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> 在 2020-06-12 15:29:49,"Benchao Li" <libenchao@apache.org> 写道:
>>>> 你写反了,是proctime AS PROCTIME()。
>>>> 计算列跟普通query里面的AS是反着的。
>>>>
>>>> Zhou Zach <wander669@163.com> 于2020年6月12日周五 下午2:24写道:
>>>>
>>>>> flink 1.10.0:
>>>>> 在create table中,加PROCTIME() AS proctime字段报错
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> 在 2020-06-12 14:08:11,"Benchao Li" <libenchao@apache.org> 写道:
>>>>>> Hi,
>>>>>>
>>>>>> Temporal Table join的时候需要是处理时间,你现在这个b.`time`是一个普通的时间戳,而不是处理时间属性。
>>>>>> 可以参考下[1]
>>>>>>
>>>>>> [1]
>>>>>>
>>>>>
>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/time_attributes.html
>>>>>>
>>>>>> Zhou Zach <wander669@163.com> 于2020年6月12日周五 下午1:33写道:
>>>>>>
>>>>>>> SLF4J: Class path contains multiple SLF4J bindings.
>>>>>>>
>>>>>>> SLF4J: Found binding in
>>>>>>>
>>>>>
>>> [jar:file:/Users/Zach/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>>>>>>>
>>>>>>> SLF4J: Found binding in
>>>>>>>
>>>>>
>>> [jar:file:/Users/Zach/.m2/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>>>>>>>
>>>>>>> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
>>>>>>> explanation.
>>>>>>>
>>>>>>> SLF4J: Actual binding is of type
>>>>>>> [org.apache.logging.slf4j.Log4jLoggerFactory]
>>>>>>>
>>>>>>> ERROR StatusLogger No log4j2 configuration file found. Using default
>>>>>>> configuration: logging only errors to the console.
>>>>>>>
>>>>>>> Exception in thread "main" org.apache.flink.table.api.TableException:
>>>>>>> Cannot generate a valid execution plan for the given query:
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>> FlinkLogicalSink(name=[`default_catalog`.`default_database`.`user_cnt`],
>>>>>>> fields=[time, sum_age])
>>>>>>>
>>>>>>> +- FlinkLogicalCalc(select=[CAST(time) AS EXPR$0, age])
>>>>>>>
>>>>>>> +- FlinkLogicalJoin(condition=[=($0, $2)], joinType=[inner])
>>>>>>>
>>>>>>> :- FlinkLogicalCalc(select=[uid, time])
>>>>>>>
>>>>>>> : +- FlinkLogicalTableSourceScan(table=[[default_catalog,
>>>>>>> default_database, user_behavior, source: [KafkaTableSource(uid,
>>>>> phoneType,
>>>>>>> clickCount, time)]]], fields=[uid, phoneType, clickCount, time])
>>>>>>>
>>>>>>> +- FlinkLogicalSnapshot(period=[$cor0.time])
>>>>>>>
>>>>>>> +- FlinkLogicalCalc(select=[uid, age])
>>>>>>>
>>>>>>> +- FlinkLogicalTableSourceScan(table=[[default_catalog,
>>>>>>> default_database, users, source: [MysqlAsyncLookupTableSource(uid,
>>> sex,
>>>>>>> age, created_time)]]], fields=[uid, sex, age, created_time])
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Temporal table join currently only supports 'FOR SYSTEM_TIME AS OF'
>>> left
>>>>>>> table's proctime field, doesn't support 'PROCTIME()'
>>>>>>>
>>>>>>> Please check the documentation for the set of currently supported SQL
>>>>>>> features.
>>>>>>>
>>>>>>> at
>>>>>>>
>>>>>
>>> org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:78)
>>>>>>>
>>>>>>> at
>>>>>>>
>>>>>
>>> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
>>>>>>>
>>>>>>> at
>>>>>>>
>>>>>
>>> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
>>>>>>>
>>>>>>> at
>>>>>>>
>>>>>
>>> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>>>>>>>
>>>>>>> at
>>>>>>>
>>>>>
>>> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>>>>>>>
>>>>>>> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>>>>>>>
>>>>>>> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>>>>>>>
>>>>>>> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>>>>>>>
>>>>>>> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>>>>>>>
>>>>>>> at
>>>>>>>
>>>>>
>>> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
>>>>>>>
>>>>>>> at
>>> scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>>>>>>>
>>>>>>> at
>>>>>>>
>>>>>
>>> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
>>>>>>>
>>>>>>> at
>>>>>>>
>>>>>
>>> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:170)
>>>>>>>
>>>>>>> at
>>>>>>>
>>>>>
>>> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:90)
>>>>>>>
>>>>>>> at
>>>>>>>
>>>>>
>>> org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
>>>>>>>
>>>>>>> at
>>>>>>>
>>>>>
>>> org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:248)
>>>>>>>
>>>>>>> at
>>>>>>>
>>>>>
>>> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:151)
>>>>>>>
>>>>>>> at
>>>>>>>
>>>>>
>>> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)
>>>>>>>
>>>>>>> at
>>>>>>>
>>>>>
>>> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
>>>>>>>
>>>>>>> at
>>>>>>>
>>>>>
>>> org.rabbit.sql.FromKafkaSinkMysqlForReal$.main(FromKafkaSinkMysqlForReal.scala:90)
>>>>>>>
>>>>>>> at
>>>>>>>
>>>>>
>>> org.rabbit.sql.FromKafkaSinkMysqlForReal.main(FromKafkaSinkMysqlForReal.scala)
>>>>>>>
>>>>>>> Caused by: org.apache.flink.table.api.TableException: Temporal table
>>>>> join
>>>>>>> currently only supports 'FOR SYSTEM_TIME AS OF' left table's proctime
>>>>>>> field, doesn't support 'PROCTIME()'
>>>>>>>
>>>>>>> at
>>>>>>>
>>>>>
>>> org.apache.flink.table.planner.plan.rules.physical.common.CommonLookupJoinRule$class.matches(CommonLookupJoinRule.scala:67)
>>>>>>>
>>>>>>> at
>>>>>>>
>>>>>
>>> org.apache.flink.table.planner.plan.rules.physical.common.BaseSnapshotOnCalcTableScanRule.matches(CommonLookupJoinRule.scala:147)
>>>>>>>
>>>>>>> at
>>>>>>>
>>>>>
>>> org.apache.flink.table.planner.plan.rules.physical.common.BaseSnapshotOnCalcTableScanRule.matches(CommonLookupJoinRule.scala:161)
>>>>>>>
>>>>>>> at
>>>>>>>
>>>>>
>>> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:263)
>>>>>>>
>>>>>>> at
>>>>>>>
>>>>>
>>> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)
>>>>>>>
>>>>>>> at
>>>>>>>
>>>>>
>>> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)
>>>>>>>
>>>>>>> at
>>>>>>>
>>>>>
>>> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)
>>>>>>>
>>>>>>> at
>>>>>>>
>>>>>
>>> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)
>>>>>>>
>>>>>>> at
>>>>>>>
>>>>>
>>> org.apache.calcite.plan.volcano.VolcanoRuleCall.match(VolcanoRuleCall.java:247)
>>>>>>>
>>>>>>> at
>>>>>>>
>>>>>
>>> org.apache.calcite.plan.volcano.VolcanoPlanner.fireRules(VolcanoPlanner.java:1534)
>>>>>>>
>>>>>>> at
>>>>>>>
>>>>>
>>> org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1807)
>>>>>>>
>>>>>>> at
>>>>>>>
>>>>>
>>> org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846)
>>>>>>>
>>>>>>> at
>>>>>>>
>>>>>
>>> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868)
>>>>>>>
>>>>>>> at
>>>>>>>
>>>>>
>>> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:90)
>>>>>>>
>>>>>>> at
>>>>>>>
>>>>>
>>> org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:329)
>>>>>>>
>>>>>>> at
>>>>>>>
>>>>>
>>> org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1668)
>>>>>>>
>>>>>>> at
>>>>>>>
>>>>>
>>> org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846)
>>>>>>>
>>>>>>> at
>>>>>>>
>>>>>
>>> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868)
>>>>>>>
>>>>>>> at
>>>>>>>
>>>>>
>>> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:90)
>>>>>>>
>>>>>>> at
>>>>>>>
>>>>>
>>> org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:329)
>>>>>>>
>>>>>>> at
>>>>>>>
>>>>>
>>> org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1668)
>>>>>>>
>>>>>>> at
>>>>>>>
>>>>>
>>> org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846)
>>>>>>>
>>>>>>> at
>>>>>>>
>>>>>
>>> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868)
>>>>>>>
>>>>>>> at
>>>>>>>
>>>>>
>>> org.apache.calcite.plan.volcano.VolcanoPlanner.changeTraits(VolcanoPlanner.java:529)
>>>>>>>
>>>>>>> at
>>>>> org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:324)
>>>>>>>
>>>>>>> at
>>>>>>>
>>>>>
>>> org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64)
>>>>>>>
>>>>>>> ... 20 more
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> query:
>>>>>>>
>>>>>>>
>>>>>>> val streamExecutionEnv =
>>>>> StreamExecutionEnvironment.getExecutionEnvironment
>>>>>>> val blinkEnvSettings =
>>>>>>>
>>>>>
>>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
>>>>>>> val streamTableEnv =
>>>>>>> StreamTableEnvironment.create(streamExecutionEnv,blinkEnvSettings)
>>>>>>>
>>>>>>> streamTableEnv.sqlUpdate(
>>>>>>> """
>>>>>>> |
>>>>>>> |CREATE TABLE user_behavior (
>>>>>>> | uid VARCHAR,
>>>>>>> | phoneType VARCHAR,
>>>>>>> | clickCount INT,
>>>>>>> | `time` TIMESTAMP(3)
>>>>>>> |) WITH (
>>>>>>> | 'connector.type' = 'kafka',
>>>>>>> | 'connector.version' = 'universal',
>>>>>>> | 'connector.topic' = 'user_behavior',
>>>>>>> | 'connector.startup-mode' = 'earliest-offset',
>>>>>>> | 'connector.properties.0.key' = 'zookeeper.connect',
>>>>>>> | 'connector.properties.0.value' =
>>>>> 'cdh1:2181,cdh2:2181,cdh3:2181',
>>>>>>> | 'connector.properties.1.key' = 'bootstrap.servers',
>>>>>>> | 'connector.properties.1.value' =
>>>>> 'cdh1:9092,cdh2:9092,cdh3:9092',
>>>>>>> | 'update-mode' = 'append',
>>>>>>> | 'format.type' = 'json',
>>>>>>> | 'format.derive-schema' = 'true'
>>>>>>> |)
>>>>>>> |""".stripMargin)
>>>>>>> streamTableEnv.sqlUpdate(
>>>>>>> """
>>>>>>> |
>>>>>>> |CREATE TABLE user_cnt (
>>>>>>> | `time` VARCHAR,
>>>>>>> | sum_age INT
>>>>>>> |) WITH (
>>>>>>> | 'connector.type' = 'jdbc',
>>>>>>> | 'connector.url' = 'jdbc:mysql://localhost:3306/dashboard',
>>>>>>> | 'connector.table' = 'user_cnt',
>>>>>>> | 'connector.username' = 'root',
>>>>>>> | 'connector.password' = '123456',
>>>>>>> | 'connector.write.flush.max-rows' = '1'
>>>>>>> |)
>>>>>>> |""".stripMargin)
>>>>>>> val userTableSource = new MysqlAsyncLookupTableSource(
>>>>>>> Array("uid", "sex", "age", "created_time"),
>>>>>>> Array(),
>>>>>>> Array(Types.STRING, Types.STRING, Types.INT, Types.STRING))
>>>>>>> streamTableEnv.registerTableSource("users", userTableSource)
>>>>>>> streamTableEnv.sqlUpdate(
>>>>>>> """
>>>>>>> |
>>>>>>> |insert into user_cnt
>>>>>>> |SELECT
>>>>>>> | cast(b.`time` as string), u.age
>>>>>>> |FROM
>>>>>>> | user_behavior AS b
>>>>>>> | JOIN users FOR SYSTEM_TIME AS OF b.`time` AS u
>>>>>>> | ON b.uid = u.uid
>>>>>>> |
>>>>>>> |""".stripMargin)
>>>>>>> streamTableEnv.execute("Temporal table join")
>>>>>
>>>
Re: flink sql Temporal table join failed
Posted by Leonard Xu <xb...@gmail.com>.
你刚好踩到了这个坑:这是 Flink 对保留关键字(time)转义处理的 bug,已在 1.10.1 及之后的版本(包括即将发布的 1.11)中修复。
祝好
Leonard Xu
> 在 2020年6月12日,17:38,Zhou Zach <wa...@163.com> 写道:
>
>
>
>
> 是的,1.10.0版本
>
>
>
>
>
>
>
>
> 在 2020-06-12 16:28:15,"Benchao Li" <li...@apache.org> 写道:
>> 看起来你又踩到了一个坑,你用的是1.10.0吧?可以切换到1.10.1试一下,有两个bug已经在1.10.1中修复了。
>>
>> Zhou Zach <wa...@163.com> 于2020年6月12日周五 下午3:47写道:
>>
>>> 还是不行,
>>> SLF4J: Class path contains multiple SLF4J bindings.
>>> SLF4J: Found binding in
>>> [jar:file:/Users/Zach/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>>> SLF4J: Found binding in
>>> [jar:file:/Users/Zach/.m2/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>>> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
>>> explanation.
>>> SLF4J: Actual binding is of type
>>> [org.apache.logging.slf4j.Log4jLoggerFactory]
>>> ERROR StatusLogger No log4j2 configuration file found. Using default
>>> configuration: logging only errors to the console.
>>> Exception in thread "main" org.apache.flink.table.api.SqlParserException:
>>> SQL parse failed. Encountered "time FROM" at line 1, column 44.
>>> Was expecting one of:
>>> "CURSOR" ...
>>> "EXISTS" ...
>>> "NOT" ...
>>> "ROW" ...
>>> "(" ...
>>> "+" ...
>>> "-" ...
>>> <UNSIGNED_INTEGER_LITERAL> ...
>>> <DECIMAL_NUMERIC_LITERAL> ...
>>> <APPROX_NUMERIC_LITERAL> ...
>>> <BINARY_STRING_LITERAL> ...
>>> <PREFIXED_STRING_LITERAL> ...
>>> <QUOTED_STRING> ...
>>> <UNICODE_STRING_LITERAL> ...
>>> "TRUE" ...
>>> "FALSE" ...
>>> "UNKNOWN" ...
>>> "NULL" ...
>>> <LBRACE_D> ...
>>> <LBRACE_T> ...
>>> <LBRACE_TS> ...
>>> "DATE" ...
>>> "TIME" <QUOTED_STRING> ...
>>> "TIMESTAMP" ...
>>> "INTERVAL" ...
>>> "?" ...
>>> "CAST" ...
>>> "EXTRACT" ...
>>> "POSITION" ...
>>> "CONVERT" ...
>>> "TRANSLATE" ...
>>> "OVERLAY" ...
>>> "FLOOR" ...
>>> "CEIL" ...
>>> "CEILING" ...
>>> "SUBSTRING" ...
>>> "TRIM" ...
>>> "CLASSIFIER" ...
>>> "MATCH_NUMBER" ...
>>> "RUNNING" ...
>>> "PREV" ...
>>> "NEXT" ...
>>> "JSON_EXISTS" ...
>>> "JSON_VALUE" ...
>>> "JSON_QUERY" ...
>>> "JSON_OBJECT" ...
>>> "JSON_OBJECTAGG" ...
>>> "JSON_ARRAY" ...
>>> "JSON_ARRAYAGG" ...
>>> <LBRACE_FN> ...
>>> "MULTISET" ...
>>> "ARRAY" ...
>>> "MAP" ...
>>> "PERIOD" ...
>>> "SPECIFIC" ...
>>> <IDENTIFIER> ...
>>> <QUOTED_IDENTIFIER> ...
>>> <BACK_QUOTED_IDENTIFIER> ...
>>> <BRACKET_QUOTED_IDENTIFIER> ...
>>> <UNICODE_QUOTED_IDENTIFIER> ...
>>> "ABS" ...
>>> "AVG" ...
>>> "CARDINALITY" ...
>>> "CHAR_LENGTH" ...
>>> "CHARACTER_LENGTH" ...
>>> "COALESCE" ...
>>> "COLLECT" ...
>>> "COVAR_POP" ...
>>> "COVAR_SAMP" ...
>>> "CUME_DIST" ...
>>> "COUNT" ...
>>> "CURRENT_DATE" ...
>>> "CURRENT_TIME" ...
>>> "CURRENT_TIMESTAMP" ...
>>> "DENSE_RANK" ...
>>> "ELEMENT" ...
>>> "EXP" ...
>>> "FIRST_VALUE" ...
>>> "FUSION" ...
>>> "GROUPING" ...
>>> "HOUR" ...
>>> "LAG" ...
>>> "LEAD" ...
>>> "LEFT" ...
>>> "LAST_VALUE" ...
>>> "LN" ...
>>> "LOCALTIME" ...
>>> "LOCALTIMESTAMP" ...
>>> "LOWER" ...
>>> "MAX" ...
>>> "MIN" ...
>>> "MINUTE" ...
>>> "MOD" ...
>>> "MONTH" ...
>>> "NTH_VALUE" ...
>>> "NTILE" ...
>>> "NULLIF" ...
>>> "OCTET_LENGTH" ...
>>> "PERCENT_RANK" ...
>>> "POWER" ...
>>> "RANK" ...
>>> "REGR_COUNT" ...
>>> "REGR_SXX" ...
>>> "REGR_SYY" ...
>>> "RIGHT" ...
>>> "ROW_NUMBER" ...
>>> "SECOND" ...
>>> "SQRT" ...
>>> "STDDEV_POP" ...
>>> "STDDEV_SAMP" ...
>>> "SUM" ...
>>> "UPPER" ...
>>> "TRUNCATE" ...
>>> "USER" ...
>>> "VAR_POP" ...
>>> "VAR_SAMP" ...
>>> "YEAR" ...
>>> "CURRENT_CATALOG" ...
>>> "CURRENT_DEFAULT_TRANSFORM_GROUP" ...
>>> "CURRENT_PATH" ...
>>> "CURRENT_ROLE" ...
>>> "CURRENT_SCHEMA" ...
>>> "CURRENT_USER" ...
>>> "SESSION_USER" ...
>>> "SYSTEM_USER" ...
>>> "NEW" ...
>>> "CASE" ...
>>> "CURRENT" ...
>>>
>>> at
>>> org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:50)
>>> at
>>> org.apache.flink.table.planner.calcite.SqlExprToRexConverterImpl.convertToRexNodes(SqlExprToRexConverterImpl.java:79)
>>> at
>>> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:111)
>>> at
>>> org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3328)
>>> at
>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2357)
>>> at
>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
>>> at
>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2005)
>>> at
>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2083)
>>> at
>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:646)
>>> at
>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:627)
>>> at
>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3181)
>>> at
>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:563)
>>> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:148)
>>> at
>>> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:135)
>>> at
>>> org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:522)
>>> at
>>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:436)
>>> at
>>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:154)
>>> at
>>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:342)
>>> at
>>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:142)
>>> at
>>> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
>>> at
>>> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484)
>>> at
>>> org.rabbit.sql.FromKafkaSinkMysqlForReal$.main(FromKafkaSinkMysqlForReal.scala:63)
>>> at
>>> org.rabbit.sql.FromKafkaSinkMysqlForReal.main(FromKafkaSinkMysqlForReal.scala)
>>>
>>>
>>> query:
>>>
>>>
>>> streamTableEnv.sqlUpdate(
>>> """
>>> |
>>> |CREATE TABLE user_behavior (
>>> | uid VARCHAR,
>>> | phoneType VARCHAR,
>>> | clickCount INT,
>>> | proctime AS PROCTIME(),
>>> | `time` TIMESTAMP(3)
>>> |) WITH (
>>> | 'connector.type' = 'kafka',
>>> | 'connector.version' = 'universal',
>>> | 'connector.topic' = 'user_behavior',
>>> | 'connector.startup-mode' = 'earliest-offset',
>>> | 'connector.properties.0.key' = 'zookeeper.connect',
>>> | 'connector.properties.0.value' = 'cdh1:2181,cdh2:2181,cdh3:2181',
>>> | 'connector.properties.1.key' = 'bootstrap.servers',
>>> | 'connector.properties.1.value' = 'cdh1:9092,cdh2:9092,cdh3:9092',
>>> | 'update-mode' = 'append',
>>> | 'format.type' = 'json',
>>> | 'format.derive-schema' = 'true'
>>> |)
>>> |""".stripMargin)
>>> streamTableEnv.sqlUpdate(
>>> """
>>> |
>>> |insert into user_cnt
>>> |SELECT
>>> | cast(b.`time` as string), u.age
>>> |FROM
>>> | user_behavior AS b
>>> | JOIN users FOR SYSTEM_TIME AS OF b.`proctime` AS u
>>> | ON b.uid = u.uid
>>> |
>>> |""".stripMargin)
>>>
>>>
>>>
>>>
>>>
>>>
>>> 不过,PROCTIME() AS proctime 放在select 后面可以执行成功,proctime AS PROCTIME()
>>> 放在select 后面也不行。
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> 在 2020-06-12 15:29:49,"Benchao Li" <li...@apache.org> 写道:
>>>> 你写反了,是proctime AS PROCTIME()。
>>>> 计算列跟普通query里面的AS是反着的。
>>>>
>>>> Zhou Zach <wa...@163.com> 于2020年6月12日周五 下午2:24写道:
>>>>
>>>>> flink 1.10.0:
>>>>> 在create table中,加PROCTIME() AS proctime字段报错
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> 在 2020-06-12 14:08:11,"Benchao Li" <li...@apache.org> 写道:
>>>>>> Hi,
>>>>>>
>>>>>> Temporal Table join的时候需要是处理时间,你现在这个b.`time`是一个普通的时间戳,而不是处理时间属性。
>>>>>> 可以参考下[1]
>>>>>>
>>>>>> [1]
>>>>>>
>>>>>
>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/time_attributes.html
>>>>>>
>>>>>> Zhou Zach <wa...@163.com> 于2020年6月12日周五 下午1:33写道:
>>>>>>
>>>>>>> SLF4J: Class path contains multiple SLF4J bindings.
>>>>>>>
>>>>>>> SLF4J: Found binding in
>>>>>>>
>>>>>
>>> [jar:file:/Users/Zach/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>>>>>>>
>>>>>>> SLF4J: Found binding in
>>>>>>>
>>>>>
>>> [jar:file:/Users/Zach/.m2/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>>>>>>>
>>>>>>> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
>>>>>>> explanation.
>>>>>>>
>>>>>>> SLF4J: Actual binding is of type
>>>>>>> [org.apache.logging.slf4j.Log4jLoggerFactory]
>>>>>>>
>>>>>>> ERROR StatusLogger No log4j2 configuration file found. Using default
>>>>>>> configuration: logging only errors to the console.
>>>>>>>
>>>>>>> Exception in thread "main" org.apache.flink.table.api.TableException:
>>>>>>> Cannot generate a valid execution plan for the given query:
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>> FlinkLogicalSink(name=[`default_catalog`.`default_database`.`user_cnt`],
>>>>>>> fields=[time, sum_age])
>>>>>>>
>>>>>>> +- FlinkLogicalCalc(select=[CAST(time) AS EXPR$0, age])
>>>>>>>
>>>>>>> +- FlinkLogicalJoin(condition=[=($0, $2)], joinType=[inner])
>>>>>>>
>>>>>>> :- FlinkLogicalCalc(select=[uid, time])
>>>>>>>
>>>>>>> : +- FlinkLogicalTableSourceScan(table=[[default_catalog,
>>>>>>> default_database, user_behavior, source: [KafkaTableSource(uid,
>>>>> phoneType,
>>>>>>> clickCount, time)]]], fields=[uid, phoneType, clickCount, time])
>>>>>>>
>>>>>>> +- FlinkLogicalSnapshot(period=[$cor0.time])
>>>>>>>
>>>>>>> +- FlinkLogicalCalc(select=[uid, age])
>>>>>>>
>>>>>>> +- FlinkLogicalTableSourceScan(table=[[default_catalog,
>>>>>>> default_database, users, source: [MysqlAsyncLookupTableSource(uid,
>>> sex,
>>>>>>> age, created_time)]]], fields=[uid, sex, age, created_time])
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Temporal table join currently only supports 'FOR SYSTEM_TIME AS OF'
>>> left
>>>>>>> table's proctime field, doesn't support 'PROCTIME()'
>>>>>>>
>>>>>>> Please check the documentation for the set of currently supported SQL
>>>>>>> features.
>>>>>>>
>>>>>>> at
>>>>>>>
>>>>>
>>> org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:78)
>>>>>>>
>>>>>>> at
>>>>>>>
>>>>>
>>> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
>>>>>>>
>>>>>>> at
>>>>>>>
>>>>>
>>> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
>>>>>>>
>>>>>>> at
>>>>>>>
>>>>>
>>> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>>>>>>>
>>>>>>> at
>>>>>>>
>>>>>
>>> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>>>>>>>
>>>>>>> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>>>>>>>
>>>>>>> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>>>>>>>
>>>>>>> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>>>>>>>
>>>>>>> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>>>>>>>
>>>>>>> at
>>>>>>>
>>>>>
>>> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
>>>>>>>
>>>>>>> at
>>> scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>>>>>>>
>>>>>>> at
>>>>>>>
>>>>>
>>> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
>>>>>>>
>>>>>>> at
>>>>>>>
>>>>>
>>> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:170)
>>>>>>>
>>>>>>> at
>>>>>>>
>>>>>
>>> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:90)
>>>>>>>
>>>>>>> at
>>>>>>>
>>>>>
>>> org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
>>>>>>>
>>>>>>> at
>>>>>>>
>>>>>
>>> org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:248)
>>>>>>>
>>>>>>> at
>>>>>>>
>>>>>
>>> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:151)
>>>>>>>
>>>>>>> at
>>>>>>>
>>>>>
>>> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)
>>>>>>>
>>>>>>> at
>>>>>>>
>>>>>
>>> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
>>>>>>>
>>>>>>> at
>>>>>>>
>>>>>
>>> org.rabbit.sql.FromKafkaSinkMysqlForReal$.main(FromKafkaSinkMysqlForReal.scala:90)
>>>>>>>
>>>>>>> at
>>>>>>>
>>>>>
>>> org.rabbit.sql.FromKafkaSinkMysqlForReal.main(FromKafkaSinkMysqlForReal.scala)
>>>>>>>
>>>>>>> Caused by: org.apache.flink.table.api.TableException: Temporal table
>>>>> join
>>>>>>> currently only supports 'FOR SYSTEM_TIME AS OF' left table's proctime
>>>>>>> field, doesn't support 'PROCTIME()'
>>>>>>>
>>>>>>> at
>>>>>>>
>>>>>
>>> org.apache.flink.table.planner.plan.rules.physical.common.CommonLookupJoinRule$class.matches(CommonLookupJoinRule.scala:67)
>>>>>>>
>>>>>>> at
>>>>>>>
>>>>>
>>> org.apache.flink.table.planner.plan.rules.physical.common.BaseSnapshotOnCalcTableScanRule.matches(CommonLookupJoinRule.scala:147)
>>>>>>>
>>>>>>> at
>>>>>>>
>>>>>
>>> org.apache.flink.table.planner.plan.rules.physical.common.BaseSnapshotOnCalcTableScanRule.matches(CommonLookupJoinRule.scala:161)
>>>>>>>
>>>>>>> at
>>>>>>>
>>>>>
>>> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:263)
>>>>>>>
>>>>>>> at
>>>>>>>
>>>>>
>>> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)
>>>>>>>
>>>>>>> at
>>>>>>>
>>>>>
>>> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)
>>>>>>>
>>>>>>> at
>>>>>>>
>>>>>
>>> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)
>>>>>>>
>>>>>>> at
>>>>>>>
>>>>>
>>> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)
>>>>>>>
>>>>>>> at
>>>>>>>
>>>>>
>>> org.apache.calcite.plan.volcano.VolcanoRuleCall.match(VolcanoRuleCall.java:247)
>>>>>>>
>>>>>>> at
>>>>>>>
>>>>>
>>> org.apache.calcite.plan.volcano.VolcanoPlanner.fireRules(VolcanoPlanner.java:1534)
>>>>>>>
>>>>>>> at
>>>>>>>
>>>>>
>>> org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1807)
>>>>>>>
>>>>>>> at
>>>>>>>
>>>>>
>>> org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846)
>>>>>>>
>>>>>>> at
>>>>>>>
>>>>>
>>> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868)
>>>>>>>
>>>>>>> at
>>>>>>>
>>>>>
>>> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:90)
>>>>>>>
>>>>>>> at
>>>>>>>
>>>>>
>>> org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:329)
>>>>>>>
>>>>>>> at
>>>>>>>
>>>>>
>>> org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1668)
>>>>>>>
>>>>>>> at
>>>>>>>
>>>>>
>>> org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846)
>>>>>>>
>>>>>>> at
>>>>>>>
>>>>>
>>> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868)
>>>>>>>
>>>>>>> at
>>>>>>>
>>>>>
>>> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:90)
>>>>>>>
>>>>>>> at
>>>>>>>
>>>>>
>>> org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:329)
>>>>>>>
>>>>>>> at
>>>>>>>
>>>>>
>>> org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1668)
>>>>>>>
>>>>>>> at
>>>>>>>
>>>>>
>>> org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846)
>>>>>>>
>>>>>>> at
>>>>>>>
>>>>>
>>> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868)
>>>>>>>
>>>>>>> at
>>>>>>>
>>>>>
>>> org.apache.calcite.plan.volcano.VolcanoPlanner.changeTraits(VolcanoPlanner.java:529)
>>>>>>>
>>>>>>> at
>>>>> org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:324)
>>>>>>>
>>>>>>> at
>>>>>>>
>>>>>
>>> org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64)
>>>>>>>
>>>>>>> ... 20 more
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> query:
>>>>>>>
>>>>>>>
>>>>>>> val streamExecutionEnv =
>>>>> StreamExecutionEnvironment.getExecutionEnvironment
>>>>>>> val blinkEnvSettings =
>>>>>>>
>>>>>
>>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
>>>>>>> val streamTableEnv =
>>>>>>> StreamTableEnvironment.create(streamExecutionEnv,blinkEnvSettings)
>>>>>>>
>>>>>>> streamTableEnv.sqlUpdate(
>>>>>>> """
>>>>>>> |
>>>>>>> |CREATE TABLE user_behavior (
>>>>>>> | uid VARCHAR,
>>>>>>> | phoneType VARCHAR,
>>>>>>> | clickCount INT,
>>>>>>> | `time` TIMESTAMP(3)
>>>>>>> |) WITH (
>>>>>>> | 'connector.type' = 'kafka',
>>>>>>> | 'connector.version' = 'universal',
>>>>>>> | 'connector.topic' = 'user_behavior',
>>>>>>> | 'connector.startup-mode' = 'earliest-offset',
>>>>>>> | 'connector.properties.0.key' = 'zookeeper.connect',
>>>>>>> | 'connector.properties.0.value' =
>>>>> 'cdh1:2181,cdh2:2181,cdh3:2181',
>>>>>>> | 'connector.properties.1.key' = 'bootstrap.servers',
>>>>>>> | 'connector.properties.1.value' =
>>>>> 'cdh1:9092,cdh2:9092,cdh3:9092',
>>>>>>> | 'update-mode' = 'append',
>>>>>>> | 'format.type' = 'json',
>>>>>>> | 'format.derive-schema' = 'true'
>>>>>>> |)
>>>>>>> |""".stripMargin)
>>>>>>> streamTableEnv.sqlUpdate(
>>>>>>> """
>>>>>>> |
>>>>>>> |CREATE TABLE user_cnt (
>>>>>>> | `time` VARCHAR,
>>>>>>> | sum_age INT
>>>>>>> |) WITH (
>>>>>>> | 'connector.type' = 'jdbc',
>>>>>>> | 'connector.url' = 'jdbc:mysql://localhost:3306/dashboard',
>>>>>>> | 'connector.table' = 'user_cnt',
>>>>>>> | 'connector.username' = 'root',
>>>>>>> | 'connector.password' = '123456',
>>>>>>> | 'connector.write.flush.max-rows' = '1'
>>>>>>> |)
>>>>>>> |""".stripMargin)
>>>>>>> val userTableSource = new MysqlAsyncLookupTableSource(
>>>>>>> Array("uid", "sex", "age", "created_time"),
>>>>>>> Array(),
>>>>>>> Array(Types.STRING, Types.STRING, Types.INT, Types.STRING))
>>>>>>> streamTableEnv.registerTableSource("users", userTableSource)
>>>>>>> streamTableEnv.sqlUpdate(
>>>>>>> """
>>>>>>> |
>>>>>>> |insert into user_cnt
>>>>>>> |SELECT
>>>>>>> | cast(b.`time` as string), u.age
>>>>>>> |FROM
>>>>>>> | user_behavior AS b
>>>>>>> | JOIN users FOR SYSTEM_TIME AS OF b.`time` AS u
>>>>>>> | ON b.uid = u.uid
>>>>>>> |
>>>>>>> |""".stripMargin)
>>>>>>> streamTableEnv.execute("Temporal table join")
>>>>>
>>>
Re:Re: Re: Re: flink sql Temporal table join failed
Posted by Zhou Zach <wa...@163.com>.
是的,1.10.0版本
在 2020-06-12 16:28:15,"Benchao Li" <li...@apache.org> 写道:
>看起来你又踩到了一个坑,你用的是1.10.0吧?可以切换到1.10.1试一下,有两个bug已经在1.10.1中修复了。
>
>Zhou Zach <wa...@163.com> 于2020年6月12日周五 下午3:47写道:
>
>> 还是不行,
>> SLF4J: Class path contains multiple SLF4J bindings.
>> SLF4J: Found binding in
>> [jar:file:/Users/Zach/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>> SLF4J: Found binding in
>> [jar:file:/Users/Zach/.m2/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
>> explanation.
>> SLF4J: Actual binding is of type
>> [org.apache.logging.slf4j.Log4jLoggerFactory]
>> ERROR StatusLogger No log4j2 configuration file found. Using default
>> configuration: logging only errors to the console.
>> Exception in thread "main" org.apache.flink.table.api.SqlParserException:
>> SQL parse failed. Encountered "time FROM" at line 1, column 44.
>> Was expecting one of:
>> "CURSOR" ...
>> "EXISTS" ...
>> "NOT" ...
>> "ROW" ...
>> "(" ...
>> "+" ...
>> "-" ...
>> <UNSIGNED_INTEGER_LITERAL> ...
>> <DECIMAL_NUMERIC_LITERAL> ...
>> <APPROX_NUMERIC_LITERAL> ...
>> <BINARY_STRING_LITERAL> ...
>> <PREFIXED_STRING_LITERAL> ...
>> <QUOTED_STRING> ...
>> <UNICODE_STRING_LITERAL> ...
>> "TRUE" ...
>> "FALSE" ...
>> "UNKNOWN" ...
>> "NULL" ...
>> <LBRACE_D> ...
>> <LBRACE_T> ...
>> <LBRACE_TS> ...
>> "DATE" ...
>> "TIME" <QUOTED_STRING> ...
>> "TIMESTAMP" ...
>> "INTERVAL" ...
>> "?" ...
>> "CAST" ...
>> "EXTRACT" ...
>> "POSITION" ...
>> "CONVERT" ...
>> "TRANSLATE" ...
>> "OVERLAY" ...
>> "FLOOR" ...
>> "CEIL" ...
>> "CEILING" ...
>> "SUBSTRING" ...
>> "TRIM" ...
>> "CLASSIFIER" ...
>> "MATCH_NUMBER" ...
>> "RUNNING" ...
>> "PREV" ...
>> "NEXT" ...
>> "JSON_EXISTS" ...
>> "JSON_VALUE" ...
>> "JSON_QUERY" ...
>> "JSON_OBJECT" ...
>> "JSON_OBJECTAGG" ...
>> "JSON_ARRAY" ...
>> "JSON_ARRAYAGG" ...
>> <LBRACE_FN> ...
>> "MULTISET" ...
>> "ARRAY" ...
>> "MAP" ...
>> "PERIOD" ...
>> "SPECIFIC" ...
>> <IDENTIFIER> ...
>> <QUOTED_IDENTIFIER> ...
>> <BACK_QUOTED_IDENTIFIER> ...
>> <BRACKET_QUOTED_IDENTIFIER> ...
>> <UNICODE_QUOTED_IDENTIFIER> ...
>> "ABS" ...
>> "AVG" ...
>> "CARDINALITY" ...
>> "CHAR_LENGTH" ...
>> "CHARACTER_LENGTH" ...
>> "COALESCE" ...
>> "COLLECT" ...
>> "COVAR_POP" ...
>> "COVAR_SAMP" ...
>> "CUME_DIST" ...
>> "COUNT" ...
>> "CURRENT_DATE" ...
>> "CURRENT_TIME" ...
>> "CURRENT_TIMESTAMP" ...
>> "DENSE_RANK" ...
>> "ELEMENT" ...
>> "EXP" ...
>> "FIRST_VALUE" ...
>> "FUSION" ...
>> "GROUPING" ...
>> "HOUR" ...
>> "LAG" ...
>> "LEAD" ...
>> "LEFT" ...
>> "LAST_VALUE" ...
>> "LN" ...
>> "LOCALTIME" ...
>> "LOCALTIMESTAMP" ...
>> "LOWER" ...
>> "MAX" ...
>> "MIN" ...
>> "MINUTE" ...
>> "MOD" ...
>> "MONTH" ...
>> "NTH_VALUE" ...
>> "NTILE" ...
>> "NULLIF" ...
>> "OCTET_LENGTH" ...
>> "PERCENT_RANK" ...
>> "POWER" ...
>> "RANK" ...
>> "REGR_COUNT" ...
>> "REGR_SXX" ...
>> "REGR_SYY" ...
>> "RIGHT" ...
>> "ROW_NUMBER" ...
>> "SECOND" ...
>> "SQRT" ...
>> "STDDEV_POP" ...
>> "STDDEV_SAMP" ...
>> "SUM" ...
>> "UPPER" ...
>> "TRUNCATE" ...
>> "USER" ...
>> "VAR_POP" ...
>> "VAR_SAMP" ...
>> "YEAR" ...
>> "CURRENT_CATALOG" ...
>> "CURRENT_DEFAULT_TRANSFORM_GROUP" ...
>> "CURRENT_PATH" ...
>> "CURRENT_ROLE" ...
>> "CURRENT_SCHEMA" ...
>> "CURRENT_USER" ...
>> "SESSION_USER" ...
>> "SYSTEM_USER" ...
>> "NEW" ...
>> "CASE" ...
>> "CURRENT" ...
>>
>> at
>> org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:50)
>> at
>> org.apache.flink.table.planner.calcite.SqlExprToRexConverterImpl.convertToRexNodes(SqlExprToRexConverterImpl.java:79)
>> at
>> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:111)
>> at
>> org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3328)
>> at
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2357)
>> at
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
>> at
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2005)
>> at
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2083)
>> at
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:646)
>> at
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:627)
>> at
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3181)
>> at
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:563)
>> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
>> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:148)
>> at
>> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:135)
>> at
>> org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:522)
>> at
>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:436)
>> at
>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:154)
>> at
>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:342)
>> at
>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:142)
>> at
>> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
>> at
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484)
>> at
>> org.rabbit.sql.FromKafkaSinkMysqlForReal$.main(FromKafkaSinkMysqlForReal.scala:63)
>> at
>> org.rabbit.sql.FromKafkaSinkMysqlForReal.main(FromKafkaSinkMysqlForReal.scala)
>>
>>
>> query:
>>
>>
>> streamTableEnv.sqlUpdate(
>> """
>> |
>> |CREATE TABLE user_behavior (
>> | uid VARCHAR,
>> | phoneType VARCHAR,
>> | clickCount INT,
>> | proctime AS PROCTIME(),
>> | `time` TIMESTAMP(3)
>> |) WITH (
>> | 'connector.type' = 'kafka',
>> | 'connector.version' = 'universal',
>> | 'connector.topic' = 'user_behavior',
>> | 'connector.startup-mode' = 'earliest-offset',
>> | 'connector.properties.0.key' = 'zookeeper.connect',
>> | 'connector.properties.0.value' = 'cdh1:2181,cdh2:2181,cdh3:2181',
>> | 'connector.properties.1.key' = 'bootstrap.servers',
>> | 'connector.properties.1.value' = 'cdh1:9092,cdh2:9092,cdh3:9092',
>> | 'update-mode' = 'append',
>> | 'format.type' = 'json',
>> | 'format.derive-schema' = 'true'
>> |)
>> |""".stripMargin)
>> streamTableEnv.sqlUpdate(
>> """
>> |
>> |insert into user_cnt
>> |SELECT
>> | cast(b.`time` as string), u.age
>> |FROM
>> | user_behavior AS b
>> | JOIN users FOR SYSTEM_TIME AS OF b.`proctime` AS u
>> | ON b.uid = u.uid
>> |
>> |""".stripMargin)
>>
>>
>>
>>
>>
>>
>> 不过,PROCTIME() AS proctime 放在select 后面可以执行成功,proctime AS PROCTIME()
>> 放在select 后面也不行。
>>
>>
>>
>>
>>
>>
>>
>>
>> 在 2020-06-12 15:29:49,"Benchao Li" <li...@apache.org> 写道:
>> >你写反了,是proctime AS PROCTIME()。
>> >计算列跟普通query里面的AS是反着的。
>> >
>> >Zhou Zach <wa...@163.com> 于2020年6月12日周五 下午2:24写道:
>> >
>> >> flink 1.10.0:
>> >> 在create table中,加PROCTIME() AS proctime字段报错
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >> 在 2020-06-12 14:08:11,"Benchao Li" <li...@apache.org> 写道:
>> >> >Hi,
>> >> >
>> >> >Temporal Table join的时候需要是处理时间,你现在这个b.`time`是一个普通的时间戳,而不是处理时间属性。
>> >> >可以参考下[1]
>> >> >
>> >> >[1]
>> >> >
>> >>
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/time_attributes.html
>> >> >
>> >> >Zhou Zach <wa...@163.com> 于2020年6月12日周五 下午1:33写道:
>> >> >
>> >> >> SLF4J: Class path contains multiple SLF4J bindings.
>> >> >>
>> >> >> SLF4J: Found binding in
>> >> >>
>> >>
>> [jar:file:/Users/Zach/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>> >> >>
>> >> >> SLF4J: Found binding in
>> >> >>
>> >>
>> [jar:file:/Users/Zach/.m2/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>> >> >>
>> >> >> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
>> >> >> explanation.
>> >> >>
>> >> >> SLF4J: Actual binding is of type
>> >> >> [org.apache.logging.slf4j.Log4jLoggerFactory]
>> >> >>
>> >> >> ERROR StatusLogger No log4j2 configuration file found. Using default
>> >> >> configuration: logging only errors to the console.
>> >> >>
>> >> >> Exception in thread "main" org.apache.flink.table.api.TableException:
>> >> >> Cannot generate a valid execution plan for the given query:
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> FlinkLogicalSink(name=[`default_catalog`.`default_database`.`user_cnt`],
>> >> >> fields=[time, sum_age])
>> >> >>
>> >> >> +- FlinkLogicalCalc(select=[CAST(time) AS EXPR$0, age])
>> >> >>
>> >> >> +- FlinkLogicalJoin(condition=[=($0, $2)], joinType=[inner])
>> >> >>
>> >> >> :- FlinkLogicalCalc(select=[uid, time])
>> >> >>
>> >> >> : +- FlinkLogicalTableSourceScan(table=[[default_catalog,
>> >> >> default_database, user_behavior, source: [KafkaTableSource(uid,
>> >> phoneType,
>> >> >> clickCount, time)]]], fields=[uid, phoneType, clickCount, time])
>> >> >>
>> >> >> +- FlinkLogicalSnapshot(period=[$cor0.time])
>> >> >>
>> >> >> +- FlinkLogicalCalc(select=[uid, age])
>> >> >>
>> >> >> +- FlinkLogicalTableSourceScan(table=[[default_catalog,
>> >> >> default_database, users, source: [MysqlAsyncLookupTableSource(uid,
>> sex,
>> >> >> age, created_time)]]], fields=[uid, sex, age, created_time])
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >> Temporal table join currently only supports 'FOR SYSTEM_TIME AS OF'
>> left
>> >> >> table's proctime field, doesn't support 'PROCTIME()'
>> >> >>
>> >> >> Please check the documentation for the set of currently supported SQL
>> >> >> features.
>> >> >>
>> >> >> at
>> >> >>
>> >>
>> org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:78)
>> >> >>
>> >> >> at
>> >> >>
>> >>
>> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
>> >> >>
>> >> >> at
>> >> >>
>> >>
>> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
>> >> >>
>> >> >> at
>> >> >>
>> >>
>> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>> >> >>
>> >> >> at
>> >> >>
>> >>
>> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>> >> >>
>> >> >> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>> >> >>
>> >> >> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>> >> >>
>> >> >> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>> >> >>
>> >> >> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>> >> >>
>> >> >> at
>> >> >>
>> >>
>> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
>> >> >>
>> >> >> at
>> scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>> >> >>
>> >> >> at
>> >> >>
>> >>
>> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
>> >> >>
>> >> >> at
>> >> >>
>> >>
>> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:170)
>> >> >>
>> >> >> at
>> >> >>
>> >>
>> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:90)
>> >> >>
>> >> >> at
>> >> >>
>> >>
>> org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
>> >> >>
>> >> >> at
>> >> >>
>> >>
>> org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:248)
>> >> >>
>> >> >> at
>> >> >>
>> >>
>> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:151)
>> >> >>
>> >> >> at
>> >> >>
>> >>
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)
>> >> >>
>> >> >> at
>> >> >>
>> >>
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
>> >> >>
>> >> >> at
>> >> >>
>> >>
>> org.rabbit.sql.FromKafkaSinkMysqlForReal$.main(FromKafkaSinkMysqlForReal.scala:90)
>> >> >>
>> >> >> at
>> >> >>
>> >>
>> org.rabbit.sql.FromKafkaSinkMysqlForReal.main(FromKafkaSinkMysqlForReal.scala)
>> >> >>
>> >> >> Caused by: org.apache.flink.table.api.TableException: Temporal table
>> >> join
>> >> >> currently only supports 'FOR SYSTEM_TIME AS OF' left table's proctime
>> >> >> field, doesn't support 'PROCTIME()'
>> >> >>
>> >> >> at
>> >> >>
>> >>
>> org.apache.flink.table.planner.plan.rules.physical.common.CommonLookupJoinRule$class.matches(CommonLookupJoinRule.scala:67)
>> >> >>
>> >> >> at
>> >> >>
>> >>
>> org.apache.flink.table.planner.plan.rules.physical.common.BaseSnapshotOnCalcTableScanRule.matches(CommonLookupJoinRule.scala:147)
>> >> >>
>> >> >> at
>> >> >>
>> >>
>> org.apache.flink.table.planner.plan.rules.physical.common.BaseSnapshotOnCalcTableScanRule.matches(CommonLookupJoinRule.scala:161)
>> >> >>
>> >> >> at
>> >> >>
>> >>
>> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:263)
>> >> >>
>> >> >> at
>> >> >>
>> >>
>> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)
>> >> >>
>> >> >> at
>> >> >>
>> >>
>> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)
>> >> >>
>> >> >> at
>> >> >>
>> >>
>> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)
>> >> >>
>> >> >> at
>> >> >>
>> >>
>> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)
>> >> >>
>> >> >> at
>> >> >>
>> >>
>> org.apache.calcite.plan.volcano.VolcanoRuleCall.match(VolcanoRuleCall.java:247)
>> >> >>
>> >> >> at
>> >> >>
>> >>
>> org.apache.calcite.plan.volcano.VolcanoPlanner.fireRules(VolcanoPlanner.java:1534)
>> >> >>
>> >> >> at
>> >> >>
>> >>
>> org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1807)
>> >> >>
>> >> >> at
>> >> >>
>> >>
>> org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846)
>> >> >>
>> >> >> at
>> >> >>
>> >>
>> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868)
>> >> >>
>> >> >> at
>> >> >>
>> >>
>> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:90)
>> >> >>
>> >> >> at
>> >> >>
>> >>
>> org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:329)
>> >> >>
>> >> >> at
>> >> >>
>> >>
>> org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1668)
>> >> >>
>> >> >> at
>> >> >>
>> >>
>> org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846)
>> >> >>
>> >> >> at
>> >> >>
>> >>
>> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868)
>> >> >>
>> >> >> at
>> >> >>
>> >>
>> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:90)
>> >> >>
>> >> >> at
>> >> >>
>> >>
>> org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:329)
>> >> >>
>> >> >> at
>> >> >>
>> >>
>> org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1668)
>> >> >>
>> >> >> at
>> >> >>
>> >>
>> org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846)
>> >> >>
>> >> >> at
>> >> >>
>> >>
>> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868)
>> >> >>
>> >> >> at
>> >> >>
>> >>
>> org.apache.calcite.plan.volcano.VolcanoPlanner.changeTraits(VolcanoPlanner.java:529)
>> >> >>
>> >> >> at
>> >> org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:324)
>> >> >>
>> >> >> at
>> >> >>
>> >>
>> org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64)
>> >> >>
>> >> >> ... 20 more
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >> query:
>> >> >>
>> >> >>
>> >> >> val streamExecutionEnv =
>> >> StreamExecutionEnvironment.getExecutionEnvironment
>> >> >> val blinkEnvSettings =
>> >> >>
>> >>
>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
>> >> >> val streamTableEnv =
>> >> >> StreamTableEnvironment.create(streamExecutionEnv,blinkEnvSettings)
>> >> >>
>> >> >> streamTableEnv.sqlUpdate(
>> >> >> """
>> >> >> |
>> >> >> |CREATE TABLE user_behavior (
>> >> >> | uid VARCHAR,
>> >> >> | phoneType VARCHAR,
>> >> >> | clickCount INT,
>> >> >> | `time` TIMESTAMP(3)
>> >> >> |) WITH (
>> >> >> | 'connector.type' = 'kafka',
>> >> >> | 'connector.version' = 'universal',
>> >> >> | 'connector.topic' = 'user_behavior',
>> >> >> | 'connector.startup-mode' = 'earliest-offset',
>> >> >> | 'connector.properties.0.key' = 'zookeeper.connect',
>> >> >> | 'connector.properties.0.value' =
>> >> 'cdh1:2181,cdh2:2181,cdh3:2181',
>> >> >> | 'connector.properties.1.key' = 'bootstrap.servers',
>> >> >> | 'connector.properties.1.value' =
>> >> 'cdh1:9092,cdh2:9092,cdh3:9092',
>> >> >> | 'update-mode' = 'append',
>> >> >> | 'format.type' = 'json',
>> >> >> | 'format.derive-schema' = 'true'
>> >> >> |)
>> >> >> |""".stripMargin)
>> >> >> streamTableEnv.sqlUpdate(
>> >> >> """
>> >> >> |
>> >> >> |CREATE TABLE user_cnt (
>> >> >> | `time` VARCHAR,
>> >> >> | sum_age INT
>> >> >> |) WITH (
>> >> >> | 'connector.type' = 'jdbc',
>> >> >> | 'connector.url' = 'jdbc:mysql://localhost:3306/dashboard',
>> >> >> | 'connector.table' = 'user_cnt',
>> >> >> | 'connector.username' = 'root',
>> >> >> | 'connector.password' = '123456',
>> >> >> | 'connector.write.flush.max-rows' = '1'
>> >> >> |)
>> >> >> |""".stripMargin)
>> >> >> val userTableSource = new MysqlAsyncLookupTableSource(
>> >> >> Array("uid", "sex", "age", "created_time"),
>> >> >> Array(),
>> >> >> Array(Types.STRING, Types.STRING, Types.INT, Types.STRING))
>> >> >> streamTableEnv.registerTableSource("users", userTableSource)
>> >> >> streamTableEnv.sqlUpdate(
>> >> >> """
>> >> >> |
>> >> >> |insert into user_cnt
>> >> >> |SELECT
>> >> >> | cast(b.`time` as string), u.age
>> >> >> |FROM
>> >> >> | user_behavior AS b
>> >> >> | JOIN users FOR SYSTEM_TIME AS OF b.`time` AS u
>> >> >> | ON b.uid = u.uid
>> >> >> |
>> >> >> |""".stripMargin)
>> >> >> streamTableEnv.execute("Temporal table join")
>> >>
>>
Re: Re: Re: flink sql Temporal table join failed
Posted by Benchao Li <li...@apache.org>.
看起来你又踩到了一个坑,你用的是1.10.0吧?可以切换到1.10.1试一下,有两个bug已经在1.10.1中修复了。
Zhou Zach <wa...@163.com> 于2020年6月12日周五 下午3:47写道:
> 还是不行,
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in
> [jar:file:/Users/Zach/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in
> [jar:file:/Users/Zach/.m2/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
> explanation.
> SLF4J: Actual binding is of type
> [org.apache.logging.slf4j.Log4jLoggerFactory]
> ERROR StatusLogger No log4j2 configuration file found. Using default
> configuration: logging only errors to the console.
> Exception in thread "main" org.apache.flink.table.api.SqlParserException:
> SQL parse failed. Encountered "time FROM" at line 1, column 44.
> Was expecting one of:
> "CURSOR" ...
> "EXISTS" ...
> "NOT" ...
> "ROW" ...
> "(" ...
> "+" ...
> "-" ...
> <UNSIGNED_INTEGER_LITERAL> ...
> <DECIMAL_NUMERIC_LITERAL> ...
> <APPROX_NUMERIC_LITERAL> ...
> <BINARY_STRING_LITERAL> ...
> <PREFIXED_STRING_LITERAL> ...
> <QUOTED_STRING> ...
> <UNICODE_STRING_LITERAL> ...
> "TRUE" ...
> "FALSE" ...
> "UNKNOWN" ...
> "NULL" ...
> <LBRACE_D> ...
> <LBRACE_T> ...
> <LBRACE_TS> ...
> "DATE" ...
> "TIME" <QUOTED_STRING> ...
> "TIMESTAMP" ...
> "INTERVAL" ...
> "?" ...
> "CAST" ...
> "EXTRACT" ...
> "POSITION" ...
> "CONVERT" ...
> "TRANSLATE" ...
> "OVERLAY" ...
> "FLOOR" ...
> "CEIL" ...
> "CEILING" ...
> "SUBSTRING" ...
> "TRIM" ...
> "CLASSIFIER" ...
> "MATCH_NUMBER" ...
> "RUNNING" ...
> "PREV" ...
> "NEXT" ...
> "JSON_EXISTS" ...
> "JSON_VALUE" ...
> "JSON_QUERY" ...
> "JSON_OBJECT" ...
> "JSON_OBJECTAGG" ...
> "JSON_ARRAY" ...
> "JSON_ARRAYAGG" ...
> <LBRACE_FN> ...
> "MULTISET" ...
> "ARRAY" ...
> "MAP" ...
> "PERIOD" ...
> "SPECIFIC" ...
> <IDENTIFIER> ...
> <QUOTED_IDENTIFIER> ...
> <BACK_QUOTED_IDENTIFIER> ...
> <BRACKET_QUOTED_IDENTIFIER> ...
> <UNICODE_QUOTED_IDENTIFIER> ...
> "ABS" ...
> "AVG" ...
> "CARDINALITY" ...
> "CHAR_LENGTH" ...
> "CHARACTER_LENGTH" ...
> "COALESCE" ...
> "COLLECT" ...
> "COVAR_POP" ...
> "COVAR_SAMP" ...
> "CUME_DIST" ...
> "COUNT" ...
> "CURRENT_DATE" ...
> "CURRENT_TIME" ...
> "CURRENT_TIMESTAMP" ...
> "DENSE_RANK" ...
> "ELEMENT" ...
> "EXP" ...
> "FIRST_VALUE" ...
> "FUSION" ...
> "GROUPING" ...
> "HOUR" ...
> "LAG" ...
> "LEAD" ...
> "LEFT" ...
> "LAST_VALUE" ...
> "LN" ...
> "LOCALTIME" ...
> "LOCALTIMESTAMP" ...
> "LOWER" ...
> "MAX" ...
> "MIN" ...
> "MINUTE" ...
> "MOD" ...
> "MONTH" ...
> "NTH_VALUE" ...
> "NTILE" ...
> "NULLIF" ...
> "OCTET_LENGTH" ...
> "PERCENT_RANK" ...
> "POWER" ...
> "RANK" ...
> "REGR_COUNT" ...
> "REGR_SXX" ...
> "REGR_SYY" ...
> "RIGHT" ...
> "ROW_NUMBER" ...
> "SECOND" ...
> "SQRT" ...
> "STDDEV_POP" ...
> "STDDEV_SAMP" ...
> "SUM" ...
> "UPPER" ...
> "TRUNCATE" ...
> "USER" ...
> "VAR_POP" ...
> "VAR_SAMP" ...
> "YEAR" ...
> "CURRENT_CATALOG" ...
> "CURRENT_DEFAULT_TRANSFORM_GROUP" ...
> "CURRENT_PATH" ...
> "CURRENT_ROLE" ...
> "CURRENT_SCHEMA" ...
> "CURRENT_USER" ...
> "SESSION_USER" ...
> "SYSTEM_USER" ...
> "NEW" ...
> "CASE" ...
> "CURRENT" ...
>
> at
> org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:50)
> at
> org.apache.flink.table.planner.calcite.SqlExprToRexConverterImpl.convertToRexNodes(SqlExprToRexConverterImpl.java:79)
> at
> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:111)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3328)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2357)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2005)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2083)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:646)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:627)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3181)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:563)
> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:148)
> at
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:135)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:522)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:436)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:154)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:342)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:142)
> at
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484)
> at
> org.rabbit.sql.FromKafkaSinkMysqlForReal$.main(FromKafkaSinkMysqlForReal.scala:63)
> at
> org.rabbit.sql.FromKafkaSinkMysqlForReal.main(FromKafkaSinkMysqlForReal.scala)
>
>
> query:
>
>
> streamTableEnv.sqlUpdate(
> """
> |
> |CREATE TABLE user_behavior (
> | uid VARCHAR,
> | phoneType VARCHAR,
> | clickCount INT,
> | proctime AS PROCTIME(),
> | `time` TIMESTAMP(3)
> |) WITH (
> | 'connector.type' = 'kafka',
> | 'connector.version' = 'universal',
> | 'connector.topic' = 'user_behavior',
> | 'connector.startup-mode' = 'earliest-offset',
> | 'connector.properties.0.key' = 'zookeeper.connect',
> | 'connector.properties.0.value' = 'cdh1:2181,cdh2:2181,cdh3:2181',
> | 'connector.properties.1.key' = 'bootstrap.servers',
> | 'connector.properties.1.value' = 'cdh1:9092,cdh2:9092,cdh3:9092',
> | 'update-mode' = 'append',
> | 'format.type' = 'json',
> | 'format.derive-schema' = 'true'
> |)
> |""".stripMargin)
> streamTableEnv.sqlUpdate(
> """
> |
> |insert into user_cnt
> |SELECT
> | cast(b.`time` as string), u.age
> |FROM
> | user_behavior AS b
> | JOIN users FOR SYSTEM_TIME AS OF b.`proctime` AS u
> | ON b.uid = u.uid
> |
> |""".stripMargin)
>
>
>
>
>
>
> 不过,PROCTIME() AS proctime 放在select 后面可以执行成功,proctime AS PROCTIME()
> 放在select 后面也不行。
>
>
>
>
>
>
>
>
> 在 2020-06-12 15:29:49,"Benchao Li" <li...@apache.org> 写道:
> >你写反了,是proctime AS PROCTIME()。
> >计算列跟普通query里面的AS是反着的。
> >
> >Zhou Zach <wa...@163.com> 于2020年6月12日周五 下午2:24写道:
> >
> >> flink 1.10.0:
> >> 在create table中,加PROCTIME() AS proctime字段报错
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >> 在 2020-06-12 14:08:11,"Benchao Li" <li...@apache.org> 写道:
> >> >Hi,
> >> >
> >> >Temporal Table join的时候需要是处理时间,你现在这个b.`time`是一个普通的时间戳,而不是处理时间属性。
> >> >可以参考下[1]
> >> >
> >> >[1]
> >> >
> >>
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/time_attributes.html
> >> >
> >> >Zhou Zach <wa...@163.com> 于2020年6月12日周五 下午1:33写道:
> >> >
> >> >> SLF4J: Class path contains multiple SLF4J bindings.
> >> >>
> >> >> SLF4J: Found binding in
> >> >>
> >>
> [jar:file:/Users/Zach/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> >> >>
> >> >> SLF4J: Found binding in
> >> >>
> >>
> [jar:file:/Users/Zach/.m2/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> >> >>
> >> >> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
> >> >> explanation.
> >> >>
> >> >> SLF4J: Actual binding is of type
> >> >> [org.apache.logging.slf4j.Log4jLoggerFactory]
> >> >>
> >> >> ERROR StatusLogger No log4j2 configuration file found. Using default
> >> >> configuration: logging only errors to the console.
> >> >>
> >> >> Exception in thread "main" org.apache.flink.table.api.TableException:
> >> >> Cannot generate a valid execution plan for the given query:
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> FlinkLogicalSink(name=[`default_catalog`.`default_database`.`user_cnt`],
> >> >> fields=[time, sum_age])
> >> >>
> >> >> +- FlinkLogicalCalc(select=[CAST(time) AS EXPR$0, age])
> >> >>
> >> >> +- FlinkLogicalJoin(condition=[=($0, $2)], joinType=[inner])
> >> >>
> >> >> :- FlinkLogicalCalc(select=[uid, time])
> >> >>
> >> >> : +- FlinkLogicalTableSourceScan(table=[[default_catalog,
> >> >> default_database, user_behavior, source: [KafkaTableSource(uid,
> >> phoneType,
> >> >> clickCount, time)]]], fields=[uid, phoneType, clickCount, time])
> >> >>
> >> >> +- FlinkLogicalSnapshot(period=[$cor0.time])
> >> >>
> >> >> +- FlinkLogicalCalc(select=[uid, age])
> >> >>
> >> >> +- FlinkLogicalTableSourceScan(table=[[default_catalog,
> >> >> default_database, users, source: [MysqlAsyncLookupTableSource(uid,
> sex,
> >> >> age, created_time)]]], fields=[uid, sex, age, created_time])
> >> >>
> >> >>
> >> >>
> >> >>
> >> >> Temporal table join currently only supports 'FOR SYSTEM_TIME AS OF'
> left
> >> >> table's proctime field, doesn't support 'PROCTIME()'
> >> >>
> >> >> Please check the documentation for the set of currently supported SQL
> >> >> features.
> >> >>
> >> >> at
> >> >>
> >>
> org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:78)
> >> >>
> >> >> at
> >> >>
> >>
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
> >> >>
> >> >> at
> >> >>
> >>
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
> >> >>
> >> >> at
> >> >>
> >>
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> >> >>
> >> >> at
> >> >>
> >>
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> >> >>
> >> >> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> >> >>
> >> >> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> >> >>
> >> >> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> >> >>
> >> >> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> >> >>
> >> >> at
> >> >>
> >>
> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
> >> >>
> >> >> at
> scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
> >> >>
> >> >> at
> >> >>
> >>
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
> >> >>
> >> >> at
> >> >>
> >>
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:170)
> >> >>
> >> >> at
> >> >>
> >>
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:90)
> >> >>
> >> >> at
> >> >>
> >>
> org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
> >> >>
> >> >> at
> >> >>
> >>
> org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:248)
> >> >>
> >> >> at
> >> >>
> >>
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:151)
> >> >>
> >> >> at
> >> >>
> >>
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)
> >> >>
> >> >> at
> >> >>
> >>
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
> >> >>
> >> >> at
> >> >>
> >>
> org.rabbit.sql.FromKafkaSinkMysqlForReal$.main(FromKafkaSinkMysqlForReal.scala:90)
> >> >>
> >> >> at
> >> >>
> >>
> org.rabbit.sql.FromKafkaSinkMysqlForReal.main(FromKafkaSinkMysqlForReal.scala)
> >> >>
> >> >> Caused by: org.apache.flink.table.api.TableException: Temporal table
> >> join
> >> >> currently only supports 'FOR SYSTEM_TIME AS OF' left table's proctime
> >> >> field, doesn't support 'PROCTIME()'
> >> >>
> >> >> at
> >> >>
> >>
> org.apache.flink.table.planner.plan.rules.physical.common.CommonLookupJoinRule$class.matches(CommonLookupJoinRule.scala:67)
> >> >>
> >> >> at
> >> >>
> >>
> org.apache.flink.table.planner.plan.rules.physical.common.BaseSnapshotOnCalcTableScanRule.matches(CommonLookupJoinRule.scala:147)
> >> >>
> >> >> at
> >> >>
> >>
> org.apache.flink.table.planner.plan.rules.physical.common.BaseSnapshotOnCalcTableScanRule.matches(CommonLookupJoinRule.scala:161)
> >> >>
> >> >> at
> >> >>
> >>
> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:263)
> >> >>
> >> >> at
> >> >>
> >>
> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)
> >> >>
> >> >> at
> >> >>
> >>
> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)
> >> >>
> >> >> at
> >> >>
> >>
> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)
> >> >>
> >> >> at
> >> >>
> >>
> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)
> >> >>
> >> >> at
> >> >>
> >>
> org.apache.calcite.plan.volcano.VolcanoRuleCall.match(VolcanoRuleCall.java:247)
> >> >>
> >> >> at
> >> >>
> >>
> org.apache.calcite.plan.volcano.VolcanoPlanner.fireRules(VolcanoPlanner.java:1534)
> >> >>
> >> >> at
> >> >>
> >>
> org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1807)
> >> >>
> >> >> at
> >> >>
> >>
> org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846)
> >> >>
> >> >> at
> >> >>
> >>
> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868)
> >> >>
> >> >> at
> >> >>
> >>
> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:90)
> >> >>
> >> >> at
> >> >>
> >>
> org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:329)
> >> >>
> >> >> at
> >> >>
> >>
> org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1668)
> >> >>
> >> >> at
> >> >>
> >>
> org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846)
> >> >>
> >> >> at
> >> >>
> >>
> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868)
> >> >>
> >> >> at
> >> >>
> >>
> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:90)
> >> >>
> >> >> at
> >> >>
> >>
> org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:329)
> >> >>
> >> >> at
> >> >>
> >>
> org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1668)
> >> >>
> >> >> at
> >> >>
> >>
> org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846)
> >> >>
> >> >> at
> >> >>
> >>
> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868)
> >> >>
> >> >> at
> >> >>
> >>
> org.apache.calcite.plan.volcano.VolcanoPlanner.changeTraits(VolcanoPlanner.java:529)
> >> >>
> >> >> at
> >> org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:324)
> >> >>
> >> >> at
> >> >>
> >>
> org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64)
> >> >>
> >> >> ... 20 more
> >> >>
> >> >>
> >> >>
> >> >>
> >> >> query:
> >> >>
> >> >>
> >> >> val streamExecutionEnv =
> >> StreamExecutionEnvironment.getExecutionEnvironment
> >> >> val blinkEnvSettings =
> >> >>
> >>
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
> >> >> val streamTableEnv =
> >> >> StreamTableEnvironment.create(streamExecutionEnv,blinkEnvSettings)
> >> >>
> >> >> streamTableEnv.sqlUpdate(
> >> >> """
> >> >> |
> >> >> |CREATE TABLE user_behavior (
> >> >> | uid VARCHAR,
> >> >> | phoneType VARCHAR,
> >> >> | clickCount INT,
> >> >> | `time` TIMESTAMP(3)
> >> >> |) WITH (
> >> >> | 'connector.type' = 'kafka',
> >> >> | 'connector.version' = 'universal',
> >> >> | 'connector.topic' = 'user_behavior',
> >> >> | 'connector.startup-mode' = 'earliest-offset',
> >> >> | 'connector.properties.0.key' = 'zookeeper.connect',
> >> >> | 'connector.properties.0.value' =
> >> 'cdh1:2181,cdh2:2181,cdh3:2181',
> >> >> | 'connector.properties.1.key' = 'bootstrap.servers',
> >> >> | 'connector.properties.1.value' =
> >> 'cdh1:9092,cdh2:9092,cdh3:9092',
> >> >> | 'update-mode' = 'append',
> >> >> | 'format.type' = 'json',
> >> >> | 'format.derive-schema' = 'true'
> >> >> |)
> >> >> |""".stripMargin)
> >> >> streamTableEnv.sqlUpdate(
> >> >> """
> >> >> |
> >> >> |CREATE TABLE user_cnt (
> >> >> | `time` VARCHAR,
> >> >> | sum_age INT
> >> >> |) WITH (
> >> >> | 'connector.type' = 'jdbc',
> >> >> | 'connector.url' = 'jdbc:mysql://localhost:3306/dashboard',
> >> >> | 'connector.table' = 'user_cnt',
> >> >> | 'connector.username' = 'root',
> >> >> | 'connector.password' = '123456',
> >> >> | 'connector.write.flush.max-rows' = '1'
> >> >> |)
> >> >> |""".stripMargin)
> >> >> val userTableSource = new MysqlAsyncLookupTableSource(
> >> >> Array("uid", "sex", "age", "created_time"),
> >> >> Array(),
> >> >> Array(Types.STRING, Types.STRING, Types.INT, Types.STRING))
> >> >> streamTableEnv.registerTableSource("users", userTableSource)
> >> >> streamTableEnv.sqlUpdate(
> >> >> """
> >> >> |
> >> >> |insert into user_cnt
> >> >> |SELECT
> >> >> | cast(b.`time` as string), u.age
> >> >> |FROM
> >> >> | user_behavior AS b
> >> >> | JOIN users FOR SYSTEM_TIME AS OF b.`time` AS u
> >> >> | ON b.uid = u.uid
> >> >> |
> >> >> |""".stripMargin)
> >> >> streamTableEnv.execute("Temporal table join")
> >>
>
Re:Re: Re: flink sql Temporal table join failed
Posted by Zhou Zach <wa...@163.com>.
还是不行,报错如下:
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/Users/Zach/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/Users/Zach/.m2/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
ERROR StatusLogger No log4j2 configuration file found. Using default configuration: logging only errors to the console.
Exception in thread "main" org.apache.flink.table.api.SqlParserException: SQL parse failed. Encountered "time FROM" at line 1, column 44.
Was expecting one of:
"CURSOR" ...
"EXISTS" ...
"NOT" ...
"ROW" ...
"(" ...
"+" ...
"-" ...
<UNSIGNED_INTEGER_LITERAL> ...
<DECIMAL_NUMERIC_LITERAL> ...
<APPROX_NUMERIC_LITERAL> ...
<BINARY_STRING_LITERAL> ...
<PREFIXED_STRING_LITERAL> ...
<QUOTED_STRING> ...
<UNICODE_STRING_LITERAL> ...
"TRUE" ...
"FALSE" ...
"UNKNOWN" ...
"NULL" ...
<LBRACE_D> ...
<LBRACE_T> ...
<LBRACE_TS> ...
"DATE" ...
"TIME" <QUOTED_STRING> ...
"TIMESTAMP" ...
"INTERVAL" ...
"?" ...
"CAST" ...
"EXTRACT" ...
"POSITION" ...
"CONVERT" ...
"TRANSLATE" ...
"OVERLAY" ...
"FLOOR" ...
"CEIL" ...
"CEILING" ...
"SUBSTRING" ...
"TRIM" ...
"CLASSIFIER" ...
"MATCH_NUMBER" ...
"RUNNING" ...
"PREV" ...
"NEXT" ...
"JSON_EXISTS" ...
"JSON_VALUE" ...
"JSON_QUERY" ...
"JSON_OBJECT" ...
"JSON_OBJECTAGG" ...
"JSON_ARRAY" ...
"JSON_ARRAYAGG" ...
<LBRACE_FN> ...
"MULTISET" ...
"ARRAY" ...
"MAP" ...
"PERIOD" ...
"SPECIFIC" ...
<IDENTIFIER> ...
<QUOTED_IDENTIFIER> ...
<BACK_QUOTED_IDENTIFIER> ...
<BRACKET_QUOTED_IDENTIFIER> ...
<UNICODE_QUOTED_IDENTIFIER> ...
"ABS" ...
"AVG" ...
"CARDINALITY" ...
"CHAR_LENGTH" ...
"CHARACTER_LENGTH" ...
"COALESCE" ...
"COLLECT" ...
"COVAR_POP" ...
"COVAR_SAMP" ...
"CUME_DIST" ...
"COUNT" ...
"CURRENT_DATE" ...
"CURRENT_TIME" ...
"CURRENT_TIMESTAMP" ...
"DENSE_RANK" ...
"ELEMENT" ...
"EXP" ...
"FIRST_VALUE" ...
"FUSION" ...
"GROUPING" ...
"HOUR" ...
"LAG" ...
"LEAD" ...
"LEFT" ...
"LAST_VALUE" ...
"LN" ...
"LOCALTIME" ...
"LOCALTIMESTAMP" ...
"LOWER" ...
"MAX" ...
"MIN" ...
"MINUTE" ...
"MOD" ...
"MONTH" ...
"NTH_VALUE" ...
"NTILE" ...
"NULLIF" ...
"OCTET_LENGTH" ...
"PERCENT_RANK" ...
"POWER" ...
"RANK" ...
"REGR_COUNT" ...
"REGR_SXX" ...
"REGR_SYY" ...
"RIGHT" ...
"ROW_NUMBER" ...
"SECOND" ...
"SQRT" ...
"STDDEV_POP" ...
"STDDEV_SAMP" ...
"SUM" ...
"UPPER" ...
"TRUNCATE" ...
"USER" ...
"VAR_POP" ...
"VAR_SAMP" ...
"YEAR" ...
"CURRENT_CATALOG" ...
"CURRENT_DEFAULT_TRANSFORM_GROUP" ...
"CURRENT_PATH" ...
"CURRENT_ROLE" ...
"CURRENT_SCHEMA" ...
"CURRENT_USER" ...
"SESSION_USER" ...
"SYSTEM_USER" ...
"NEW" ...
"CASE" ...
"CURRENT" ...
at org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:50)
at org.apache.flink.table.planner.calcite.SqlExprToRexConverterImpl.convertToRexNodes(SqlExprToRexConverterImpl.java:79)
at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:111)
at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3328)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2357)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2005)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2083)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:646)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:627)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3181)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:563)
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:148)
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:135)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:522)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:436)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:154)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:342)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:142)
at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484)
at org.rabbit.sql.FromKafkaSinkMysqlForReal$.main(FromKafkaSinkMysqlForReal.scala:63)
at org.rabbit.sql.FromKafkaSinkMysqlForReal.main(FromKafkaSinkMysqlForReal.scala)
query:
streamTableEnv.sqlUpdate(
"""
|
|CREATE TABLE user_behavior (
| uid VARCHAR,
| phoneType VARCHAR,
| clickCount INT,
| proctime AS PROCTIME(),
| `time` TIMESTAMP(3)
|) WITH (
| 'connector.type' = 'kafka',
| 'connector.version' = 'universal',
| 'connector.topic' = 'user_behavior',
| 'connector.startup-mode' = 'earliest-offset',
| 'connector.properties.0.key' = 'zookeeper.connect',
| 'connector.properties.0.value' = 'cdh1:2181,cdh2:2181,cdh3:2181',
| 'connector.properties.1.key' = 'bootstrap.servers',
| 'connector.properties.1.value' = 'cdh1:9092,cdh2:9092,cdh3:9092',
| 'update-mode' = 'append',
| 'format.type' = 'json',
| 'format.derive-schema' = 'true'
|)
|""".stripMargin)
streamTableEnv.sqlUpdate(
"""
|
|insert into user_cnt
|SELECT
| cast(b.`time` as string), u.age
|FROM
| user_behavior AS b
| JOIN users FOR SYSTEM_TIME AS OF b.`proctime` AS u
| ON b.uid = u.uid
|
|""".stripMargin)
不过,把 PROCTIME() AS proctime 放在 SELECT 后面可以执行成功,而把 proctime AS PROCTIME() 放在 SELECT 后面也不行。
在 2020-06-12 15:29:49,"Benchao Li" <li...@apache.org> 写道:
>你写反了,是proctime AS PROCTIME()。
>计算列跟普通query里面的AS是反着的。
>
>Zhou Zach <wa...@163.com> 于2020年6月12日周五 下午2:24写道:
>
>> flink 1.10.0:
>> 在create table中,加PROCTIME() AS proctime字段报错
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> 在 2020-06-12 14:08:11,"Benchao Li" <li...@apache.org> 写道:
>> >Hi,
>> >
>> >Temporal Table join的时候需要是处理时间,你现在这个b.`time`是一个普通的时间戳,而不是事件时间。
>> >可以参考下[1]
>> >
>> >[1]
>> >
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/time_attributes.html
>> >
>> >Zhou Zach <wa...@163.com> 于2020年6月12日周五 下午1:33写道:
>> >
>> >> SLF4J: Class path contains multiple SLF4J bindings.
>> >>
>> >> SLF4J: Found binding in
>> >>
>> [jar:file:/Users/Zach/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>> >>
>> >> SLF4J: Found binding in
>> >>
>> [jar:file:/Users/Zach/.m2/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>> >>
>> >> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
>> >> explanation.
>> >>
>> >> SLF4J: Actual binding is of type
>> >> [org.apache.logging.slf4j.Log4jLoggerFactory]
>> >>
>> >> ERROR StatusLogger No log4j2 configuration file found. Using default
>> >> configuration: logging only errors to the console.
>> >>
>> >> Exception in thread "main" org.apache.flink.table.api.TableException:
>> >> Cannot generate a valid execution plan for the given query:
>> >>
>> >>
>> >>
>> >>
>> >> FlinkLogicalSink(name=[`default_catalog`.`default_database`.`user_cnt`],
>> >> fields=[time, sum_age])
>> >>
>> >> +- FlinkLogicalCalc(select=[CAST(time) AS EXPR$0, age])
>> >>
>> >> +- FlinkLogicalJoin(condition=[=($0, $2)], joinType=[inner])
>> >>
>> >> :- FlinkLogicalCalc(select=[uid, time])
>> >>
>> >> : +- FlinkLogicalTableSourceScan(table=[[default_catalog,
>> >> default_database, user_behavior, source: [KafkaTableSource(uid,
>> phoneType,
>> >> clickCount, time)]]], fields=[uid, phoneType, clickCount, time])
>> >>
>> >> +- FlinkLogicalSnapshot(period=[$cor0.time])
>> >>
>> >> +- FlinkLogicalCalc(select=[uid, age])
>> >>
>> >> +- FlinkLogicalTableSourceScan(table=[[default_catalog,
>> >> default_database, users, source: [MysqlAsyncLookupTableSource(uid, sex,
>> >> age, created_time)]]], fields=[uid, sex, age, created_time])
>> >>
>> >>
>> >>
>> >>
>> >> Temporal table join currently only supports 'FOR SYSTEM_TIME AS OF' left
>> >> table's proctime field, doesn't support 'PROCTIME()'
>> >>
>> >> Please check the documentation for the set of currently supported SQL
>> >> features.
>> >>
>> >> at
>> >>
>> org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:78)
>> >>
>> >> at
>> >>
>> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
>> >>
>> >> at
>> >>
>> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
>> >>
>> >> at
>> >>
>> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>> >>
>> >> at
>> >>
>> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>> >>
>> >> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>> >>
>> >> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>> >>
>> >> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>> >>
>> >> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>> >>
>> >> at
>> >>
>> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
>> >>
>> >> at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>> >>
>> >> at
>> >>
>> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
>> >>
>> >> at
>> >>
>> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:170)
>> >>
>> >> at
>> >>
>> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:90)
>> >>
>> >> at
>> >>
>> org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
>> >>
>> >> at
>> >>
>> org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:248)
>> >>
>> >> at
>> >>
>> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:151)
>> >>
>> >> at
>> >>
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)
>> >>
>> >> at
>> >>
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
>> >>
>> >> at
>> >>
>> org.rabbit.sql.FromKafkaSinkMysqlForReal$.main(FromKafkaSinkMysqlForReal.scala:90)
>> >>
>> >> at
>> >>
>> org.rabbit.sql.FromKafkaSinkMysqlForReal.main(FromKafkaSinkMysqlForReal.scala)
>> >>
>> >> Caused by: org.apache.flink.table.api.TableException: Temporal table
>> join
>> >> currently only supports 'FOR SYSTEM_TIME AS OF' left table's proctime
>> >> field, doesn't support 'PROCTIME()'
>> >>
>> >> at
>> >>
>> org.apache.flink.table.planner.plan.rules.physical.common.CommonLookupJoinRule$class.matches(CommonLookupJoinRule.scala:67)
>> >>
>> >> at
>> >>
>> org.apache.flink.table.planner.plan.rules.physical.common.BaseSnapshotOnCalcTableScanRule.matches(CommonLookupJoinRule.scala:147)
>> >>
>> >> at
>> >>
>> org.apache.flink.table.planner.plan.rules.physical.common.BaseSnapshotOnCalcTableScanRule.matches(CommonLookupJoinRule.scala:161)
>> >>
>> >> at
>> >>
>> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:263)
>> >>
>> >> at
>> >>
>> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)
>> >>
>> >> at
>> >>
>> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)
>> >>
>> >> at
>> >>
>> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)
>> >>
>> >> at
>> >>
>> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)
>> >>
>> >> at
>> >>
>> org.apache.calcite.plan.volcano.VolcanoRuleCall.match(VolcanoRuleCall.java:247)
>> >>
>> >> at
>> >>
>> org.apache.calcite.plan.volcano.VolcanoPlanner.fireRules(VolcanoPlanner.java:1534)
>> >>
>> >> at
>> >>
>> org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1807)
>> >>
>> >> at
>> >>
>> org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846)
>> >>
>> >> at
>> >>
>> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868)
>> >>
>> >> at
>> >>
>> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:90)
>> >>
>> >> at
>> >>
>> org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:329)
>> >>
>> >> at
>> >>
>> org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1668)
>> >>
>> >> at
>> >>
>> org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846)
>> >>
>> >> at
>> >>
>> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868)
>> >>
>> >> at
>> >>
>> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:90)
>> >>
>> >> at
>> >>
>> org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:329)
>> >>
>> >> at
>> >>
>> org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1668)
>> >>
>> >> at
>> >>
>> org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846)
>> >>
>> >> at
>> >>
>> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868)
>> >>
>> >> at
>> >>
>> org.apache.calcite.plan.volcano.VolcanoPlanner.changeTraits(VolcanoPlanner.java:529)
>> >>
>> >> at
>> org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:324)
>> >>
>> >> at
>> >>
>> org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64)
>> >>
>> >> ... 20 more
>> >>
>> >>
>> >>
>> >>
>> >> query:
>> >>
>> >>
>> >> val streamExecutionEnv =
>> StreamExecutionEnvironment.getExecutionEnvironment
>> >> val blinkEnvSettings =
>> >>
>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
>> >> val streamTableEnv =
>> >> StreamTableEnvironment.create(streamExecutionEnv,blinkEnvSettings)
>> >>
>> >> streamTableEnv.sqlUpdate(
>> >> """
>> >> |
>> >> |CREATE TABLE user_behavior (
>> >> | uid VARCHAR,
>> >> | phoneType VARCHAR,
>> >> | clickCount INT,
>> >> | `time` TIMESTAMP(3)
>> >> |) WITH (
>> >> | 'connector.type' = 'kafka',
>> >> | 'connector.version' = 'universal',
>> >> | 'connector.topic' = 'user_behavior',
>> >> | 'connector.startup-mode' = 'earliest-offset',
>> >> | 'connector.properties.0.key' = 'zookeeper.connect',
>> >> | 'connector.properties.0.value' =
>> 'cdh1:2181,cdh2:2181,cdh3:2181',
>> >> | 'connector.properties.1.key' = 'bootstrap.servers',
>> >> | 'connector.properties.1.value' =
>> 'cdh1:9092,cdh2:9092,cdh3:9092',
>> >> | 'update-mode' = 'append',
>> >> | 'format.type' = 'json',
>> >> | 'format.derive-schema' = 'true'
>> >> |)
>> >> |""".stripMargin)
>> >> streamTableEnv.sqlUpdate(
>> >> """
>> >> |
>> >> |CREATE TABLE user_cnt (
>> >> | `time` VARCHAR,
>> >> | sum_age INT
>> >> |) WITH (
>> >> | 'connector.type' = 'jdbc',
>> >> | 'connector.url' = 'jdbc:mysql://localhost:3306/dashboard',
>> >> | 'connector.table' = 'user_cnt',
>> >> | 'connector.username' = 'root',
>> >> | 'connector.password' = '123456',
>> >> | 'connector.write.flush.max-rows' = '1'
>> >> |)
>> >> |""".stripMargin)
>> >> val userTableSource = new MysqlAsyncLookupTableSource(
>> >> Array("uid", "sex", "age", "created_time"),
>> >> Array(),
>> >> Array(Types.STRING, Types.STRING, Types.INT, Types.STRING))
>> >> streamTableEnv.registerTableSource("users", userTableSource)
>> >> streamTableEnv.sqlUpdate(
>> >> """
>> >> |
>> >> |insert into user_cnt
>> >> |SELECT
>> >> | cast(b.`time` as string), u.age
>> >> |FROM
>> >> | user_behavior AS b
>> >> | JOIN users FOR SYSTEM_TIME AS OF b.`time` AS u
>> >> | ON b.uid = u.uid
>> >> |
>> >> |""".stripMargin)
>> >> streamTableEnv.execute("Temporal table join")
>>
Re: Re: flink sql Temporal table join failed
Posted by Benchao Li <li...@apache.org>.
你写反了,是proctime AS PROCTIME()。
计算列跟普通query里面的AS是反着的。
Zhou Zach <wa...@163.com> 于2020年6月12日周五 下午2:24写道:
> flink 1.10.0:
> 在create table中,加PROCTIME() AS proctime字段报错
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-06-12 14:08:11,"Benchao Li" <li...@apache.org> 写道:
> >Hi,
> >
> >Temporal Table join的时候需要是处理时间,你现在这个b.`time`是一个普通的时间戳,而不是事件时间。
> >可以参考下[1]
> >
> >[1]
> >
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/time_attributes.html
> >
> >Zhou Zach <wa...@163.com> 于2020年6月12日周五 下午1:33写道:
> >
> >> SLF4J: Class path contains multiple SLF4J bindings.
> >>
> >> SLF4J: Found binding in
> >>
> [jar:file:/Users/Zach/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> >>
> >> SLF4J: Found binding in
> >>
> [jar:file:/Users/Zach/.m2/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> >>
> >> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
> >> explanation.
> >>
> >> SLF4J: Actual binding is of type
> >> [org.apache.logging.slf4j.Log4jLoggerFactory]
> >>
> >> ERROR StatusLogger No log4j2 configuration file found. Using default
> >> configuration: logging only errors to the console.
> >>
> >> Exception in thread "main" org.apache.flink.table.api.TableException:
> >> Cannot generate a valid execution plan for the given query:
> >>
> >>
> >>
> >>
> >> FlinkLogicalSink(name=[`default_catalog`.`default_database`.`user_cnt`],
> >> fields=[time, sum_age])
> >>
> >> +- FlinkLogicalCalc(select=[CAST(time) AS EXPR$0, age])
> >>
> >> +- FlinkLogicalJoin(condition=[=($0, $2)], joinType=[inner])
> >>
> >> :- FlinkLogicalCalc(select=[uid, time])
> >>
> >> : +- FlinkLogicalTableSourceScan(table=[[default_catalog,
> >> default_database, user_behavior, source: [KafkaTableSource(uid,
> phoneType,
> >> clickCount, time)]]], fields=[uid, phoneType, clickCount, time])
> >>
> >> +- FlinkLogicalSnapshot(period=[$cor0.time])
> >>
> >> +- FlinkLogicalCalc(select=[uid, age])
> >>
> >> +- FlinkLogicalTableSourceScan(table=[[default_catalog,
> >> default_database, users, source: [MysqlAsyncLookupTableSource(uid, sex,
> >> age, created_time)]]], fields=[uid, sex, age, created_time])
> >>
> >>
> >>
> >>
> >> Temporal table join currently only supports 'FOR SYSTEM_TIME AS OF' left
> >> table's proctime field, doesn't support 'PROCTIME()'
> >>
> >> Please check the documentation for the set of currently supported SQL
> >> features.
> >>
> >> at
> >>
> org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:78)
> >>
> >> at
> >>
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
> >>
> >> at
> >>
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
> >>
> >> at
> >>
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> >>
> >> at
> >>
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> >>
> >> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> >>
> >> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> >>
> >> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> >>
> >> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> >>
> >> at
> >>
> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
> >>
> >> at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
> >>
> >> at
> >>
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
> >>
> >> at
> >>
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:170)
> >>
> >> at
> >>
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:90)
> >>
> >> at
> >>
> org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
> >>
> >> at
> >>
> org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:248)
> >>
> >> at
> >>
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:151)
> >>
> >> at
> >>
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)
> >>
> >> at
> >>
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
> >>
> >> at
> >>
> org.rabbit.sql.FromKafkaSinkMysqlForReal$.main(FromKafkaSinkMysqlForReal.scala:90)
> >>
> >> at
> >>
> org.rabbit.sql.FromKafkaSinkMysqlForReal.main(FromKafkaSinkMysqlForReal.scala)
> >>
> >> Caused by: org.apache.flink.table.api.TableException: Temporal table
> join
> >> currently only supports 'FOR SYSTEM_TIME AS OF' left table's proctime
> >> field, doesn't support 'PROCTIME()'
> >>
> >> at
> >>
> org.apache.flink.table.planner.plan.rules.physical.common.CommonLookupJoinRule$class.matches(CommonLookupJoinRule.scala:67)
> >>
> >> at
> >>
> org.apache.flink.table.planner.plan.rules.physical.common.BaseSnapshotOnCalcTableScanRule.matches(CommonLookupJoinRule.scala:147)
> >>
> >> at
> >>
> org.apache.flink.table.planner.plan.rules.physical.common.BaseSnapshotOnCalcTableScanRule.matches(CommonLookupJoinRule.scala:161)
> >>
> >> at
> >>
> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:263)
> >>
> >> at
> >>
> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)
> >>
> >> at
> >>
> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)
> >>
> >> at
> >>
> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)
> >>
> >> at
> >>
> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)
> >>
> >> at
> >>
> org.apache.calcite.plan.volcano.VolcanoRuleCall.match(VolcanoRuleCall.java:247)
> >>
> >> at
> >>
> org.apache.calcite.plan.volcano.VolcanoPlanner.fireRules(VolcanoPlanner.java:1534)
> >>
> >> at
> >>
> org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1807)
> >>
> >> at
> >>
> org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846)
> >>
> >> at
> >>
> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868)
> >>
> >> at
> >>
> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:90)
> >>
> >> at
> >>
> org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:329)
> >>
> >> at
> >>
> org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1668)
> >>
> >> at
> >>
> org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846)
> >>
> >> at
> >>
> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868)
> >>
> >> at
> >>
> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:90)
> >>
> >> at
> >>
> org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:329)
> >>
> >> at
> >>
> org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1668)
> >>
> >> at
> >>
> org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846)
> >>
> >> at
> >>
> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868)
> >>
> >> at
> >>
> org.apache.calcite.plan.volcano.VolcanoPlanner.changeTraits(VolcanoPlanner.java:529)
> >>
> >> at
> org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:324)
> >>
> >> at
> >>
> org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64)
> >>
> >> ... 20 more
> >>
> >>
> >>
> >>
> >> query:
> >>
> >>
> >> val streamExecutionEnv =
> StreamExecutionEnvironment.getExecutionEnvironment
> >> val blinkEnvSettings =
> >>
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
> >> val streamTableEnv =
> >> StreamTableEnvironment.create(streamExecutionEnv,blinkEnvSettings)
> >>
> >> streamTableEnv.sqlUpdate(
> >> """
> >> |
> >> |CREATE TABLE user_behavior (
> >> | uid VARCHAR,
> >> | phoneType VARCHAR,
> >> | clickCount INT,
> >> | `time` TIMESTAMP(3)
> >> |) WITH (
> >> | 'connector.type' = 'kafka',
> >> | 'connector.version' = 'universal',
> >> | 'connector.topic' = 'user_behavior',
> >> | 'connector.startup-mode' = 'earliest-offset',
> >> | 'connector.properties.0.key' = 'zookeeper.connect',
> >> | 'connector.properties.0.value' =
> 'cdh1:2181,cdh2:2181,cdh3:2181',
> >> | 'connector.properties.1.key' = 'bootstrap.servers',
> >> | 'connector.properties.1.value' =
> 'cdh1:9092,cdh2:9092,cdh3:9092',
> >> | 'update-mode' = 'append',
> >> | 'format.type' = 'json',
> >> | 'format.derive-schema' = 'true'
> >> |)
> >> |""".stripMargin)
> >> streamTableEnv.sqlUpdate(
> >> """
> >> |
> >> |CREATE TABLE user_cnt (
> >> | `time` VARCHAR,
> >> | sum_age INT
> >> |) WITH (
> >> | 'connector.type' = 'jdbc',
> >> | 'connector.url' = 'jdbc:mysql://localhost:3306/dashboard',
> >> | 'connector.table' = 'user_cnt',
> >> | 'connector.username' = 'root',
> >> | 'connector.password' = '123456',
> >> | 'connector.write.flush.max-rows' = '1'
> >> |)
> >> |""".stripMargin)
> >> val userTableSource = new MysqlAsyncLookupTableSource(
> >> Array("uid", "sex", "age", "created_time"),
> >> Array(),
> >> Array(Types.STRING, Types.STRING, Types.INT, Types.STRING))
> >> streamTableEnv.registerTableSource("users", userTableSource)
> >> streamTableEnv.sqlUpdate(
> >> """
> >> |
> >> |insert into user_cnt
> >> |SELECT
> >> | cast(b.`time` as string), u.age
> >> |FROM
> >> | user_behavior AS b
> >> | JOIN users FOR SYSTEM_TIME AS OF b.`time` AS u
> >> | ON b.uid = u.uid
> >> |
> >> |""".stripMargin)
> >> streamTableEnv.execute("Temporal table join")
>
Re:Re:Re: flink sql Temporal table join failed
Posted by 陈邵瑾 <15...@163.com>.
请参考一下 SQL 中时间属性(time attributes)相关的文档,从你的描述来看,用法有问题:https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/time_attributes.html
At 2020-06-12 14:24:07, "Zhou Zach" <wa...@163.com> wrote:
>flink 1.10.0:
>在create table中,加PROCTIME() AS proctime字段报错
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>在 2020-06-12 14:08:11,"Benchao Li" <li...@apache.org> 写道:
>>Hi,
>>
>>Temporal Table join的时候需要是处理时间,你现在这个b.`time`是一个普通的时间戳,而不是处理时间属性。
>>可以参考下[1]
>>
>>[1]
>>https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/time_attributes.html
>>
>>Zhou Zach <wa...@163.com> 于2020年6月12日周五 下午1:33写道:
>>
>>> SLF4J: Class path contains multiple SLF4J bindings.
>>>
>>> SLF4J: Found binding in
>>> [jar:file:/Users/Zach/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>>>
>>> SLF4J: Found binding in
>>> [jar:file:/Users/Zach/.m2/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>>>
>>> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
>>> explanation.
>>>
>>> SLF4J: Actual binding is of type
>>> [org.apache.logging.slf4j.Log4jLoggerFactory]
>>>
>>> ERROR StatusLogger No log4j2 configuration file found. Using default
>>> configuration: logging only errors to the console.
>>>
>>> Exception in thread "main" org.apache.flink.table.api.TableException:
>>> Cannot generate a valid execution plan for the given query:
>>>
>>>
>>>
>>>
>>> FlinkLogicalSink(name=[`default_catalog`.`default_database`.`user_cnt`],
>>> fields=[time, sum_age])
>>>
>>> +- FlinkLogicalCalc(select=[CAST(time) AS EXPR$0, age])
>>>
>>> +- FlinkLogicalJoin(condition=[=($0, $2)], joinType=[inner])
>>>
>>> :- FlinkLogicalCalc(select=[uid, time])
>>>
>>> : +- FlinkLogicalTableSourceScan(table=[[default_catalog,
>>> default_database, user_behavior, source: [KafkaTableSource(uid, phoneType,
>>> clickCount, time)]]], fields=[uid, phoneType, clickCount, time])
>>>
>>> +- FlinkLogicalSnapshot(period=[$cor0.time])
>>>
>>> +- FlinkLogicalCalc(select=[uid, age])
>>>
>>> +- FlinkLogicalTableSourceScan(table=[[default_catalog,
>>> default_database, users, source: [MysqlAsyncLookupTableSource(uid, sex,
>>> age, created_time)]]], fields=[uid, sex, age, created_time])
>>>
>>>
>>>
>>>
>>> Temporal table join currently only supports 'FOR SYSTEM_TIME AS OF' left
>>> table's proctime field, doesn't support 'PROCTIME()'
>>>
>>> Please check the documentation for the set of currently supported SQL
>>> features.
>>>
>>> at
>>> org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:78)
>>>
>>> at
>>> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
>>>
>>> at
>>> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
>>>
>>> at
>>> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>>>
>>> at
>>> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>>>
>>> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>>>
>>> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>>>
>>> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>>>
>>> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>>>
>>> at
>>> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
>>>
>>> at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>>>
>>> at
>>> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
>>>
>>> at
>>> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:170)
>>>
>>> at
>>> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:90)
>>>
>>> at
>>> org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
>>>
>>> at
>>> org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:248)
>>>
>>> at
>>> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:151)
>>>
>>> at
>>> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)
>>>
>>> at
>>> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
>>>
>>> at
>>> org.rabbit.sql.FromKafkaSinkMysqlForReal$.main(FromKafkaSinkMysqlForReal.scala:90)
>>>
>>> at
>>> org.rabbit.sql.FromKafkaSinkMysqlForReal.main(FromKafkaSinkMysqlForReal.scala)
>>>
>>> Caused by: org.apache.flink.table.api.TableException: Temporal table join
>>> currently only supports 'FOR SYSTEM_TIME AS OF' left table's proctime
>>> field, doesn't support 'PROCTIME()'
>>>
>>> at
>>> org.apache.flink.table.planner.plan.rules.physical.common.CommonLookupJoinRule$class.matches(CommonLookupJoinRule.scala:67)
>>>
>>> at
>>> org.apache.flink.table.planner.plan.rules.physical.common.BaseSnapshotOnCalcTableScanRule.matches(CommonLookupJoinRule.scala:147)
>>>
>>> at
>>> org.apache.flink.table.planner.plan.rules.physical.common.BaseSnapshotOnCalcTableScanRule.matches(CommonLookupJoinRule.scala:161)
>>>
>>> at
>>> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:263)
>>>
>>> at
>>> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)
>>>
>>> at
>>> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)
>>>
>>> at
>>> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)
>>>
>>> at
>>> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)
>>>
>>> at
>>> org.apache.calcite.plan.volcano.VolcanoRuleCall.match(VolcanoRuleCall.java:247)
>>>
>>> at
>>> org.apache.calcite.plan.volcano.VolcanoPlanner.fireRules(VolcanoPlanner.java:1534)
>>>
>>> at
>>> org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1807)
>>>
>>> at
>>> org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846)
>>>
>>> at
>>> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868)
>>>
>>> at
>>> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:90)
>>>
>>> at
>>> org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:329)
>>>
>>> at
>>> org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1668)
>>>
>>> at
>>> org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846)
>>>
>>> at
>>> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868)
>>>
>>> at
>>> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:90)
>>>
>>> at
>>> org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:329)
>>>
>>> at
>>> org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1668)
>>>
>>> at
>>> org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846)
>>>
>>> at
>>> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868)
>>>
>>> at
>>> org.apache.calcite.plan.volcano.VolcanoPlanner.changeTraits(VolcanoPlanner.java:529)
>>>
>>> at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:324)
>>>
>>> at
>>> org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64)
>>>
>>> ... 20 more
>>>
>>>
>>>
>>>
>>> query:
>>>
>>>
>>> val streamExecutionEnv = StreamExecutionEnvironment.getExecutionEnvironment
>>> val blinkEnvSettings =
>>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
>>> val streamTableEnv =
>>> StreamTableEnvironment.create(streamExecutionEnv,blinkEnvSettings)
>>>
>>> streamTableEnv.sqlUpdate(
>>> """
>>> |
>>> |CREATE TABLE user_behavior (
>>> | uid VARCHAR,
>>> | phoneType VARCHAR,
>>> | clickCount INT,
>>> | `time` TIMESTAMP(3)
>>> |) WITH (
>>> | 'connector.type' = 'kafka',
>>> | 'connector.version' = 'universal',
>>> | 'connector.topic' = 'user_behavior',
>>> | 'connector.startup-mode' = 'earliest-offset',
>>> | 'connector.properties.0.key' = 'zookeeper.connect',
>>> | 'connector.properties.0.value' = 'cdh1:2181,cdh2:2181,cdh3:2181',
>>> | 'connector.properties.1.key' = 'bootstrap.servers',
>>> | 'connector.properties.1.value' = 'cdh1:9092,cdh2:9092,cdh3:9092',
>>> | 'update-mode' = 'append',
>>> | 'format.type' = 'json',
>>> | 'format.derive-schema' = 'true'
>>> |)
>>> |""".stripMargin)
>>> streamTableEnv.sqlUpdate(
>>> """
>>> |
>>> |CREATE TABLE user_cnt (
>>> | `time` VARCHAR,
>>> | sum_age INT
>>> |) WITH (
>>> | 'connector.type' = 'jdbc',
>>> | 'connector.url' = 'jdbc:mysql://localhost:3306/dashboard',
>>> | 'connector.table' = 'user_cnt',
>>> | 'connector.username' = 'root',
>>> | 'connector.password' = '123456',
>>> | 'connector.write.flush.max-rows' = '1'
>>> |)
>>> |""".stripMargin)
>>> val userTableSource = new MysqlAsyncLookupTableSource(
>>> Array("uid", "sex", "age", "created_time"),
>>> Array(),
>>> Array(Types.STRING, Types.STRING, Types.INT, Types.STRING))
>>> streamTableEnv.registerTableSource("users", userTableSource)
>>> streamTableEnv.sqlUpdate(
>>> """
>>> |
>>> |insert into user_cnt
>>> |SELECT
>>> | cast(b.`time` as string), u.age
>>> |FROM
>>> | user_behavior AS b
>>> | JOIN users FOR SYSTEM_TIME AS OF b.`time` AS u
>>> | ON b.uid = u.uid
>>> |
>>> |""".stripMargin)
>>> streamTableEnv.execute("Temporal table join")
Re:Re: flink sql Temporal table join failed
Posted by Zhou Zach <wa...@163.com>.
flink 1.10.0:
在create table中,加PROCTIME() AS proctime字段报错
在 2020-06-12 14:08:11,"Benchao Li" <li...@apache.org> 写道:
>Hi,
>
>Temporal Table join的时候需要是处理时间,你现在这个b.`time`是一个普通的时间戳,而不是处理时间属性。
>可以参考下[1]
>
>[1]
>https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/time_attributes.html
>
>Zhou Zach <wa...@163.com> 于2020年6月12日周五 下午1:33写道:
>
>> SLF4J: Class path contains multiple SLF4J bindings.
>>
>> SLF4J: Found binding in
>> [jar:file:/Users/Zach/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>>
>> SLF4J: Found binding in
>> [jar:file:/Users/Zach/.m2/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>>
>> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
>> explanation.
>>
>> SLF4J: Actual binding is of type
>> [org.apache.logging.slf4j.Log4jLoggerFactory]
>>
>> ERROR StatusLogger No log4j2 configuration file found. Using default
>> configuration: logging only errors to the console.
>>
>> Exception in thread "main" org.apache.flink.table.api.TableException:
>> Cannot generate a valid execution plan for the given query:
>>
>>
>>
>>
>> FlinkLogicalSink(name=[`default_catalog`.`default_database`.`user_cnt`],
>> fields=[time, sum_age])
>>
>> +- FlinkLogicalCalc(select=[CAST(time) AS EXPR$0, age])
>>
>> +- FlinkLogicalJoin(condition=[=($0, $2)], joinType=[inner])
>>
>> :- FlinkLogicalCalc(select=[uid, time])
>>
>> : +- FlinkLogicalTableSourceScan(table=[[default_catalog,
>> default_database, user_behavior, source: [KafkaTableSource(uid, phoneType,
>> clickCount, time)]]], fields=[uid, phoneType, clickCount, time])
>>
>> +- FlinkLogicalSnapshot(period=[$cor0.time])
>>
>> +- FlinkLogicalCalc(select=[uid, age])
>>
>> +- FlinkLogicalTableSourceScan(table=[[default_catalog,
>> default_database, users, source: [MysqlAsyncLookupTableSource(uid, sex,
>> age, created_time)]]], fields=[uid, sex, age, created_time])
>>
>>
>>
>>
>> Temporal table join currently only supports 'FOR SYSTEM_TIME AS OF' left
>> table's proctime field, doesn't support 'PROCTIME()'
>>
>> Please check the documentation for the set of currently supported SQL
>> features.
>>
>> at
>> org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:78)
>>
>> at
>> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
>>
>> at
>> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
>>
>> at
>> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>>
>> at
>> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>>
>> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>>
>> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>>
>> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>>
>> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>>
>> at
>> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
>>
>> at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>>
>> at
>> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
>>
>> at
>> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:170)
>>
>> at
>> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:90)
>>
>> at
>> org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
>>
>> at
>> org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:248)
>>
>> at
>> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:151)
>>
>> at
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)
>>
>> at
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
>>
>> at
>> org.rabbit.sql.FromKafkaSinkMysqlForReal$.main(FromKafkaSinkMysqlForReal.scala:90)
>>
>> at
>> org.rabbit.sql.FromKafkaSinkMysqlForReal.main(FromKafkaSinkMysqlForReal.scala)
>>
>> Caused by: org.apache.flink.table.api.TableException: Temporal table join
>> currently only supports 'FOR SYSTEM_TIME AS OF' left table's proctime
>> field, doesn't support 'PROCTIME()'
>>
>> at
>> org.apache.flink.table.planner.plan.rules.physical.common.CommonLookupJoinRule$class.matches(CommonLookupJoinRule.scala:67)
>>
>> at
>> org.apache.flink.table.planner.plan.rules.physical.common.BaseSnapshotOnCalcTableScanRule.matches(CommonLookupJoinRule.scala:147)
>>
>> at
>> org.apache.flink.table.planner.plan.rules.physical.common.BaseSnapshotOnCalcTableScanRule.matches(CommonLookupJoinRule.scala:161)
>>
>> at
>> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:263)
>>
>> at
>> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)
>>
>> at
>> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)
>>
>> at
>> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)
>>
>> at
>> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)
>>
>> at
>> org.apache.calcite.plan.volcano.VolcanoRuleCall.match(VolcanoRuleCall.java:247)
>>
>> at
>> org.apache.calcite.plan.volcano.VolcanoPlanner.fireRules(VolcanoPlanner.java:1534)
>>
>> at
>> org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1807)
>>
>> at
>> org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846)
>>
>> at
>> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868)
>>
>> at
>> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:90)
>>
>> at
>> org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:329)
>>
>> at
>> org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1668)
>>
>> at
>> org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846)
>>
>> at
>> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868)
>>
>> at
>> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:90)
>>
>> at
>> org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:329)
>>
>> at
>> org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1668)
>>
>> at
>> org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846)
>>
>> at
>> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868)
>>
>> at
>> org.apache.calcite.plan.volcano.VolcanoPlanner.changeTraits(VolcanoPlanner.java:529)
>>
>> at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:324)
>>
>> at
>> org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64)
>>
>> ... 20 more
>>
>>
>>
>>
>> query:
>>
>>
>> val streamExecutionEnv = StreamExecutionEnvironment.getExecutionEnvironment
>> val blinkEnvSettings =
>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
>> val streamTableEnv =
>> StreamTableEnvironment.create(streamExecutionEnv,blinkEnvSettings)
>>
>> streamTableEnv.sqlUpdate(
>> """
>> |
>> |CREATE TABLE user_behavior (
>> | uid VARCHAR,
>> | phoneType VARCHAR,
>> | clickCount INT,
>> | `time` TIMESTAMP(3)
>> |) WITH (
>> | 'connector.type' = 'kafka',
>> | 'connector.version' = 'universal',
>> | 'connector.topic' = 'user_behavior',
>> | 'connector.startup-mode' = 'earliest-offset',
>> | 'connector.properties.0.key' = 'zookeeper.connect',
>> | 'connector.properties.0.value' = 'cdh1:2181,cdh2:2181,cdh3:2181',
>> | 'connector.properties.1.key' = 'bootstrap.servers',
>> | 'connector.properties.1.value' = 'cdh1:9092,cdh2:9092,cdh3:9092',
>> | 'update-mode' = 'append',
>> | 'format.type' = 'json',
>> | 'format.derive-schema' = 'true'
>> |)
>> |""".stripMargin)
>> streamTableEnv.sqlUpdate(
>> """
>> |
>> |CREATE TABLE user_cnt (
>> | `time` VARCHAR,
>> | sum_age INT
>> |) WITH (
>> | 'connector.type' = 'jdbc',
>> | 'connector.url' = 'jdbc:mysql://localhost:3306/dashboard',
>> | 'connector.table' = 'user_cnt',
>> | 'connector.username' = 'root',
>> | 'connector.password' = '123456',
>> | 'connector.write.flush.max-rows' = '1'
>> |)
>> |""".stripMargin)
>> val userTableSource = new MysqlAsyncLookupTableSource(
>> Array("uid", "sex", "age", "created_time"),
>> Array(),
>> Array(Types.STRING, Types.STRING, Types.INT, Types.STRING))
>> streamTableEnv.registerTableSource("users", userTableSource)
>> streamTableEnv.sqlUpdate(
>> """
>> |
>> |insert into user_cnt
>> |SELECT
>> | cast(b.`time` as string), u.age
>> |FROM
>> | user_behavior AS b
>> | JOIN users FOR SYSTEM_TIME AS OF b.`time` AS u
>> | ON b.uid = u.uid
>> |
>> |""".stripMargin)
>> streamTableEnv.execute("Temporal table join")
Re: flink sql Temporal table join failed
Posted by Benchao Li <li...@apache.org>.
Hi,
Temporal Table join的时候需要是处理时间,你现在这个b.`time`是一个普通的时间戳,而不是处理时间属性。
可以参考下[1]
[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/time_attributes.html
Zhou Zach <wa...@163.com> 于2020年6月12日周五 下午1:33写道:
> SLF4J: Class path contains multiple SLF4J bindings.
>
> SLF4J: Found binding in
> [jar:file:/Users/Zach/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>
> SLF4J: Found binding in
> [jar:file:/Users/Zach/.m2/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
> explanation.
>
> SLF4J: Actual binding is of type
> [org.apache.logging.slf4j.Log4jLoggerFactory]
>
> ERROR StatusLogger No log4j2 configuration file found. Using default
> configuration: logging only errors to the console.
>
> Exception in thread "main" org.apache.flink.table.api.TableException:
> Cannot generate a valid execution plan for the given query:
>
>
>
>
> FlinkLogicalSink(name=[`default_catalog`.`default_database`.`user_cnt`],
> fields=[time, sum_age])
>
> +- FlinkLogicalCalc(select=[CAST(time) AS EXPR$0, age])
>
> +- FlinkLogicalJoin(condition=[=($0, $2)], joinType=[inner])
>
> :- FlinkLogicalCalc(select=[uid, time])
>
> : +- FlinkLogicalTableSourceScan(table=[[default_catalog,
> default_database, user_behavior, source: [KafkaTableSource(uid, phoneType,
> clickCount, time)]]], fields=[uid, phoneType, clickCount, time])
>
> +- FlinkLogicalSnapshot(period=[$cor0.time])
>
> +- FlinkLogicalCalc(select=[uid, age])
>
> +- FlinkLogicalTableSourceScan(table=[[default_catalog,
> default_database, users, source: [MysqlAsyncLookupTableSource(uid, sex,
> age, created_time)]]], fields=[uid, sex, age, created_time])
>
>
>
>
> Temporal table join currently only supports 'FOR SYSTEM_TIME AS OF' left
> table's proctime field, doesn't support 'PROCTIME()'
>
> Please check the documentation for the set of currently supported SQL
> features.
>
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:78)
>
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
>
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
>
> at
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>
> at
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>
> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>
> at
> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
>
> at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
>
> at
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:170)
>
> at
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:90)
>
> at
> org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
>
> at
> org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:248)
>
> at
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:151)
>
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)
>
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
>
> at
> org.rabbit.sql.FromKafkaSinkMysqlForReal$.main(FromKafkaSinkMysqlForReal.scala:90)
>
> at
> org.rabbit.sql.FromKafkaSinkMysqlForReal.main(FromKafkaSinkMysqlForReal.scala)
>
> Caused by: org.apache.flink.table.api.TableException: Temporal table join
> currently only supports 'FOR SYSTEM_TIME AS OF' left table's proctime
> field, doesn't support 'PROCTIME()'
>
> at
> org.apache.flink.table.planner.plan.rules.physical.common.CommonLookupJoinRule$class.matches(CommonLookupJoinRule.scala:67)
>
> at
> org.apache.flink.table.planner.plan.rules.physical.common.BaseSnapshotOnCalcTableScanRule.matches(CommonLookupJoinRule.scala:147)
>
> at
> org.apache.flink.table.planner.plan.rules.physical.common.BaseSnapshotOnCalcTableScanRule.matches(CommonLookupJoinRule.scala:161)
>
> at
> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:263)
>
> at
> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)
>
> at
> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)
>
> at
> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)
>
> at
> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)
>
> at
> org.apache.calcite.plan.volcano.VolcanoRuleCall.match(VolcanoRuleCall.java:247)
>
> at
> org.apache.calcite.plan.volcano.VolcanoPlanner.fireRules(VolcanoPlanner.java:1534)
>
> at
> org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1807)
>
> at
> org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846)
>
> at
> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868)
>
> at
> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:90)
>
> at
> org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:329)
>
> at
> org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1668)
>
> at
> org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846)
>
> at
> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868)
>
> at
> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:90)
>
> at
> org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:329)
>
> at
> org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1668)
>
> at
> org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846)
>
> at
> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868)
>
> at
> org.apache.calcite.plan.volcano.VolcanoPlanner.changeTraits(VolcanoPlanner.java:529)
>
> at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:324)
>
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64)
>
> ... 20 more
>
>
>
>
> query:
>
>
> val streamExecutionEnv = StreamExecutionEnvironment.getExecutionEnvironment
> val blinkEnvSettings =
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
> val streamTableEnv =
> StreamTableEnvironment.create(streamExecutionEnv,blinkEnvSettings)
>
> streamTableEnv.sqlUpdate(
> """
> |
> |CREATE TABLE user_behavior (
> | uid VARCHAR,
> | phoneType VARCHAR,
> | clickCount INT,
> | `time` TIMESTAMP(3)
> |) WITH (
> | 'connector.type' = 'kafka',
> | 'connector.version' = 'universal',
> | 'connector.topic' = 'user_behavior',
> | 'connector.startup-mode' = 'earliest-offset',
> | 'connector.properties.0.key' = 'zookeeper.connect',
> | 'connector.properties.0.value' = 'cdh1:2181,cdh2:2181,cdh3:2181',
> | 'connector.properties.1.key' = 'bootstrap.servers',
> | 'connector.properties.1.value' = 'cdh1:9092,cdh2:9092,cdh3:9092',
> | 'update-mode' = 'append',
> | 'format.type' = 'json',
> | 'format.derive-schema' = 'true'
> |)
> |""".stripMargin)
> streamTableEnv.sqlUpdate(
> """
> |
> |CREATE TABLE user_cnt (
> | `time` VARCHAR,
> | sum_age INT
> |) WITH (
> | 'connector.type' = 'jdbc',
> | 'connector.url' = 'jdbc:mysql://localhost:3306/dashboard',
> | 'connector.table' = 'user_cnt',
> | 'connector.username' = 'root',
> | 'connector.password' = '123456',
> | 'connector.write.flush.max-rows' = '1'
> |)
> |""".stripMargin)
> val userTableSource = new MysqlAsyncLookupTableSource(
> Array("uid", "sex", "age", "created_time"),
> Array(),
> Array(Types.STRING, Types.STRING, Types.INT, Types.STRING))
> streamTableEnv.registerTableSource("users", userTableSource)
> streamTableEnv.sqlUpdate(
> """
> |
> |insert into user_cnt
> |SELECT
> | cast(b.`time` as string), u.age
> |FROM
> | user_behavior AS b
> | JOIN users FOR SYSTEM_TIME AS OF b.`time` AS u
> | ON b.uid = u.uid
> |
> |""".stripMargin)
> streamTableEnv.execute("Temporal table join")