Posted to issues@flink.apache.org by "roland wang (Jira)" <ji...@apache.org> on 2019/12/16 09:59:00 UTC

[jira] [Created] (FLINK-15283) Scala version of TableSinkUtils has a problem when validating sinks.

roland wang created FLINK-15283:
-----------------------------------

             Summary: Scala version of TableSinkUtils has a problem when validating sinks.
                 Key: FLINK-15283
                 URL: https://issues.apache.org/jira/browse/FLINK-15283
             Project: Flink
          Issue Type: Bug
          Components: API / Scala
    Affects Versions: 1.9.0
         Environment: All environments running Flink 1.9.0
            Reporter: roland wang


*1. Phenomenon*

I created a Kafka sink with the following schema:
{code:java}
[BAK_NO: String, TRANS_AMT: Double, ORDER_NO: String]
{code}
When I tried to insert data into this sink, the following error occurred:
{code:java}
Caused by: org.apache.flink.table.api.ValidationException: Field types of query result and registered TableSink [TEST_SINK] do not match. Query result schema: [ORDER_NO: String, BAK_NO: String, TRANS_AMT: Double] TableSink schema: [BAK_NO: String, TRANS_AMT: Double, ORDER_NO: String]
{code}
As a result, I have to keep the fields of the query in exactly the same order as the sink's schema, which causes a lot of trouble.

*2. Cause*

I checked the code and found the following check:
{code:java}
// validate schema of source table and table sink
val srcFieldTypes = query.getTableSchema.getFieldDataTypes
val sinkFieldTypes = sink.getTableSchema.getFieldDataTypes

if (srcFieldTypes.length != sinkFieldTypes.length ||
  srcFieldTypes.zip(sinkFieldTypes).exists { case (srcF, snkF) =>
    !PlannerTypeUtils.isInteroperable(
      fromDataTypeToLogicalType(srcF), fromDataTypeToLogicalType(snkF))
  }) {
...{code}
I think that when the sink's schema is compared to the query's schema, the zip goes wrong because neither schema is sorted first, so fields are paired by position rather than by name.
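
To illustrate the difference, here is a minimal, self-contained sketch. It is not Flink code: the Schema alias, the plain string type names, and both helper methods are hypothetical stand-ins that I made up for this example. It only shows how a positional zip rejects the two schemas from the error message, while sorting both by field name before zipping accepts them. Whether name-based matching is the intended behaviour for sinks is of course for the maintainers to decide.
{code:scala}
object SinkSchemaCheck {
  // Hypothetical stand-in for a table schema: ordered (fieldName, typeName) pairs.
  type Schema = Seq[(String, String)]

  // Positional comparison, analogous to the zip in the snippet above:
  // the i-th query field is compared with the i-th sink field.
  def matchesByPosition(query: Schema, sink: Schema): Boolean =
    query.length == sink.length &&
      query.zip(sink).forall { case ((_, qType), (_, sType)) => qType == sType }

  // Name-based comparison (an assumption about the desired behaviour):
  // sort both schemas by field name first, then compare names and types pairwise.
  def matchesByName(query: Schema, sink: Schema): Boolean =
    query.length == sink.length &&
      query.sortBy(_._1).zip(sink.sortBy(_._1)).forall { case (q, s) => q == s }

  // The two schemas from the error message above.
  val querySchema: Schema =
    Seq("ORDER_NO" -> "String", "BAK_NO" -> "String", "TRANS_AMT" -> "Double")
  val sinkSchema: Schema =
    Seq("BAK_NO" -> "String", "TRANS_AMT" -> "Double", "ORDER_NO" -> "String")

  def main(args: Array[String]): Unit = {
    println(s"positional match: ${matchesByPosition(querySchema, sinkSchema)}")
    println(s"name-based match: ${matchesByName(querySchema, sinkSchema)}")
  }
}
{code}
With these inputs the positional check fails at the second pair (String against Double), while the name-based check passes because both schemas contain the same three named fields with the same types.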

I truly hope this bug can be fixed soon.

Thanks for all your hard work.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)