Posted to issues@flink.apache.org by "Gyula Fora (Jira)" <ji...@apache.org> on 2020/04/29 16:00:00 UTC

[jira] [Commented] (FLINK-16108) StreamSQLExample is failed if running in blink planner

    [ https://issues.apache.org/jira/browse/FLINK-16108?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17095577#comment-17095577 ] 

Gyula Fora commented on FLINK-16108:
------------------------------------

I think I am already running on a branch that includes this fix :/
I still need to verify that, though.

> StreamSQLExample is failed if running in blink planner
> ------------------------------------------------------
>
>                 Key: FLINK-16108
>                 URL: https://issues.apache.org/jira/browse/FLINK-16108
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>            Reporter: Jark Wu
>            Assignee: Jark Wu
>            Priority: Critical
>             Fix For: 1.10.1, 1.11.0
>
>
> {{StreamSQLExample}} in flink-example fails if the specified planner is the blink planner. The exception is as follows:
> {code}
> Exception in thread "main" org.apache.flink.table.api.ValidationException: Field types of query result and registered TableSink  do not match.
> Query schema: [user: BIGINT, product: STRING, amount: INT]
> Sink schema: [amount: INT, product: STRING, user: BIGINT]
> 	at org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSchemaAndApplyImplicitCast(TableSinkUtils.scala:96)
> 	at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:229)
> 	at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
> 	at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
> 	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> 	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> 	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> 	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> 	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> 	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> 	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> 	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> 	at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:150)
> 	at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:361)
> 	at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:269)
> 	at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:260)
> 	at org.apache.flink.table.examples.java.StreamSQLExample.main(StreamSQLExample.java:90)
> Process finished with exit code 1
> {code}
> That's because the blink planner also validates the sink schema, even when the sink comes from {{toAppendStream()}}. However, {{TableSinkUtils#inferSinkPhysicalDataType}} should derive the sink schema from the query schema when the requested type is a POJO [1], because the field order of a POJO is not deterministic.
> [1]: https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sinks/TableSinkUtils.scala#L237
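
The mismatch described in the issue can be illustrated with a minimal, Flink-free sketch. It uses the two schemas from the exception message and shows that a position-wise comparison fails, while looking up each query field by name (deriving the sink order from the query) succeeds. The names {{SchemaOrderSketch}}, {{Field}}, {{matchesByPosition}} and {{reorderToQuery}} are hypothetical and are not part of the Flink API; this is not how {{TableSinkUtils}} is actually implemented.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only; none of these names exist in Flink.
public class SchemaOrderSketch {

    /** A named, typed column: a simplified stand-in for a schema field. */
    public static final class Field {
        public final String name;
        public final String type;

        public Field(String name, String type) {
            this.name = name;
            this.type = type;
        }

        @Override
        public String toString() {
            return name + ": " + type;
        }
    }

    /** Compares two schemas field by field, in order (name and type must both match). */
    public static boolean matchesByPosition(List<Field> query, List<Field> sink) {
        if (query.size() != sink.size()) {
            return false;
        }
        for (int i = 0; i < query.size(); i++) {
            Field q = query.get(i);
            Field s = sink.get(i);
            if (!q.name.equals(s.name) || !q.type.equals(s.type)) {
                return false;
            }
        }
        return true;
    }

    /** Rebuilds the sink schema in query order by looking up each query field by name. */
    public static List<Field> reorderToQuery(List<Field> query, List<Field> sink) {
        if (query.size() != sink.size()) {
            throw new IllegalArgumentException("Schemas have different field counts");
        }
        List<Field> reordered = new ArrayList<>();
        for (Field q : query) {
            Field match = null;
            for (Field s : sink) {
                if (s.name.equals(q.name)) {
                    match = s;
                    break;
                }
            }
            if (match == null) {
                throw new IllegalArgumentException("No sink field named " + q.name);
            }
            reordered.add(match);
        }
        return reordered;
    }

    public static void main(String[] args) {
        // Schemas copied from the exception message in the issue description.
        List<Field> query = Arrays.asList(
                new Field("user", "BIGINT"),
                new Field("product", "STRING"),
                new Field("amount", "INT"));
        List<Field> sink = Arrays.asList(
                new Field("amount", "INT"),
                new Field("product", "STRING"),
                new Field("user", "BIGINT"));

        System.out.println("By position: " + matchesByPosition(query, sink));
        List<Field> derived = reorderToQuery(query, sink);
        System.out.println("Derived sink schema: " + derived);
        System.out.println("After deriving from query: " + matchesByPosition(query, derived));
    }
}
```

The sketch matches fields by name only to make the ordering problem visible; any implicit casting or nested types that the real planner handles are out of scope here.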


