You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/06/19 22:18:51 UTC

[4/5] flink git commit: [FLINK-6602] [table] Prevent TableSources with empty time attribute names.

[FLINK-6602] [table] Prevent TableSources with empty time attribute names.

This closes #4135.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c7f6d024
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c7f6d024
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c7f6d024

Branch: refs/heads/release-1.3
Commit: c7f6d0245cd0d47a99d9badcd43f1ebb8758125e
Parents: 8b91df2
Author: zhe li <zhe li>
Authored: Fri Jun 16 23:13:09 2017 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Mon Jun 19 22:43:10 2017 +0200

----------------------------------------------------------------------
 .../plan/schema/StreamTableSourceTable.scala    | 16 +++++++++++--
 .../api/scala/stream/TableSourceTest.scala      | 24 +++++++++++++++++++-
 2 files changed, 37 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c7f6d024/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/StreamTableSourceTable.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/StreamTableSourceTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/StreamTableSourceTable.scala
index fa15288..408381d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/StreamTableSourceTable.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/StreamTableSourceTable.scala
@@ -39,7 +39,13 @@ class StreamTableSourceTable[T](
     val fieldCnt = fieldNames.length
 
     val rowtime = tableSource match {
-      case timeSource: DefinedRowtimeAttribute if timeSource.getRowtimeAttribute != null =>
+      case nullTimeSource : DefinedRowtimeAttribute
+        if nullTimeSource.getRowtimeAttribute == null =>
+          None
+      case emptyStringTimeSource: DefinedRowtimeAttribute
+        if emptyStringTimeSource.getRowtimeAttribute.trim.equals("")  =>
+          throw TableException("The name of the rowtime attribute must not be empty.")
+      case timeSource: DefinedRowtimeAttribute  =>
         val rowtimeAttribute = timeSource.getRowtimeAttribute
         Some((fieldCnt, rowtimeAttribute))
       case _ =>
@@ -47,7 +53,13 @@ class StreamTableSourceTable[T](
     }
 
     val proctime = tableSource match {
-      case timeSource: DefinedProctimeAttribute if timeSource.getProctimeAttribute != null =>
+      case nullTimeSource : DefinedProctimeAttribute
+        if nullTimeSource.getProctimeAttribute == null =>
+          None
+      case emptyStringTimeSource: DefinedProctimeAttribute
+        if emptyStringTimeSource.getProctimeAttribute.trim.equals("")  =>
+          throw TableException("The name of the proctime attribute must not be empty.")
+      case timeSource: DefinedProctimeAttribute  =>
         val proctimeAttribute = timeSource.getProctimeAttribute
         Some((fieldCnt + (if (rowtime.isDefined) 1 else 0), proctimeAttribute))
       case _ =>

http://git-wip-us.apache.org/repos/asf/flink/blob/c7f6d024/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceTest.scala
index 890ad32..32961a6 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceTest.scala
@@ -22,7 +22,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
-import org.apache.flink.table.api.Types
+import org.apache.flink.table.api.{TableException, Types}
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.sources.{DefinedProctimeAttribute, DefinedRowtimeAttribute, StreamTableSource}
 import org.apache.flink.table.utils.TableTestBase
@@ -121,6 +121,28 @@ class TableSourceTest extends TableTestBase {
       )
     util.verifyTable(t, expected)
   }
+
+  @Test(expected = classOf[TableException])
+  def testRowtimeTableSourceWithEmptyName(): Unit = {
+    val util = streamTestUtil()
+    util.tEnv.registerTableSource("rowTimeT", new TestRowtimeSource(" "))
+
+    val t = util.tEnv.scan("rowTimeT")
+      .select('id)
+
+    util.tEnv.optimize(t.getRelNode, false)
+  }
+
+  @Test(expected = classOf[TableException])
+  def testProctimeTableSourceWithEmptyName(): Unit = {
+    val util = streamTestUtil()
+    util.tEnv.registerTableSource("procTimeT", new TestProctimeSource(" "))
+
+    val t = util.tEnv.scan("procTimeT")
+      .select('id)
+
+    util.tEnv.optimize(t.getRelNode, false)
+  }
 }
 
 class TestRowtimeSource(timeField: String)