You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/09/05 12:22:41 UTC

[2/5] flink git commit: [FLINK-7572] [table] Improve TableSchema and FlinkTable validation exception messages.

[FLINK-7572] [table] Improve TableSchema and FlinkTable validation exception messages.

This closes #4640.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0eef8e8c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0eef8e8c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0eef8e8c

Branch: refs/heads/master
Commit: 0eef8e8c01041bb0c001282a16c43ea54f859cfa
Parents: b7b0d40
Author: sunjincheng121 <su...@gmail.com>
Authored: Tue Sep 5 08:55:03 2017 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Tue Sep 5 13:53:59 2017 +0200

----------------------------------------------------------------------
 .../apache/flink/table/api/TableSchema.scala    | 15 +++++++-
 .../flink/table/plan/schema/FlinkTable.scala    | 15 +++++++-
 .../validation/FlinkTableValidationTest.scala   | 39 ++++++++++++++++++++
 .../validation/TableSchemaValidationTest.scala  | 27 +++++++++++++-
 4 files changed, 90 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0eef8e8c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableSchema.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableSchema.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableSchema.scala
index a67a07a..6ee65f0 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableSchema.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableSchema.scala
@@ -28,13 +28,24 @@ class TableSchema(
 
   if (columnNames.length != columnTypes.length) {
     throw new TableException(
-      "Number of column indexes and column names must be equal.")
+      s"Number of field names and field types must be equal.\n" +
+        s"Number of names is ${columnNames.length}, number of types is ${columnTypes.length}.\n" +
+        s"List of field names: ${columnNames.mkString("[", ", ", "]")}.\n" +
+        s"List of field types: ${columnTypes.mkString("[", ", ", "]")}.")
   }
 
   // check uniqueness of field names
   if (columnNames.toSet.size != columnTypes.length) {
+    val duplicateFields = columnNames
+      // count occurrences of field names
+      .groupBy(identity).mapValues(_.length)
+      // filter for occurrences > 1 and map to field name
+      .filter(g => g._2 > 1).keys
+
     throw new TableException(
-      "Table column names must be unique.")
+      s"Field names must be unique.\n" +
+        s"List of duplicate fields: ${duplicateFields.mkString("[", ", ", "]")}.\n" +
+        s"List of all fields: ${columnNames.mkString("[", ", ", "]")}.")
   }
 
   val columnNameToIndex: Map[String, Int] = columnNames.zipWithIndex.toMap

http://git-wip-us.apache.org/repos/asf/flink/blob/0eef8e8c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTable.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTable.scala
index df56ae6..c76532f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTable.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTable.scala
@@ -37,13 +37,24 @@ abstract class FlinkTable[T](
 
   if (fieldIndexes.length != fieldNames.length) {
     throw new TableException(
-      "Number of field indexes and field names must be equal.")
+      s"Number of field names and field indexes must be equal.\n" +
+        s"Number of names is ${fieldNames.length}, number of indexes is ${fieldIndexes.length}.\n" +
+        s"List of column names: ${fieldNames.mkString("[", ", ", "]")}.\n" +
+        s"List of column indexes: ${fieldIndexes.mkString("[", ", ", "]")}.")
   }
 
   // check uniqueness of field names
   if (fieldNames.length != fieldNames.toSet.size) {
+    val duplicateFields = fieldNames
+      // count occurrences of field names
+      .groupBy(identity).mapValues(_.length)
+      // filter for occurrences > 1 and map to field name
+      .filter(g => g._2 > 1).keys
+
     throw new TableException(
-      "Table field names must be unique.")
+      s"Field names must be unique.\n" +
+        s"List of duplicate fields: ${duplicateFields.mkString("[", ", ", "]")}.\n" +
+        s"List of all fields: ${fieldNames.mkString("[", ", ", "]")}.")
   }
 
   val fieldTypes: Array[TypeInformation[_]] =

http://git-wip-us.apache.org/repos/asf/flink/blob/0eef8e8c/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/FlinkTableValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/FlinkTableValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/FlinkTableValidationTest.scala
new file mode 100644
index 0000000..a845f5c
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/FlinkTableValidationTest.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.api.validation
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.utils.TableTestBase
+import org.junit.Test
+
+class FlinkTableValidationTest extends TableTestBase {
+
+  @Test
+  def testFieldNamesDuplicate() {
+
+    thrown.expect(classOf[TableException])
+    thrown.expectMessage("Field names must be unique.\n" +
+      "List of duplicate fields: [a].\n" +
+      "List of all fields: [a, a, b].")
+
+    val util = batchTestUtil()
+    util.addTable[(Int, Int, String)]("MyTable", 'a, 'a, 'b)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0eef8e8c/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/TableSchemaValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/TableSchemaValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/TableSchemaValidationTest.scala
index 1a7815a..c430e59 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/TableSchemaValidationTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/TableSchemaValidationTest.scala
@@ -24,12 +24,35 @@ import org.junit.Test
 
 class TableSchemaValidationTest extends TableTestBase {
 
-  @Test(expected = classOf[TableException])
-  def testInvalidSchema() {
+  @Test
+  def testColumnNameAndColumnTypeNotEqual() {
+    thrown.expect(classOf[TableException])
+    thrown.expectMessage(
+      "Number of field names and field types must be equal.\n" +
+        "Number of names is 3, number of types is 2.\n" +
+        "List of field names: [a, b, c].\n" +
+        "List of field types: [Integer, String].")
+
     val fieldNames = Array("a", "b", "c")
     val typeInfos: Array[TypeInformation[_]] = Array(
       BasicTypeInfo.INT_TYPE_INFO,
       BasicTypeInfo.STRING_TYPE_INFO)
     new TableSchema(fieldNames, typeInfos)
   }
+
+  @Test
+  def testColumnNamesDuplicate() {
+    thrown.expect(classOf[TableException])
+    thrown.expectMessage(
+      "Field names must be unique.\n" +
+        "List of duplicate fields: [a].\n" +
+        "List of all fields: [a, a, c].")
+
+    val fieldNames = Array("a", "a", "c")
+    val typeInfos: Array[TypeInformation[_]] = Array(
+      BasicTypeInfo.INT_TYPE_INFO,
+      BasicTypeInfo.INT_TYPE_INFO,
+      BasicTypeInfo.STRING_TYPE_INFO)
+    new TableSchema(fieldNames, typeInfos)
+  }
 }