Posted to issues@flink.apache.org by "Jark Wu (Jira)" <ji...@apache.org> on 2019/12/16 11:08:00 UTC

[jira] [Commented] (FLINK-15283) Scala version of TableSinkUtils has a problem when validating sinks.

    [ https://issues.apache.org/jira/browse/FLINK-15283?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16997176#comment-16997176 ] 

Jark Wu commented on FLINK-15283:
---------------------------------

Hi [~rolandWang], actually this is not a bug in Flink. In the SQL world, when inserting a query result into a table, the mapping from query fields to sink fields is done by field position, not by column name. That is because a query's column names may be missing, and it is hard to keep them identical to the sink's.
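To make the positional matching concrete, here is a minimal sketch in plain Scala (no Flink dependencies; `schemasMatch` is a hypothetical helper, not Flink's actual API) showing why reordering the SELECT list, rather than relying on names, resolves the mismatch:

```scala
// Illustrative only: types are compared pair-wise by position, names are ignored,
// mirroring the zip-based check quoted from TableSinkUtils below.
object PositionalMatch {
  def schemasMatch(query: Seq[(String, String)], sink: Seq[(String, String)]): Boolean =
    query.length == sink.length &&
      query.zip(sink).forall { case ((_, qType), (_, sType)) => qType == sType }

  def main(args: Array[String]): Unit = {
    val sink  = Seq("BAK_NO" -> "String", "TRANS_AMT" -> "Double", "ORDER_NO" -> "String")
    // Query fields in a different order: the types at positions 1 and 2 no longer line up.
    val query = Seq("ORDER_NO" -> "String", "BAK_NO" -> "String", "TRANS_AMT" -> "Double")
    println(schemasMatch(query, sink)) // false: String vs Double at position 1
    // Re-ordering the SELECT list to the sink's column order makes validation pass.
    val reordered = Seq("BAK_NO" -> "String", "TRANS_AMT" -> "Double", "ORDER_NO" -> "String")
    println(schemasMatch(reordered, sink)) // true
  }
}
```

So the fix on the user side is to write the query as `SELECT BAK_NO, TRANS_AMT, ORDER_NO ...` in the sink's declared column order.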

> Scala version of TableSinkUtils has a problem when validating sinks.
> --------------------------------------------------------------------
>
>                 Key: FLINK-15283
>                 URL: https://issues.apache.org/jira/browse/FLINK-15283
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Scala
>    Affects Versions: 1.9.0
>         Environment: All environments of flink 1.9.0
>            Reporter: roland wang
>            Priority: Major
>
> *1. Phenomenon*
> I created a Kafka sink with a schema like:
> {code:java}
> [BAK_NO: String, TRANS_AMT: Double, ORDER_NO: String]
> {code}
> When I tried to insert some data into this sink, an error occurred as follows:
> {code:java}
> Caused by: org.apache.flink.table.api.ValidationException: Field types of query result and registered TableSink [TEST_SINK] do not match. Query result schema: [ORDER_NO: String, BAK_NO: String, TRANS_AMT: Double] TableSink schema: [BAK_NO: String, TRANS_AMT: Double, ORDER_NO: String]
> {code}
> Now I have to keep the order of the query's schema exactly the same as the sink's schema, which causes a lot of trouble.
> *2. Cause*
> I checked the code and found these lines:
> {code:java}
> // validate schema of source table and table sink
> val srcFieldTypes = query.getTableSchema.getFieldDataTypes
> val sinkFieldTypes = sink.getTableSchema.getFieldDataTypes
> if (srcFieldTypes.length != sinkFieldTypes.length ||
>   srcFieldTypes.zip(sinkFieldTypes).exists { case (srcF, snkF) =>
>     !PlannerTypeUtils.isInteroperable(
>       fromDataTypeToLogicalType(srcF), fromDataTypeToLogicalType(snkF))
>   }) {
> ...{code}
> I think that when they try to compare the sink's schema to the query's schema, the zip-based comparison goes wrong because they forget to sort both of the schemas.
> I truly hope this bug can be fixed soon.
> Thanks for all your hard work.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)