Posted to issues@flink.apache.org by "Dawid Wysakowicz (Jira)" <ji...@apache.org> on 2019/12/12 07:33:00 UTC

[jira] [Commented] (FLINK-15200) legacy planner cannot deal with types with precision like DataTypes.TIMESTAMP(3) in TableSourceUtil

    [ https://issues.apache.org/jira/browse/FLINK-15200?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16994331#comment-16994331 ] 

Dawid Wysakowicz commented on FLINK-15200:
------------------------------------------

Thank you [~Leonard Xu] for opening the issue. It is actually a duplicate. Your change would fix the current exceptions, but I think it is only a temporary solution, as it still goes back and forth between the new and old types. I am working on a solution that works properly with the new type system.
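
To make that back and forth concrete, here is a small illustration (my own sketch against the Flink 1.10 type utilities, not something from the issue itself): converting the new DataTypes.TIMESTAMP(3) to legacy TypeInformation yields a LocalDateTime-based type, while the legacy rowtime handling only accepts the TypeInformation you get when the type is explicitly bridged to java.sql.Timestamp.

    // Illustrative sketch only; assumes the Flink 1.10 TypeConversions utility
    // and DataTypes API. It shows how TIMESTAMP(3) maps to different legacy
    // TypeInformation depending on the bridging (conversion) class.
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.types.utils.TypeConversions;

    public class TimestampBridgingSketch {
        public static void main(String[] args) {
            // The default conversion class of TIMESTAMP(3) is java.time.LocalDateTime,
            // so the derived legacy TypeInformation is LocalDateTime-based ...
            TypeInformation<?> defaultInfo =
                TypeConversions.fromDataTypeToLegacyInfo(DataTypes.TIMESTAMP(3));
            System.out.println(defaultInfo); // LocalDateTime-based type info

            // ... whereas bridging to java.sql.Timestamp maps back to the old
            // SQL_TIMESTAMP TypeInformation that rowtime attributes expect.
            TypeInformation<?> bridgedInfo = TypeConversions.fromDataTypeToLegacyInfo(
                DataTypes.TIMESTAMP(3).bridgedTo(java.sql.Timestamp.class));
            System.out.println(bridgedInfo.equals(Types.SQL_TIMESTAMP)); // true
        }
    }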

> legacy planner cannot deal with types with precision like DataTypes.TIMESTAMP(3) in TableSourceUtil
> ----------------------------------------------------------------------------------------------
>
>                 Key: FLINK-15200
>                 URL: https://issues.apache.org/jira/browse/FLINK-15200
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Legacy Planner
>    Affects Versions: 1.10.0
>            Reporter: Leonard Xu
>            Priority: Blocker
>              Labels: pull-request-available
>             Fix For: 1.10.0
>
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> org.apache.flink.table.client.gateway.SqlExecutionException: Invalid SQL update statement.
>  at org.apache.flink.table.client.gateway.local.LocalExecutor.applyUpdate(LocalExecutor.java:684)
>  at org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdateInternal(LocalExecutor.java:576)
>  at org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdate(LocalExecutor.java:527)
>  at org.apache.flink.table.client.cli.CliClient.callInsertInto(CliClient.java:535)
>  at org.apache.flink.table.client.cli.CliClient.lambda$submitUpdate$0(CliClient.java:231)
>  at java.util.Optional.map(Optional.java:215)
>  at org.apache.flink.table.client.cli.CliClient.submitUpdate(CliClient.java:228)
>  at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:129)
>  at org.apache.flink.table.client.SqlClient.start(SqlClient.java:104)
>  at org.apache.flink.table.client.SqlClient.main(SqlClient.java:178)
> Caused by: java.lang.RuntimeException: Error while applying rule PushProjectIntoTableSourceScanRule, args [rel#88:FlinkLogicalCalc.LOGICAL(input=RelSubset#87,expr#0..2={inputs},expr#3=IS NOT NULL($t1),user=$t1,rowtime=$t0,$condition=$t3), Scan(table:[default_catalog, default_database, JsonSourceTable], fields:(rowtime, user, event), source:KafkaTableSource(rowtime, user, event))]
>  at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:235)
>  at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:631)
>  at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:327)
>  at org.apache.flink.table.plan.Optimizer.runVolcanoPlanner(Optimizer.scala:280)
>  at org.apache.flink.table.plan.Optimizer.optimizeLogicalPlan(Optimizer.scala:199)
>  at org.apache.flink.table.plan.StreamOptimizer.optimize(StreamOptimizer.scala:66)
>  at org.apache.flink.table.planner.StreamPlanner.writeToUpsertSink(StreamPlanner.scala:350)
>  at org.apache.flink.table.planner.StreamPlanner.org$apache$flink$table$planner$StreamPlanner$$writeToSink(StreamPlanner.scala:278)
>  at org.apache.flink.table.planner.StreamPlanner$$anonfun$2.apply(StreamPlanner.scala:166)
>  at org.apache.flink.table.planner.StreamPlanner$$anonfun$2.apply(StreamPlanner.scala:145)
>  at scala.Option.map(Option.scala:146)
>  at org.apache.flink.table.planner.StreamPlanner.org$apache$flink$table$planner$StreamPlanner$$translate(StreamPlanner.scala:145)
>  at org.apache.flink.table.planner.StreamPlanner$$anonfun$translate$1.apply(StreamPlanner.scala:117)
>  at org.apache.flink.table.planner.StreamPlanner$$anonfun$translate$1.apply(StreamPlanner.scala:117)
>  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>  at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>  at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>  at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>  at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>  at org.apache.flink.table.planner.StreamPlanner.translate(StreamPlanner.scala:117)
>  at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:661)
>  at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:482)
>  at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.sqlUpdate(StreamTableEnvironmentImpl.java:331)
>  at org.apache.flink.table.client.gateway.local.LocalExecutor.lambda$applyUpdate$14(LocalExecutor.java:676)
>  at org.apache.flink.table.client.gateway.local.ExecutionContext.wrapClassLoader(ExecutionContext.java:220)
>  at org.apache.flink.table.client.gateway.local.LocalExecutor.applyUpdate(LocalExecutor.java:674)
>  ... 9 more
> Caused by: org.apache.flink.table.api.ValidationException: Rowtime field 'rowtime' has invalid type LocalDateTime. Rowtime attributes must be of type Timestamp.
>  at org.apache.flink.table.sources.TableSourceUtil$$anonfun$3.apply(TableSourceUtil.scala:114)
>  at org.apache.flink.table.sources.TableSourceUtil$$anonfun$3.apply(TableSourceUtil.scala:92)
>  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>  at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
>  at org.apache.flink.table.sources.TableSourceUtil$.computeIndexMapping(TableSourceUtil.scala:92)
>  at org.apache.flink.table.sources.TableSourceUtil$.getPhysicalIndexes(TableSourceUtil.scala:307)
>  at org.apache.flink.table.plan.rules.logical.PushProjectIntoTableSourceScanRule.onMatch(PushProjectIntoTableSourceScanRule.scala:46)
>  at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:208)
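
For readers landing here from the stack trace: the root cause comes from the rowtime validation in the legacy TableSourceUtil. The following is a simplified approximation of that check (my own sketch, not the actual Scala implementation): a rowtime field whose legacy TypeInformation is not SQL_TIMESTAMP is rejected, which is exactly what happens when the schema carries DataTypes.TIMESTAMP(3) with its default LocalDateTime conversion class.

    // Simplified approximation of the legacy rowtime validation; assumes
    // Flink 1.10 classes. Not a copy of TableSourceUtil, just the shape of
    // the check that produces the ValidationException above.
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.table.api.ValidationException;

    public class RowtimeValidationSketch {
        static void validateRowtimeField(String name, TypeInformation<?> fieldType) {
            // Legacy planner only accepts the old SQL_TIMESTAMP type info for rowtime.
            if (!Types.SQL_TIMESTAMP.equals(fieldType)) {
                throw new ValidationException(
                    "Rowtime field '" + name + "' has invalid type " + fieldType
                        + ". Rowtime attributes must be of type Timestamp.");
            }
        }

        public static void main(String[] args) {
            validateRowtimeField("rowtime", Types.SQL_TIMESTAMP);   // passes
            validateRowtimeField("rowtime", Types.LOCAL_DATE_TIME); // throws, as in the report
        }
    }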



--
This message was sent by Atlassian Jira
(v8.3.4#803005)