Posted to user-zh@flink.apache.org by liangjinghong <li...@huawei.com.INVALID> on 2022/02/05 09:26:19 UTC

Joining three or more temporal tables: "mismatched type $6 TIMESTAMP(3)" error after adding a WHERE condition

Hello. Due to business requirements, on Flink 1.13 with MySQL CDC 2.1.1 I need to join three tables that fit the temporal table pattern. Without a WHERE condition on the join result everything runs fine, but after adding a WHERE condition it fails as follows:
SQL:
insert into sink
select count(1) as machine from tbl_schedule_job as job
join tbl_schedule_task FOR SYSTEM_TIME AS OF job.lastUpdateTime as t
on t.jobId = job.jobId FOR SYSTEM_TIME AS OF t.lastModifiedTime AS n
where job.jobStatus in ('RUNNING','INITING','ERROR')
Error:
Exception in thread "main" java.lang.AssertionError: mismatched type $6 TIMESTAMP(3)
         at org.apache.calcite.rex.RexUtil$FixNullabilityShuttle.visitInputRef(RexUtil.java:2710)
         at org.apache.calcite.rex.RexUtil$FixNullabilityShuttle.visitInputRef(RexUtil.java:2688)
         at org.apache.calcite.rex.RexInputRef.accept(RexInputRef.java:112)
         at org.apache.calcite.rex.RexShuttle.visitList(RexShuttle.java:158)
         at org.apache.calcite.rex.RexShuttle.visitCall(RexShuttle.java:110)
         at org.apache.calcite.rex.RexShuttle.visitCall(RexShuttle.java:33)
         at org.apache.calcite.rex.RexCall.accept(RexCall.java:174)
         at org.apache.calcite.rex.RexShuttle.apply(RexShuttle.java:268)
         at org.apache.calcite.rex.RexShuttle.mutate(RexShuttle.java:238)
         at org.apache.calcite.rex.RexShuttle.apply(RexShuttle.java:256)
         at org.apache.calcite.rex.RexUtil.fixUp(RexUtil.java:1811)
         at org.apache.calcite.rel.rules.FilterJoinRule.perform(FilterJoinRule.java:189)
         at org.apache.calcite.rel.rules.FilterJoinRule$FilterIntoJoinRule.onMatch(FilterJoinRule.java:377)
         at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:333)
         at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:542)
         at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407)
         at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:271)
         at org.apache.calcite.plan.hep.HepInstruction$RuleCollection.execute(HepInstruction.java:74)
         at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:202)
         at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:189)
         at org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
         at org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
         at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:63)
         at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:60)
         at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
         at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
         at scala.collection.Iterator$class.foreach(Iterator.scala:891)
         at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
         at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
         at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
         at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
         at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
         at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:60)
         at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:55)
         at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
         at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
         at scala.collection.immutable.Range.foreach(Range.scala:160)
         at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
         at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
         at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:55)
         at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
         at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
         at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
         at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
         at scala.collection.Iterator$class.foreach(Iterator.scala:891)
         at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
         at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
         at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
         at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
         at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
         at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
         at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163)
         at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:79)
         at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
         at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:284)
         at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:168)
         at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1516)
         at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:738)
         at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:854)
         at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:728)
         at com.test.FlinkCdcTest.main(FlinkCdcTest.java:107)
Table DDL statements:
CREATE TABLE `tbl_schedule_job`
(`jobId` VARCHAR ,
 `appId` VARCHAR ,
 `jobStatus` VARCHAR,
 `lastUpdateTime` TIMESTAMP(3),
  PRIMARY KEY (`jobId`) NOT ENFORCED,
  WATERMARK FOR lastUpdateTime AS lastUpdateTime - INTERVAL '1' MINUTE
 )WITH(
     'connector' = 'mysql-cdc',
     'hostname' = 'xxx',
     'port' = '3306',
     'username' = 'xxx',
     'password' = 'xxx',
     'database-name' = 'xxx',
     'table-name' = 'xxx'
 );
CREATE TABLE `tbl_schedule_task`
( `jobId` VARCHAR,
  `nodeId` VARCHAR,
  `lastModifiedTime` TIMESTAMP(3),
   PRIMARY KEY (`jobId`) NOT ENFORCED,
   WATERMARK FOR lastModifiedTime AS lastModifiedTime - INTERVAL '1' MINUTE
 )WITH(
     'connector' = 'mysql-cdc',
     'hostname' = 'xxx',
     'port' = '3306',
     'username' = 'xxx',
     'password' = 'xxx',
     'database-name' = 'xxx',
     'table-name' = 'xxx'
 );
 CREATE TABLE `tbl_broker_node`
 (`id` VARCHAR ,
 `resourcesSpec` VARCHAR ,
 `region` VARCHAR ,
 `machineStatus` VARCHAR ,
 `poolId` VARCHAR,
 `lastUpdateTime` TIMESTAMP(3),
  PRIMARY KEY (`id`) NOT ENFORCED,
  WATERMARK FOR lastUpdateTime AS lastUpdateTime - INTERVAL '1' MINUTE
 )WITH(
     'connector' = 'mysql-cdc',
     'hostname' = 'xxx',
     'port' = '3306',
     'username' = 'xxxx',
     'password' = 'xxx',
     'database-name' = 'xxx',
     'table-name' = 'xxx'
 );
CREATE table sink(
  machine BIGINT,
  PRIMARY KEY (`machine`) NOT ENFORCED
) with (
  'connector' = 'upsert-kafka',
  'property.version' = 'universal',
  'properties.bootstrap.servers' = '',
  'properties.group.id' = 'sink',
  'topic' = 'sink_test',
  'key.format' = 'json',
  'value.format' = 'json'
);

To work around this, I tried moving the WHERE condition into a subquery, which fails as follows:
SQL:
insert into sink
select count(1) as machine
from (
(select * from tbl_schedule_job job where job.jobStatus in ('RUNNING','INITING','ERROR'))as job
join tbl_schedule_task FOR SYSTEM_TIME AS OF job.lastUpdateTime as t
on t.jobId = job.jobId
join tbl_broker_node FOR SYSTEM_TIME AS OF t.lastModifiedTime AS n
on t.nodeId = n.id)
Error:
Exception in thread "main" org.apache.flink.table.api.SqlParserException: SQL parse failed. Encountered "as" at line 4, column 90.
Was expecting one of:
    "EXCEPT" ...
    "FETCH" ...
    "INTERSECT" ...
    "LIMIT" ...
    "OFFSET" ...
    "ORDER" ...
   "MINUS" ...
    "UNION" ...
    ")" ...
    "." ...
    "NOT" ...
    "IN" ...
    "<" ...
    "<=" ...
    ">" ...
    ">=" ...
    "=" ...
    "<>" ...
    "!=" ...
    "BETWEEN" ...
    "LIKE" ...
    "SIMILAR" ...
    "+" ...
    "-" ...
    "*" ...
    "/" ...
    "%" ...
    "||" ...
    "AND" ...
    "OR" ...
    "IS" ...
    "MEMBER" ...
    "SUBMULTISET" ...
    "CONTAINS" ...
    "OVERLAPS" ...
    "EQUALS" ...
    "PRECEDES" ...
    "SUCCEEDS" ...
    "IMMEDIATELY" ...
    "MULTISET" ...
    "[" ...
    "FORMAT" ...
    "YEAR" ...
    "YEARS" ...
    "MONTH" ...
    "MONTHS" ...
    "DAY" ...
    "DAYS" ...
    "HOUR" ...
    "HOURS" ...
    "MINUTE" ...
    "MINUTES" ...
    "SECOND" ...
    "SECONDS" ...

         at org.apache.flink.table.planner.parse.CalciteParser.parse(CalciteParser.java:56)
         at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:96)
         at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:722)
         at com.test.FlinkCdcTest.main(FlinkCdcTest.java:107)

However, this SQL runs fine in DBeaver, so I don't know what is wrong with the syntax.

Re: Joining three or more temporal tables: "mismatched type $6 TIMESTAMP(3)" error after adding a WHERE condition

Posted by Caizhi Weng <ts...@gmail.com>.
Hi!

Do you want to run consecutive event time temporal joins between the main table and the two dimension tables?

Is the first SQL statement perhaps incomplete? It has only one JOIN ... ON, yet references two tables.

In the second SQL statement, first of all, the view should be defined with a CREATE VIEW statement. Secondly, an event time temporal join must use the event time of the left table. In your statement the first join uses FOR SYSTEM_TIME AS OF job.lastUpdateTime, which is correct, but the second join uses FOR SYSTEM_TIME AS OF t.lastModifiedTime, which is wrong: t.lastModifiedTime comes from the dimension table t, and after the first event time temporal join that column's rowtime attribute has been stripped, leaving only the rowtime attribute of job.lastUpdateTime. The second join should therefore also use FOR SYSTEM_TIME AS OF job.lastUpdateTime. In other words, the complete SQL should be:

create view job as
select * from tbl_schedule_job job
where job.jobStatus in ('RUNNING', 'INITING', 'ERROR');

insert into sink
select count(1) as machine
from (
select * from job
join tbl_schedule_task FOR SYSTEM_TIME AS OF job.lastUpdateTime as t
on t.jobId = job.jobId
join tbl_broker_node FOR SYSTEM_TIME AS OF job.lastUpdateTime AS n
on t.nodeId = n.id);
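
You can check the rowtime attributes yourself with DESCRIBE (a minimal sketch, assuming the view above has been created in the same session; the exact output layout may vary by Flink version):

DESCRIBE job;
-- lastUpdateTime should still be listed as TIMESTAMP(3) *ROWTIME*,
-- because a simple filter keeps the rowtime attribute intact.
-- After the first temporal join, t.lastModifiedTime is only a plain
-- TIMESTAMP(3), which is why it cannot drive the second
-- FOR SYSTEM_TIME AS OF clause.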
