Posted to issues@flink.apache.org by "Terry Wang (Jira)" <ji...@apache.org> on 2020/04/22 03:18:00 UTC

[jira] [Commented] (FLINK-17313) Validation error when insert decimal/timestamp/varchar with precision into sink using TypeInformation of row

    [ https://issues.apache.org/jira/browse/FLINK-17313?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17089251#comment-17089251 ] 

Terry Wang commented on FLINK-17313:
------------------------------------

cc [~ykt836] [~jark] [~dwysakowicz] Please have a look at this issue.

> Validation error when insert decimal/timestamp/varchar with precision into sink using TypeInformation of row
> ------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-17313
>                 URL: https://issues.apache.org/jira/browse/FLINK-17313
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>            Reporter: Terry Wang
>            Priority: Major
>
> Test code like the following (in the blink planner):
> {code:java}
> 		tEnv.sqlUpdate("create table randomSource (" +
> 						"		a varchar(10)," +
> 						"		b decimal(20,2)" +
> 						"	) with (" +
> 						"		'type' = 'random'," +
> 						"		'count' = '10'" +
> 						"	)");
> 		tEnv.sqlUpdate("create table printSink (" +
> 						"		a varchar(10)," +
> 						"		b decimal(22,2)," +
> 						"		c timestamp(3)" +
> 						"	) with (" +
> 						"	'type' = 'print'" +
> 						"	)");
> 		tEnv.sqlUpdate("insert into printSink select *, current_timestamp from randomSource");
> 		tEnv.execute("");
> {code}
> The print TableSink implements UpsertStreamTableSink, and its getRecordType is as follows:
> {code:java}
> public TypeInformation<Row> getRecordType() {
> 	return getTableSchema().toRowType();
> }
> {code}
> The validation exception for the varchar column is:
> org.apache.flink.table.api.ValidationException: Type VARCHAR(10) of table field 'a' does not match with the physical type STRING of the 'a' field of the TableSink consumed type.
> 	at org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:165)
> 	at org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:278)
> 	at org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:255)
> 	at org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor.visit(LogicalTypeDefaultVisitor.java:67)
> 	at org.apache.flink.table.types.logical.VarCharType.accept(VarCharType.java:157)
> 	at org.apache.flink.table.utils.TypeMappingUtils.checkIfCompatible(TypeMappingUtils.java:255)
> 	at org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:161)
> 	at org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$validateLogicalPhysicalTypesCompatible$1.apply$mcVI$sp(TableSinkUtils.scala:315)
> 	at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
> 	at org.apache.flink.table.planner.sinks.TableSinkUtils$.validateLogicalPhysicalTypesCompatible(TableSinkUtils.scala:308)
> 	at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:195)
> 	at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:191)
> 	at scala.Option.map(Option.scala:146)
> 	at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:191)
> 	at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
> 	at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
> 	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> 	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> 	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> 	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> 	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> 	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> 	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> 	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> 	at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:150)
> 	at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:863)
> 	at org.apache.flink.table.api.internal.TableEnvironmentImpl.translateAndClearBuffer(TableEnvironmentImpl.java:855)
> 	at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:822)
> The validation exceptions for the other types are similar. After digging into it, I think the cause is TypeMappingUtils#checkPhysicalLogicalTypeCompatible: the method does not seem to account for the different effects of a source and a sink. I will open a PR soon to fix this problem.
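
To illustrate the source/sink distinction described above, here is a minimal, self-contained sketch. It does not use Flink's classes: VarChar, strictlyEqual, compatible and the isSource flag are hypothetical names introduced only for this example, and STRING is modelled as a VARCHAR of maximum length. The assumption is that a sink may safely consume a type at least as wide as the declared column, while a source needs the opposite direction.

```java
// Self-contained model of the mismatch; none of these names are Flink APIs.
final class SinkTypeCheckSketch {

    /** Hypothetical stand-in for a VARCHAR logical type with a maximum length. */
    static final class VarChar {
        // STRING is modelled here as VARCHAR with the largest possible length.
        static final int MAX_LENGTH = Integer.MAX_VALUE;
        final int length;

        VarChar(int length) {
            this.length = length;
        }

        @Override
        public String toString() {
            return length == MAX_LENGTH ? "STRING" : "VARCHAR(" + length + ")";
        }
    }

    /** Direction-agnostic check: accepts only identical lengths. */
    static boolean strictlyEqual(VarChar logical, VarChar physical) {
        return logical.length == physical.length;
    }

    /**
     * Direction-aware check (an assumption about what a fix could look like).
     * Source: physical data is read into the logical column, so the logical
     * type must be at least as wide. Sink: logical data is written into the
     * physical type, so the physical type must be at least as wide.
     */
    static boolean compatible(VarChar logical, VarChar physical, boolean isSource) {
        return isSource
                ? logical.length >= physical.length
                : physical.length >= logical.length;
    }

    public static void main(String[] args) {
        VarChar declared = new VarChar(10);                  // column 'a' in printSink
        VarChar consumed = new VarChar(VarChar.MAX_LENGTH);  // type derived from TypeInformation

        System.out.println(declared + " vs " + consumed
                + ", strict check: " + strictlyEqual(declared, consumed));
        System.out.println("direction-aware, as sink: " + compatible(declared, consumed, false));
        System.out.println("direction-aware, as source: " + compatible(declared, consumed, true));
    }
}
```

In this model the strict check rejects the VARCHAR(10)/STRING pair, which mirrors the reported error, while the direction-aware check accepts it for a sink and still rejects it for a source.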



--
This message was sent by Atlassian Jira
(v8.3.4#803005)