Posted to issues@flink.apache.org by "wangtong (Jira)" <ji...@apache.org> on 2020/12/24 15:36:00 UTC

[jira] [Commented] (FLINK-16108) StreamSQLExample is failed if running in blink planner

    [ https://issues.apache.org/jira/browse/FLINK-16108?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17254574#comment-17254574 ] 

wangtong commented on FLINK-16108:
----------------------------------

hi [~jark] 

[DynamicSinkUtils.validateSchemaAndApplyImplicitCast|https://github.com/apache/flink/blob/e710bec72669b5786671b25501d2350fcc997364/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/sinks/DynamicSinkUtils.java] throws createSchemaMismatchException() when the field order of the query schema differs from that of the sink schema.

Should the method validateSchemaAndApplyImplicitCast be rewritten to validate data types by matching each sink schema field to the query schema field with the same field name, instead of by position? For example:
{code:java}
// Match each sink field to the query field with the same name,
// then validate that the query type can be implicitly cast to the sink type.
sinkFields.forEach(sinkField -> {
   RowField queryField = queryFields.stream()
         .filter(q -> q.getName().equals(sinkField.getName()))
         .findFirst()
         .orElseThrow(() -> createSchemaMismatchException(
               String.format(
                     "No query column found for sink column '%s'",
                     sinkField.getName()),
               sinkIdentifier,
               queryFields,
               sinkFields));

   final LogicalType queryColumnType = queryField.getType();
   final LogicalType sinkColumnType = sinkField.getType();
   if (!supportsImplicitCast(queryColumnType, sinkColumnType)) {
      throw createSchemaMismatchException(
            String.format(
                  "Incompatible types for sink column '%s'",
                  sinkField.getName()),
            sinkIdentifier,
            queryFields,
            sinkFields);
   }
   if (!supportsAvoidingCast(queryColumnType, sinkColumnType)) {
      requiresCasting.set(true);
   }
});
{code}
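For clarity, the name-based matching above can be reduced to a self-contained sketch in plain Java with no Flink dependencies. The {{Field}} record here is an illustrative stand-in for Flink's {{RowType.RowField}}, not the actual planner type:

```java
import java.util.List;

public class NameBasedMatching {
    // Minimal stand-in for a schema field: a name plus a type string.
    record Field(String name, String type) {}

    // Returns the query field with the same name as the sink field,
    // or throws if the query schema has no such column.
    static Field matchByName(Field sinkField, List<Field> queryFields) {
        return queryFields.stream()
                .filter(q -> q.name().equals(sinkField.name()))
                .findFirst()
                .orElseThrow(() -> new IllegalStateException(
                        "No query column found for sink column '" + sinkField.name() + "'"));
    }

    public static void main(String[] args) {
        // The exact schemas from the exception in the issue description:
        List<Field> query = List.of(
                new Field("user", "BIGINT"),
                new Field("product", "STRING"),
                new Field("amount", "INT"));
        List<Field> sink = List.of(
                new Field("amount", "INT"),
                new Field("product", "STRING"),
                new Field("user", "BIGINT"));

        // Every sink column finds its query counterpart by name,
        // regardless of position, so no mismatch is raised.
        for (Field s : sink) {
            Field q = matchByName(s, query);
            System.out.println(s.name() + " -> " + q.type());
        }
    }
}
```

With the positional comparison that validateSchemaAndApplyImplicitCast currently performs, these two schemas fail immediately at index 0 ({{user: BIGINT}} vs {{amount: INT}}); matching by name, all three columns resolve.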
> StreamSQLExample is failed if running in blink planner
> ------------------------------------------------------
>
>                 Key: FLINK-16108
>                 URL: https://issues.apache.org/jira/browse/FLINK-16108
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>            Reporter: Jark Wu
>            Assignee: Jark Wu
>            Priority: Critical
>             Fix For: 1.10.1, 1.11.0
>
>
> {{StreamSQLExample}} in flink-example will fail if the specified planner is the blink planner. The exception is as follows:
> {code}
> Exception in thread "main" org.apache.flink.table.api.ValidationException: Field types of query result and registered TableSink  do not match.
> Query schema: [user: BIGINT, product: STRING, amount: INT]
> Sink schema: [amount: INT, product: STRING, user: BIGINT]
> 	at org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSchemaAndApplyImplicitCast(TableSinkUtils.scala:96)
> 	at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:229)
> 	at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
> 	at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
> 	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> 	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> 	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> 	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> 	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> 	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> 	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> 	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> 	at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:150)
> 	at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:361)
> 	at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:269)
> 	at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:260)
> 	at org.apache.flink.table.examples.java.StreamSQLExample.main(StreamSQLExample.java:90)
> Process finished with exit code 1
> {code}
> That's because the blink planner also validates the sink schema, even if it comes from {{toAppendStream()}}. However, {{TableSinkUtils#inferSinkPhysicalDataType}} should derive the sink schema from the query schema when the requested type is a POJO [1], because the field order of a POJO is not deterministic.
> [1]: https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sinks/TableSinkUtils.scala#L237
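The non-deterministic POJO field order mentioned above can be demonstrated without Flink. The {{Order}} class below is modeled on the POJO shape used by StreamSQLExample (illustrative, not the actual example class); {{getDeclaredFields()}} is explicitly documented as returning fields "in no particular order", so a planner must normalize the order itself, for instance by sorting names, which also happens to match the alphabetical sink schema {{[amount, product, user]}} in the stack trace above:

```java
import java.lang.reflect.Field;
import java.util.Arrays;

public class PojoFieldOrder {
    // Modeled on the Order POJO shape from StreamSQLExample.
    public static class Order {
        public Long user;
        public String product;
        public int amount;
    }

    public static void main(String[] args) {
        // getDeclaredFields() makes no ordering guarantee (see its Javadoc),
        // so declaration order cannot be relied on for schema derivation.
        String[] declared = Arrays.stream(Order.class.getDeclaredFields())
                .map(Field::getName)
                .toArray(String[]::new);
        System.out.println("reflection order:    " + Arrays.toString(declared));

        // Sorting by name yields a deterministic order, matching the
        // alphabetical sink schema [amount, product, user] seen in the
        // exception (an observation about the error message, not a claim
        // about Flink internals).
        String[] sorted = declared.clone();
        Arrays.sort(sorted);
        System.out.println("deterministic order: " + Arrays.toString(sorted));
    }
}
```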



--
This message was sent by Atlassian Jira
(v8.3.4#803005)