You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2020/03/05 01:29:25 UTC

[flink] branch release-1.10 updated: [FLINK-15584][table-planner] Give nested data type of ROWs in ValidationException

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.10 by this push:
     new 2949348  [FLINK-15584][table-planner] Give nested data type of ROWs in ValidationException
2949348 is described below

commit 2949348e7be93fb0c60f7d329342155598d42dc5
Author: Ayush Saxena <ay...@apache.org>
AuthorDate: Wed Feb 26 19:59:33 2020 +0530

    [FLINK-15584][table-planner] Give nested data type of ROWs in ValidationException
---
 .../org/apache/flink/table/sinks/TableSinkUtils.scala |  4 ++--
 .../sql/validation/InsertIntoValidationTest.scala     | 19 +++++++++++++++++++
 2 files changed, 21 insertions(+), 2 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sinks/TableSinkUtils.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sinks/TableSinkUtils.scala
index 1764131..cde3111 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sinks/TableSinkUtils.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sinks/TableSinkUtils.scala
@@ -55,10 +55,10 @@ object TableSinkUtils {
 
       // format table and table sink schema strings
       val srcSchema = srcFieldNames.zip(srcFieldTypes)
-        .map { case (n, t) => s"$n: ${t.getTypeClass.getSimpleName}" }
+        .map { case (n, t) => s"$n: $t" }
         .mkString("[", ", ", "]")
       val sinkSchema = sinkFieldNames.zip(sinkFieldTypes)
-        .map { case (n, t) => s"$n: ${t.getTypeClass.getSimpleName}" }
+        .map { case (n, t) => s"$n: $t" }
         .mkString("[", ", ", "]")
 
       throw new ValidationException(
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/sql/validation/InsertIntoValidationTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/sql/validation/InsertIntoValidationTest.scala
index faccc9f..d8915df 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/sql/validation/InsertIntoValidationTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/sql/validation/InsertIntoValidationTest.scala
@@ -74,4 +74,23 @@ class InsertIntoValidationTest extends TableTestBase {
     // must fail because partial insert is not supported yet.
     util.tableEnv.sqlUpdate(sql)
   }
+
+  @Test
+  def testValidationExceptionMessage(): Unit = {
+    expectedException.expect(classOf[ValidationException])
+    expectedException.expectMessage("TableSink schema:    [a: Integer, b: Row" +
+      "(f0: Integer, f1: Integer, f2: Integer)]")
+    val util = batchTestUtil()
+    util.addTable[(Int, Long, String)]("sourceTable", 'a, 'b, 'c)
+    val fieldNames = Array("a", "b")
+    val fieldTypes: Array[TypeInformation[_]] = Array(Types.INT, Types.ROW
+    (Types.INT, Types.INT, Types.INT))
+    val sink = new MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink
+    util.tableEnv.registerTableSink("targetTable", sink.configure(fieldNames,
+      fieldTypes))
+
+    val sql = "INSERT INTO targetTable SELECT a, b FROM sourceTable"
+
+    util.tableEnv.sqlUpdate(sql)
+  }
 }