Posted to issues@flink.apache.org by "Terry Wang (Jira)" <ji...@apache.org> on 2020/04/22 03:14:00 UTC

[jira] [Updated] (FLINK-17313) Validation error when insert decimal/timestamp/varchar with precision into sink using TypeInformation of row

     [ https://issues.apache.org/jira/browse/FLINK-17313?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Terry Wang updated FLINK-17313:
-------------------------------
    Description: 
Test code like the following (in the blink planner):
{code:java}
		tEnv.sqlUpdate("create table randomSource (" +
						"		a varchar(10)," +
						"		b decimal(20,2)" +
						"	) with (" +
						"		'type' = 'random'," +
						"		'count' = '10'" +
						"	)");
		tEnv.sqlUpdate("create table printSink (" +
						"		a varchar(10)," +
						"		b decimal(22,2)," +
						"		c timestamp(3)" +
						"	) with (" +
						"	'type' = 'print'" +
						"	)");
		tEnv.sqlUpdate("insert into printSink select *, current_timestamp from randomSource");
		tEnv.execute("");
{code}

The print TableSink implements UpsertStreamTableSink, and its getRecordType is as follows:


{code:java}
public TypeInformation<Row> getRecordType() {
		return getTableSchema().toRowType();
	}
{code}


The validation exception for the varchar column is:

org.apache.flink.table.api.ValidationException: Type VARCHAR(10) of table field 'a' does not match with the physical type STRING of the 'a' field of the TableSink consumed type.

	at org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:165)
	at org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:278)
	at org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:255)
	at org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor.visit(LogicalTypeDefaultVisitor.java:67)
	at org.apache.flink.table.types.logical.VarCharType.accept(VarCharType.java:157)
	at org.apache.flink.table.utils.TypeMappingUtils.checkIfCompatible(TypeMappingUtils.java:255)
	at org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:161)
	at org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$validateLogicalPhysicalTypesCompatible$1.apply$mcVI$sp(TableSinkUtils.scala:315)
	at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
	at org.apache.flink.table.planner.sinks.TableSinkUtils$.validateLogicalPhysicalTypesCompatible(TableSinkUtils.scala:308)
	at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:195)
	at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:191)
	at scala.Option.map(Option.scala:146)
	at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:191)
	at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
	at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
	at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:150)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:863)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.translateAndClearBuffer(TableEnvironmentImpl.java:855)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:822)

The validation exceptions for the other types are similar. I dug into it and think the cause is TypeMappingUtils#checkPhysicalLogicalTypeCompatible: the method does not seem to consider that sources and sinks affect the check differently. I will open a PR soon to solve this problem.
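
To illustrate the source/sink asymmetry described above, here is a minimal, self-contained sketch. It is not Flink code: the class and method names are hypothetical, and it models only VARCHAR lengths. The one Flink fact it relies on is that STRING is a synonym for VARCHAR(2147483647), which is why the sink's physical type shows up as STRING in the exception. The direction-aware rule is an assumption about what a fix could look like, not the actual patch.

```java
// Hypothetical model of the check; none of these names are Flink APIs.
final class TypeDirectionSketch {

    /** STRING in Flink SQL is a synonym for VARCHAR(2147483647). */
    static final int STRING_LENGTH = Integer.MAX_VALUE;

    /** Strict check: logical and physical VARCHAR lengths must be identical. */
    static boolean strictMatch(int logicalLength, int physicalLength) {
        return logicalLength == physicalLength;
    }

    /**
     * Direction-aware check (an assumption, for illustration only).
     * Source: data flows physical -> logical, so the physical length must fit the logical one.
     * Sink: data flows logical -> physical, so the logical length must fit the physical one.
     */
    static boolean directionAwareMatch(int logicalLength, int physicalLength, boolean isSource) {
        return isSource
                ? physicalLength <= logicalLength
                : logicalLength <= physicalLength;
    }

    public static void main(String[] args) {
        int declared = 10;            // column 'a' is declared as VARCHAR(10)
        int consumed = STRING_LENGTH; // the sink's row type reports STRING

        System.out.println("strict: " + strictMatch(declared, consumed));
        System.out.println("sink-aware: " + directionAwareMatch(declared, consumed, false));
        System.out.println("source-aware: " + directionAwareMatch(declared, consumed, true));
    }
}
```

In this model, the strict comparison rejects VARCHAR(10) against STRING, which corresponds to the exception above, while the sink-aware comparison accepts it because every VARCHAR(10) value fits into a STRING field.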






  was:
Test code like the following (in the blink planner):
{code:java}
		tEnv.sqlUpdate("create table randomSource (" +
						"		a varchar(10)," +
						"		b decimal(20,2)" +
						"	) with (" +
						"		'type' = 'random'," +
						"		'count' = '10'" +
						"	)");
		tEnv.sqlUpdate("create table printSink (" +
						"		a varchar(10)," +
						"		b decimal(22,2)," +
						"		c timestamp(3)" +
						"	) with (" +
						"	'type' = 'print'" +
						"	)");
		tEnv.sqlUpdate("insert into printSink select *, current_timestamp from randomSource");
		tEnv.execute("");
{code}

The print TableSink implements UpsertStreamTableSink, and its getRecordType is as follows:


{code:java}
public TypeInformation<Row> getRecordType() {
		return getTableSchema().toRowType();
	}
{code}


varchar type exception is:


||Heading 1||
|org.apache.flink.table.api.ValidationException: Type VARCHAR(10) of table field 'a' does not match with the physical type STRING of the 'a' field of the TableSink consumed type.

	at org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:165)
	at org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:278)
	at org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:255)
	at org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor.visit(LogicalTypeDefaultVisitor.java:67)
	at org.apache.flink.table.types.logical.VarCharType.accept(VarCharType.java:157)
	at org.apache.flink.table.utils.TypeMappingUtils.checkIfCompatible(TypeMappingUtils.java:255)
	at org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:161)
	at org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$validateLogicalPhysicalTypesCompatible$1.apply$mcVI$sp(TableSinkUtils.scala:315)
	at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
	at org.apache.flink.table.planner.sinks.TableSinkUtils$.validateLogicalPhysicalTypesCompatible(TableSinkUtils.scala:308)
	at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:195)
	at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:191)
	at scala.Option.map(Option.scala:146)
	at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:191)
	at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
	at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
	at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:150)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:863)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.translateAndClearBuffer(TableEnvironmentImpl.java:855)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:822)
|
The validation for the other types is similar. I dug into it and found that it is caused by TypeMappingUtils#checkPhysicalLogicalTypeCompatible. It seems that the method does not consider the different effects of source and sink. I will open a PR soon to solve this problem.







> Validation error when insert decimal/timestamp/varchar with precision into sink using TypeInformation of row
> ------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-17313
>                 URL: https://issues.apache.org/jira/browse/FLINK-17313
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>            Reporter: Terry Wang
>            Priority: Major
>
> Test code like the following (in the blink planner):
> {code:java}
> 		tEnv.sqlUpdate("create table randomSource (" +
> 						"		a varchar(10)," +
> 						"		b decimal(20,2)" +
> 						"	) with (" +
> 						"		'type' = 'random'," +
> 						"		'count' = '10'" +
> 						"	)");
> 		tEnv.sqlUpdate("create table printSink (" +
> 						"		a varchar(10)," +
> 						"		b decimal(22,2)," +
> 						"		c timestamp(3)" +
> 						"	) with (" +
> 						"	'type' = 'print'" +
> 						"	)");
> 		tEnv.sqlUpdate("insert into printSink select *, current_timestamp from randomSource");
> 		tEnv.execute("");
> {code}
> The print TableSink implements UpsertStreamTableSink, and its getRecordType is as follows:
> {code:java}
> public TypeInformation<Row> getRecordType() {
> 		return getTableSchema().toRowType();
> 	}
> {code}
> The validation exception for the varchar column is:
> org.apache.flink.table.api.ValidationException: Type VARCHAR(10) of table field 'a' does not match with the physical type STRING of the 'a' field of the TableSink consumed type.
> 	at org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:165)
> 	at org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:278)
> 	at org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:255)
> 	at org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor.visit(LogicalTypeDefaultVisitor.java:67)
> 	at org.apache.flink.table.types.logical.VarCharType.accept(VarCharType.java:157)
> 	at org.apache.flink.table.utils.TypeMappingUtils.checkIfCompatible(TypeMappingUtils.java:255)
> 	at org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:161)
> 	at org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$validateLogicalPhysicalTypesCompatible$1.apply$mcVI$sp(TableSinkUtils.scala:315)
> 	at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
> 	at org.apache.flink.table.planner.sinks.TableSinkUtils$.validateLogicalPhysicalTypesCompatible(TableSinkUtils.scala:308)
> 	at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:195)
> 	at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:191)
> 	at scala.Option.map(Option.scala:146)
> 	at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:191)
> 	at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
> 	at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
> 	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> 	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> 	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> 	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> 	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> 	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> 	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> 	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> 	at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:150)
> 	at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:863)
> 	at org.apache.flink.table.api.internal.TableEnvironmentImpl.translateAndClearBuffer(TableEnvironmentImpl.java:855)
> 	at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:822)
> The validation exceptions for the other types are similar. I dug into it and think the cause is TypeMappingUtils#checkPhysicalLogicalTypeCompatible: the method does not seem to consider that sources and sinks affect the check differently. I will open a PR soon to solve this problem.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)