Posted to issues@flink.apache.org by "roland wang (Jira)" <ji...@apache.org> on 2019/12/16 09:59:00 UTC
[jira] [Created] (FLINK-15283) Scala version of TableSinkUtils has a problem when validating sinks.
roland wang created FLINK-15283:
-----------------------------------
Summary: Scala version of TableSinkUtils has a problem when validating sinks.
Key: FLINK-15283
URL: https://issues.apache.org/jira/browse/FLINK-15283
Project: Flink
Issue Type: Bug
Components: API / Scala
Affects Versions: 1.9.0
Environment: All environments of Flink 1.9.0
Reporter: roland wang
*1. Phenomenon*
I created a Kafka sink with a schema like:
{code:java}
[BAK_NO: String, TRANS_AMT: Double, ORDER_NO: String]
{code}
When I tried to insert some data into this sink, the following error occurred:
{code:java}
Caused by: org.apache.flink.table.api.ValidationException: Field types of query result and registered TableSink [TEST_SINK] do not match. Query result schema: [ORDER_NO: String, BAK_NO: String, TRANS_AMT: Double] TableSink schema: [BAK_NO: String, TRANS_AMT: Double, ORDER_NO: String]
{code}
Now I have to keep the field order of the query result exactly the same as the sink's schema, which causes a lot of trouble.
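For illustration, here is a minimal sketch of the kind of statement that triggers the error (the table environment, the sink name TEST_SINK and the source table name are placeholders, not the original job; the query simply selects the sink's fields in a different order):
{code:scala}
// Hypothetical reproduction sketch:
// TEST_SINK is registered with the schema [BAK_NO, TRANS_AMT, ORDER_NO],
// but the query lists the same fields in a different order, so the
// positional type check in TableSinkUtils fails.
tableEnv.sqlUpdate(
  """
    |INSERT INTO TEST_SINK
    |SELECT ORDER_NO, BAK_NO, TRANS_AMT
    |FROM SOURCE_TABLE
    |""".stripMargin)
{code}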
*2. Cause*
I checked the code and found these lines:
{code:java}
// validate schema of source table and table sink
val srcFieldTypes = query.getTableSchema.getFieldDataTypes
val sinkFieldTypes = sink.getTableSchema.getFieldDataTypes

// zip pairs the fields by position (index), not by name
if (srcFieldTypes.length != sinkFieldTypes.length ||
    srcFieldTypes.zip(sinkFieldTypes).exists { case (srcF, snkF) =>
      !PlannerTypeUtils.isInteroperable(
        fromDataTypeToLogicalType(srcF), fromDataTypeToLogicalType(snkF))
    }) {
  ...
{code}
I think that when the sink's schema is compared with the query's schema, the zip-based check goes wrong because neither schema is sorted (or matched by field name) first, so the fields are compared purely by position. A sketch of a name-based comparison is shown below.
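The following is a minimal sketch of what a name-based comparison could look like (illustrative only, not the actual fix; it assumes the same TableSchema accessors used in the snippet above):
{code:scala}
// Illustrative sketch: compare field types by field name instead of by position.
val sinkTypesByName = sink.getTableSchema.getFieldNames
  .zip(sink.getTableSchema.getFieldDataTypes)
  .toMap

val hasMismatch = query.getTableSchema.getFieldNames
  .zip(query.getTableSchema.getFieldDataTypes)
  .exists { case (name, srcType) =>
    sinkTypesByName.get(name) match {
      case Some(sinkType) =>
        !PlannerTypeUtils.isInteroperable(
          fromDataTypeToLogicalType(srcType),
          fromDataTypeToLogicalType(sinkType))
      case None => true // the field does not exist in the sink schema at all
    }
  }
{code}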
I truly hope this bug can be fixed soon.
Thanks for all your hard work.