You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2020/07/03 11:18:50 UTC
[flink] branch master updated: [FLINK-18462][table-planner-blink]
Improve the exception message when INSERT INTO mismatch types for empty
char
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 7e1306c [FLINK-18462][table-planner-blink] Improve the exception message when INSERT INTO mismatch types for empty char
7e1306c is described below
commit 7e1306cff40187824e5a4f5f1b6dd1c2b92821c8
Author: Jark Wu <ja...@apache.org>
AuthorDate: Fri Jul 3 19:17:59 2020 +0800
[FLINK-18462][table-planner-blink] Improve the exception message when INSERT INTO mismatch types for empty char
This closes #12806
---
.../java/org/apache/flink/table/api/TableSchema.java | 4 ++--
.../org/apache/flink/table/api/TableSchemaTest.java | 2 +-
.../flink/table/planner/sinks/TableSinkUtils.scala | 4 ++--
.../table/planner/plan/stream/sql/TableScanTest.scala | 18 ++++++++++++++++++
.../table/planner/plan/stream/sql/TableSinkTest.scala | 19 +++++++++++++++++++
.../validation/LegacyTableSinkValidationTest.scala | 4 ++--
6 files changed, 44 insertions(+), 7 deletions(-)
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/TableSchema.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/TableSchema.java
index de67770..f40fe26 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/TableSchema.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/TableSchema.java
@@ -421,9 +421,9 @@ public class TableSchema {
LogicalType watermarkOutputType = watermark.getWatermarkExprOutputType().getLogicalType();
if (watermarkOutputType.getTypeRoot() != TIMESTAMP_WITHOUT_TIME_ZONE) {
throw new ValidationException(String.format(
- "Watermark strategy '%s' must be of type TIMESTAMP but is of type '%s'.",
+ "Watermark strategy %s must be of type TIMESTAMP but is of type '%s'.",
watermark.getWatermarkExpr(),
- watermarkOutputType.asSerializableString()));
+ watermarkOutputType.asSummaryString()));
}
}
}
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/api/TableSchemaTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/api/TableSchemaTest.java
index e23a2de..9ae1695 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/api/TableSchemaTest.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/api/TableSchemaTest.java
@@ -170,7 +170,7 @@ public class TableSchemaTest {
public void testDifferentWatermarkStrategyOutputTypes() {
List<Tuple2<DataType, String>> testData = new ArrayList<>();
testData.add(Tuple2.of(DataTypes.BIGINT(), "but is of type 'BIGINT'"));
- testData.add(Tuple2.of(DataTypes.STRING(), "but is of type 'VARCHAR(2147483647)'"));
+ testData.add(Tuple2.of(DataTypes.STRING(), "but is of type 'STRING'"));
testData.add(Tuple2.of(DataTypes.INT(), "but is of type 'INT'"));
testData.add(Tuple2.of(DataTypes.TIMESTAMP(), "PASS"));
testData.add(Tuple2.of(DataTypes.TIMESTAMP(0), "PASS"));
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sinks/TableSinkUtils.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sinks/TableSinkUtils.scala
index 14c45c8..6ee6a8a 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sinks/TableSinkUtils.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sinks/TableSinkUtils.scala
@@ -89,10 +89,10 @@ object TableSinkUtils {
} else {
// format query and sink schema strings
val srcSchema = queryLogicalType.getFields
- .map(f => s"${f.getName}: ${f.getType.asSerializableString()}")
+ .map(f => s"${f.getName}: ${f.getType.asSummaryString()}")
.mkString("[", ", ", "]")
val sinkSchema = sinkLogicalType.getFields
- .map(f => s"${f.getName}: ${f.getType.asSerializableString()}")
+ .map(f => s"${f.getName}: ${f.getType.asSummaryString()}")
.mkString("[", ", ", "]")
val sinkDesc: String = sinkIdentifier.getOrElse("")
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala
index 1d3fccc..37c9464 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala
@@ -339,4 +339,22 @@ class TableScanTest extends TableTestBase {
thrown.expectMessage("DynamicTableSource with SupportsFilterPushDown ability is not supported")
util.verifyPlan("SELECT * FROM src", ExplainDetail.CHANGELOG_MODE)
}
+
+ @Test
+ def testInvalidWatermarkOutputType(): Unit = {
+ thrown.expect(classOf[ValidationException])
+ thrown.expectMessage(
+ "Watermark strategy '' must be of type TIMESTAMP but is of type 'CHAR(0) NOT NULL'.")
+ util.addTable(
+ """
+ |CREATE TABLE src (
+ | ts TIMESTAMP(3),
+ | a INT,
+ | b DOUBLE,
+ | WATERMARK FOR `ts` AS ''
+ |) WITH (
+ | 'connector' = 'values'
+ |)
+ """.stripMargin)
+ }
}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.scala
index b5d3136..9e259aa 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.scala
@@ -35,6 +35,25 @@ class TableSinkTest extends TableTestBase {
val INT: LogicalType = DataTypes.INT().getLogicalType
@Test
+ def testInsertMismatchTypeForEmptyChar(): Unit = {
+ util.addTable(
+ s"""
+ |CREATE TABLE my_sink (
+ | name STRING,
+ | email STRING,
+ | message_offset BIGINT
+ |) WITH (
+ | 'connector' = 'values'
+ |)
+ |""".stripMargin)
+ thrown.expect(classOf[ValidationException])
+ thrown.expectMessage(
+ "Query schema: [a: INT, EXPR$1: CHAR(0) NOT NULL, EXPR$2: CHAR(0) NOT NULL]\n" +
+ "Sink schema: [name: STRING, email: STRING, message_offset: BIGINT]")
+ util.verifyPlanInsert("INSERT INTO my_sink SELECT a, '', '' FROM MyTable")
+ }
+
+ @Test
def testExceptionForAppendSink(): Unit = {
util.addTable(
s"""
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/LegacyTableSinkValidationTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/LegacyTableSinkValidationTest.scala
index e17bdb5..b6cea56 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/LegacyTableSinkValidationTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/LegacyTableSinkValidationTest.scala
@@ -91,8 +91,8 @@ class LegacyTableSinkValidationTest extends TableTestBase {
expectedException.expectMessage(
"Field types of query result and registered TableSink default_catalog." +
"default_database.testSink do not match.\n" +
- "Query schema: [a: INT, b: BIGINT, c: VARCHAR(2147483647), d: BIGINT]\n" +
- "Sink schema: [a: INT, b: BIGINT, c: VARCHAR(2147483647), d: INT]")
+ "Query schema: [a: INT, b: BIGINT, c: STRING, d: BIGINT]\n" +
+ "Sink schema: [a: INT, b: BIGINT, c: STRING, d: INT]")
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)