You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/06/19 22:18:51 UTC
[4/5] flink git commit: [FLINK-6602] [table] Prevent TableSources with empty time attribute names.
[FLINK-6602] [table] Prevent TableSources with empty time attribute names.
This closes #4135.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c7f6d024
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c7f6d024
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c7f6d024
Branch: refs/heads/release-1.3
Commit: c7f6d0245cd0d47a99d9badcd43f1ebb8758125e
Parents: 8b91df2
Author: zhe li <zhe li>
Authored: Fri Jun 16 23:13:09 2017 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Mon Jun 19 22:43:10 2017 +0200
----------------------------------------------------------------------
.../plan/schema/StreamTableSourceTable.scala | 16 +++++++++++--
.../api/scala/stream/TableSourceTest.scala | 24 +++++++++++++++++++-
2 files changed, 37 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/c7f6d024/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/StreamTableSourceTable.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/StreamTableSourceTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/StreamTableSourceTable.scala
index fa15288..408381d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/StreamTableSourceTable.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/StreamTableSourceTable.scala
@@ -39,7 +39,13 @@ class StreamTableSourceTable[T](
val fieldCnt = fieldNames.length
val rowtime = tableSource match {
- case timeSource: DefinedRowtimeAttribute if timeSource.getRowtimeAttribute != null =>
+ case nullTimeSource : DefinedRowtimeAttribute
+ if nullTimeSource.getRowtimeAttribute == null =>
+ None
+ case emptyStringTimeSource: DefinedRowtimeAttribute
+ if emptyStringTimeSource.getRowtimeAttribute.trim.equals("") =>
+ throw TableException("The name of the rowtime attribute must not be empty.")
+ case timeSource: DefinedRowtimeAttribute =>
val rowtimeAttribute = timeSource.getRowtimeAttribute
Some((fieldCnt, rowtimeAttribute))
case _ =>
@@ -47,7 +53,13 @@ class StreamTableSourceTable[T](
}
val proctime = tableSource match {
- case timeSource: DefinedProctimeAttribute if timeSource.getProctimeAttribute != null =>
+ case nullTimeSource : DefinedProctimeAttribute
+ if nullTimeSource.getProctimeAttribute == null =>
+ None
+ case emptyStringTimeSource: DefinedProctimeAttribute
+ if emptyStringTimeSource.getProctimeAttribute.trim.equals("") =>
+ throw TableException("The name of the proctime attribute must not be empty.")
+ case timeSource: DefinedProctimeAttribute =>
val proctimeAttribute = timeSource.getProctimeAttribute
Some((fieldCnt + (if (rowtime.isDefined) 1 else 0), proctimeAttribute))
case _ =>
http://git-wip-us.apache.org/repos/asf/flink/blob/c7f6d024/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceTest.scala
index 890ad32..32961a6 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceTest.scala
@@ -22,7 +22,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
-import org.apache.flink.table.api.Types
+import org.apache.flink.table.api.{TableException, Types}
import org.apache.flink.table.api.scala._
import org.apache.flink.table.sources.{DefinedProctimeAttribute, DefinedRowtimeAttribute, StreamTableSource}
import org.apache.flink.table.utils.TableTestBase
@@ -121,6 +121,28 @@ class TableSourceTest extends TableTestBase {
)
util.verifyTable(t, expected)
}
+
+ @Test(expected = classOf[TableException])
+ def testRowtimeTableSourceWithEmptyName(): Unit = {
+ val util = streamTestUtil()
+ util.tEnv.registerTableSource("rowTimeT", new TestRowtimeSource(" "))
+
+ val t = util.tEnv.scan("rowTimeT")
+ .select('id)
+
+ util.tEnv.optimize(t.getRelNode, false)
+ }
+
+ @Test(expected = classOf[TableException])
+ def testProctimeTableSourceWithEmptyName(): Unit = {
+ val util = streamTestUtil()
+ util.tEnv.registerTableSource("procTimeT", new TestProctimeSource(" "))
+
+ val t = util.tEnv.scan("procTimeT")
+ .select('id)
+
+ util.tEnv.optimize(t.getRelNode, false)
+ }
}
class TestRowtimeSource(timeField: String)