You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2020/03/05 01:29:25 UTC
[flink] branch release-1.10 updated: [FLINK-15584][table-planner]
Give nested data type of ROWs in ValidationException
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.10 by this push:
new 2949348 [FLINK-15584][table-planner] Give nested data type of ROWs in ValidationException
2949348 is described below
commit 2949348e7be93fb0c60f7d329342155598d42dc5
Author: Ayush Saxena <ay...@apache.org>
AuthorDate: Wed Feb 26 19:59:33 2020 +0530
[FLINK-15584][table-planner] Give nested data type of ROWs in ValidationException
---
.../org/apache/flink/table/sinks/TableSinkUtils.scala | 4 ++--
.../sql/validation/InsertIntoValidationTest.scala | 19 +++++++++++++++++++
2 files changed, 21 insertions(+), 2 deletions(-)
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sinks/TableSinkUtils.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sinks/TableSinkUtils.scala
index 1764131..cde3111 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sinks/TableSinkUtils.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sinks/TableSinkUtils.scala
@@ -55,10 +55,10 @@ object TableSinkUtils {
// format table and table sink schema strings
val srcSchema = srcFieldNames.zip(srcFieldTypes)
- .map { case (n, t) => s"$n: ${t.getTypeClass.getSimpleName}" }
+ .map { case (n, t) => s"$n: $t" }
.mkString("[", ", ", "]")
val sinkSchema = sinkFieldNames.zip(sinkFieldTypes)
- .map { case (n, t) => s"$n: ${t.getTypeClass.getSimpleName}" }
+ .map { case (n, t) => s"$n: $t" }
.mkString("[", ", ", "]")
throw new ValidationException(
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/sql/validation/InsertIntoValidationTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/sql/validation/InsertIntoValidationTest.scala
index faccc9f..d8915df 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/sql/validation/InsertIntoValidationTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/sql/validation/InsertIntoValidationTest.scala
@@ -74,4 +74,23 @@ class InsertIntoValidationTest extends TableTestBase {
// must fail because partial insert is not supported yet.
util.tableEnv.sqlUpdate(sql)
}
+
+ @Test
+ def testValidationExceptionMessage(): Unit = {
+ expectedException.expect(classOf[ValidationException])
+ expectedException.expectMessage("TableSink schema: [a: Integer, b: Row" +
+ "(f0: Integer, f1: Integer, f2: Integer)]")
+ val util = batchTestUtil()
+ util.addTable[(Int, Long, String)]("sourceTable", 'a, 'b, 'c)
+ val fieldNames = Array("a", "b")
+ val fieldTypes: Array[TypeInformation[_]] = Array(Types.INT, Types.ROW
+ (Types.INT, Types.INT, Types.INT))
+ val sink = new MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink
+ util.tableEnv.registerTableSink("targetTable", sink.configure(fieldNames,
+ fieldTypes))
+
+ val sql = "INSERT INTO targetTable SELECT a, b FROM sourceTable"
+
+ util.tableEnv.sqlUpdate(sql)
+ }
}