Posted to commits@flink.apache.org by ja...@apache.org on 2020/07/03 11:18:50 UTC

[flink] branch master updated: [FLINK-18462][table-planner-blink] Improve the exception message when INSERT INTO mismatch types for empty char

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 7e1306c  [FLINK-18462][table-planner-blink] Improve the exception message when INSERT INTO mismatch types for empty char
7e1306c is described below

commit 7e1306cff40187824e5a4f5f1b6dd1c2b92821c8
Author: Jark Wu <ja...@apache.org>
AuthorDate: Fri Jul 3 19:17:59 2020 +0800

    [FLINK-18462][table-planner-blink] Improve the exception message when INSERT INTO mismatch types for empty char
    
    This closes #12806
---
 .../java/org/apache/flink/table/api/TableSchema.java  |  4 ++--
 .../org/apache/flink/table/api/TableSchemaTest.java   |  2 +-
 .../flink/table/planner/sinks/TableSinkUtils.scala    |  4 ++--
 .../table/planner/plan/stream/sql/TableScanTest.scala | 18 ++++++++++++++++++
 .../table/planner/plan/stream/sql/TableSinkTest.scala | 19 +++++++++++++++++++
 .../validation/LegacyTableSinkValidationTest.scala    |  4 ++--
 6 files changed, 44 insertions(+), 7 deletions(-)
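
The heart of the patch is the swap from LogicalType#asSerializableString() to
LogicalType#asSummaryString() wherever a type is rendered into an error message.
A zero-length CHAR -- the type the blink planner derives for the empty string
literal '' -- has no legal SQL representation, so asSerializableString() itself
throws and the intended type-mismatch message was never produced. A minimal
sketch of the difference, assuming flink-table-common on the classpath (the
class name is illustrative; CharType.ofEmptyLiteral() is the factory Flink
uses internally for '' literals):

    import org.apache.flink.table.api.TableException;
    import org.apache.flink.table.types.logical.CharType;
    import org.apache.flink.table.types.logical.LogicalType;

    public class SummaryVsSerializable {
        public static void main(String[] args) {
            // The type of the literal '': CHAR(0) NOT NULL.
            LogicalType emptyChar = CharType.ofEmptyLiteral();

            // Human-readable form; prints "CHAR(0) NOT NULL".
            System.out.println(emptyChar.asSummaryString());

            try {
                // Zero-length character strings cannot be expressed in SQL,
                // so this throws -- before the patch, the error formatting
                // died here instead of reporting the actual schema mismatch.
                System.out.println(emptyChar.asSerializableString());
            } catch (TableException e) {
                System.out.println("asSerializableString(): " + e.getMessage());
            }
        }
    }

A side effect of the switch is that asSummaryString() abbreviates
VARCHAR(2147483647) as STRING, which is why the expected strings in
TableSchemaTest and LegacyTableSinkValidationTest change below.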

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/TableSchema.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/TableSchema.java
index de67770..f40fe26 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/TableSchema.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/TableSchema.java
@@ -421,9 +421,9 @@ public class TableSchema {
 			LogicalType watermarkOutputType = watermark.getWatermarkExprOutputType().getLogicalType();
 			if (watermarkOutputType.getTypeRoot() != TIMESTAMP_WITHOUT_TIME_ZONE) {
 				throw new ValidationException(String.format(
-					"Watermark strategy '%s' must be of type TIMESTAMP but is of type '%s'.",
+					"Watermark strategy %s must be of type TIMESTAMP but is of type '%s'.",
 					watermark.getWatermarkExpr(),
-					watermarkOutputType.asSerializableString()));
+					watermarkOutputType.asSummaryString()));
 			}
 		}
 	}
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/api/TableSchemaTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/api/TableSchemaTest.java
index e23a2de..9ae1695 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/api/TableSchemaTest.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/api/TableSchemaTest.java
@@ -170,7 +170,7 @@ public class TableSchemaTest {
 	public void testDifferentWatermarkStrategyOutputTypes() {
 		List<Tuple2<DataType, String>> testData = new ArrayList<>();
 		testData.add(Tuple2.of(DataTypes.BIGINT(), "but is of type 'BIGINT'"));
-		testData.add(Tuple2.of(DataTypes.STRING(), "but is of type 'VARCHAR(2147483647)'"));
+		testData.add(Tuple2.of(DataTypes.STRING(), "but is of type 'STRING'"));
 		testData.add(Tuple2.of(DataTypes.INT(), "but is of type 'INT'"));
 		testData.add(Tuple2.of(DataTypes.TIMESTAMP(), "PASS"));
 		testData.add(Tuple2.of(DataTypes.TIMESTAMP(0), "PASS"));
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sinks/TableSinkUtils.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sinks/TableSinkUtils.scala
index 14c45c8..6ee6a8a 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sinks/TableSinkUtils.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sinks/TableSinkUtils.scala
@@ -89,10 +89,10 @@ object TableSinkUtils {
     } else {
       // format query and sink schema strings
       val srcSchema = queryLogicalType.getFields
-        .map(f => s"${f.getName}: ${f.getType.asSerializableString()}")
+        .map(f => s"${f.getName}: ${f.getType.asSummaryString()}")
         .mkString("[", ", ", "]")
       val sinkSchema = sinkLogicalType.getFields
-        .map(f => s"${f.getName}: ${f.getType.asSerializableString()}")
+        .map(f => s"${f.getName}: ${f.getType.asSummaryString()}")
         .mkString("[", ", ", "]")
 
       val sinkDesc: String = sinkIdentifier.getOrElse("")
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala
index 1d3fccc..37c9464 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala
@@ -339,4 +339,22 @@ class TableScanTest extends TableTestBase {
     thrown.expectMessage("DynamicTableSource with SupportsFilterPushDown ability is not supported")
     util.verifyPlan("SELECT * FROM src", ExplainDetail.CHANGELOG_MODE)
   }
+
+  @Test
+  def testInvalidWatermarkOutputType(): Unit = {
+    thrown.expect(classOf[ValidationException])
+    thrown.expectMessage(
+      "Watermark strategy '' must be of type TIMESTAMP but is of type 'CHAR(0) NOT NULL'.")
+    util.addTable(
+      """
+        |CREATE TABLE src (
+        |  ts TIMESTAMP(3),
+        |  a INT,
+        |  b DOUBLE,
+        |  WATERMARK FOR `ts` AS ''
+        |) WITH (
+        |  'connector' = 'values'
+        |)
+      """.stripMargin)
+  }
 }
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.scala
index b5d3136..9e259aa 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.scala
@@ -35,6 +35,25 @@ class TableSinkTest extends TableTestBase {
   val INT: LogicalType = DataTypes.INT().getLogicalType
 
   @Test
+  def testInsertMismatchTypeForEmptyChar(): Unit = {
+    util.addTable(
+      s"""
+         |CREATE TABLE my_sink (
+         |  name STRING,
+         |  email STRING,
+         |  message_offset BIGINT
+         |) WITH (
+         |  'connector' = 'values'
+         |)
+         |""".stripMargin)
+    thrown.expect(classOf[ValidationException])
+    thrown.expectMessage(
+      "Query schema: [a: INT, EXPR$1: CHAR(0) NOT NULL, EXPR$2: CHAR(0) NOT NULL]\n" +
+        "Sink schema: [name: STRING, email: STRING, message_offset: BIGINT]")
+    util.verifyPlanInsert("INSERT INTO my_sink SELECT a, '', '' FROM MyTable")
+  }
+
+  @Test
   def testExceptionForAppendSink(): Unit = {
     util.addTable(
       s"""
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/LegacyTableSinkValidationTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/LegacyTableSinkValidationTest.scala
index e17bdb5..b6cea56 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/LegacyTableSinkValidationTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/LegacyTableSinkValidationTest.scala
@@ -91,8 +91,8 @@ class LegacyTableSinkValidationTest extends TableTestBase {
     expectedException.expectMessage(
       "Field types of query result and registered TableSink default_catalog." +
       "default_database.testSink do not match.\n" +
-      "Query schema: [a: INT, b: BIGINT, c: VARCHAR(2147483647), d: BIGINT]\n" +
-      "Sink schema: [a: INT, b: BIGINT, c: VARCHAR(2147483647), d: INT]")
+      "Query schema: [a: INT, b: BIGINT, c: STRING, d: BIGINT]\n" +
+      "Sink schema: [a: INT, b: BIGINT, c: STRING, d: INT]")
 
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)