You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2018/12/13 04:06:12 UTC
[flink] 04/04: [FLINK-11001] [table] Fix alias on window rowtime
attribute in Java Table API.
This is an automated email from the ASF dual-hosted git repository.
fhueske pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit ff9b7f1b60a4aeb1d925b236b9818002aad830de
Author: hequn8128 <ch...@gmail.com>
AuthorDate: Wed Dec 12 18:28:49 2018 +0800
[FLINK-11001] [table] Fix alias on window rowtime attribute in Java Table API.
This closes 7289.
---
docs/dev/table/tableApi.md | 4 +-
.../flink/table/api/StreamTableEnvironment.scala | 5 ++-
.../apache/flink/table/api/TableEnvironment.scala | 4 +-
.../flink/table/expressions/ExpressionParser.scala | 20 ++++-----
.../flink/table/api/TableEnvironmentTest.scala | 6 +--
.../StreamTableEnvironmentValidationTest.scala | 6 +--
.../stringexpr/AggregateStringExpressionTest.scala | 47 ++++++++++++++++++++++
.../runtime/stream/TimeAttributesITCase.scala | 4 +-
8 files changed, 73 insertions(+), 23 deletions(-)
diff --git a/docs/dev/table/tableApi.md b/docs/dev/table/tableApi.md
index f308aca..e44df24 100644
--- a/docs/dev/table/tableApi.md
+++ b/docs/dev/table/tableApi.md
@@ -1728,7 +1728,7 @@ This is the EBNF grammar for expressions:
expressionList = expression , { "," , expression } ;
-expression = timeIndicator | overConstant | alias ;
+expression = overConstant | alias ;
alias = logic | ( logic , "as" , fieldReference ) | ( logic , "as" , "(" , fieldReference , { "," , fieldReference } , ")" ) ;
@@ -1744,7 +1744,7 @@ unary = [ "!" | "-" | "+" ] , composite ;
composite = over | suffixed | nullLiteral | prefixed | atom ;
-suffixed = interval | suffixAs | suffixCast | suffixIf | suffixDistinct | suffixFunctionCall ;
+suffixed = interval | suffixAs | suffixCast | suffixIf | suffixDistinct | suffixFunctionCall | timeIndicator ;
prefixed = prefixAs | prefixCast | prefixIf | prefixDistinct | prefixFunctionCall ;
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
index 8c6a1e0..4fa501c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
@@ -669,12 +669,15 @@ abstract class StreamTableEnvironment(
case (RowtimeAttribute(UnresolvedFieldReference(name)), idx) =>
extractRowtime(idx, name, None)
- case (RowtimeAttribute(Alias(UnresolvedFieldReference(origName), name, _)), idx) =>
+ case (Alias(RowtimeAttribute(UnresolvedFieldReference(origName)), name, _), idx) =>
extractRowtime(idx, name, Some(origName))
case (ProctimeAttribute(UnresolvedFieldReference(name)), idx) =>
extractProctime(idx, name)
+ case (Alias(ProctimeAttribute(UnresolvedFieldReference(_)), name, _), idx) =>
+ extractProctime(idx, name)
+
case (UnresolvedFieldReference(name), _) => fieldNames = name :: fieldNames
case (Alias(UnresolvedFieldReference(_), name, _), _) => fieldNames = name :: fieldNames
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
index e28a471..ba78963 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
@@ -1089,7 +1089,7 @@ abstract class TableEnvironment(val config: TableConfig) {
} else {
referenceByName(origName, t).map((_, name))
}
- case (_: TimeAttribute, _) =>
+ case (_: TimeAttribute, _) | (Alias(_: TimeAttribute, _, _), _) =>
None
case _ => throw new TableException(
"Field reference expression or alias on field expression expected.")
@@ -1101,7 +1101,7 @@ abstract class TableEnvironment(val config: TableConfig) {
referenceByName(name, p).map((_, name))
case Alias(UnresolvedFieldReference(origName), name: String, _) =>
referenceByName(origName, p).map((_, name))
- case _: TimeAttribute =>
+ case _: TimeAttribute | Alias(_: TimeAttribute, _, _) =>
None
case _ => throw new TableException(
"Field reference expression or alias on field expression expected.")
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala
index 7fd9309..d5d64b4 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala
@@ -355,7 +355,9 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
// expression with distinct suffix modifier
suffixDistinct |
// function call must always be at the end
- suffixFunctionCall | suffixFunctionCallOneArg
+ suffixFunctionCall | suffixFunctionCallOneArg |
+ // rowtime or proctime
+ timeIndicator
// prefix operators
@@ -525,15 +527,13 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
lazy val timeIndicator: PackratParser[Expression] = proctime | rowtime
- lazy val proctime: PackratParser[Expression] =
- (aliasMapping | "(" ~> aliasMapping <~ ")" | fieldReference) ~ "." ~ PROCTIME ^^ {
- case f ~ _ ~ _ => ProctimeAttribute(f)
- }
+ lazy val proctime: PackratParser[Expression] = fieldReference ~ "." ~ PROCTIME ^^ {
+ case f ~ _ ~ _ => ProctimeAttribute(f)
+ }
- lazy val rowtime: PackratParser[Expression] =
- (aliasMapping | "(" ~> aliasMapping <~ ")" | fieldReference) ~ "." ~ ROWTIME ^^ {
- case f ~ _ ~ _ => RowtimeAttribute(f)
- }
+ lazy val rowtime: PackratParser[Expression] = fieldReference ~ "." ~ ROWTIME ^^ {
+ case f ~ _ ~ _ => RowtimeAttribute(f)
+ }
// alias
@@ -547,7 +547,7 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
case e ~ _ ~ name => Alias(e, name.name)
}
- lazy val expression: PackratParser[Expression] = timeIndicator | overConstant | alias |
+ lazy val expression: PackratParser[Expression] = overConstant | alias |
failure("Invalid expression.")
lazy val expressionList: Parser[List[Expression]] = rep1sep(expression, ",")
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
index 1c097d3..9107726 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
@@ -531,17 +531,17 @@ class TableEnvironmentTest extends TableTestBase {
// case class
util.verifySchema(
- util.addTable[CClassWithTime]('cf1, ('cf2 as 'new).rowtime, 'cf3),
+ util.addTable[CClassWithTime]('cf1, 'cf2.rowtime as 'new, 'cf3),
Seq("cf1" -> INT, "new" -> ROWTIME, "cf3" -> STRING))
// row
util.verifySchema(
- util.addTable('rf1, ('rf2 as 'new).rowtime, 'rf3)(TEST_ROW_WITH_TIME),
+ util.addTable('rf1, 'rf2.rowtime as 'new, 'rf3)(TEST_ROW_WITH_TIME),
Seq("rf1" -> INT, "new" -> ROWTIME, "rf3" -> STRING))
// tuple
util.verifySchema(
- util.addTable[JTuple3[Int, Long, String]]('f0, ('f1 as 'new).rowtime, 'f2),
+ util.addTable[JTuple3[Int, Long, String]]('f0, 'f1.rowtime as 'new, 'f2),
Seq("f0" -> INT, "new" -> ROWTIME, "f2" -> STRING))
}
}
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentValidationTest.scala
index bfa7bfa..e256ee8 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentValidationTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentValidationTest.scala
@@ -38,7 +38,7 @@ class StreamTableEnvironmentValidationTest extends TableTestBase {
def testInvalidRowtimeAliasByPosition(): Unit = {
val util = streamTestUtil()
// don't allow aliasing by position
- util.addTable[(Long, Int, String, Int, Long)](('a as 'b).rowtime, 'b, 'c, 'd, 'e)
+ util.addTable[(Long, Int, String, Int, Long)]('a.rowtime as 'b, 'b, 'c, 'd, 'e)
}
@Test(expected = classOf[TableException])
@@ -178,13 +178,13 @@ class StreamTableEnvironmentValidationTest extends TableTestBase {
def testInvalidAliasWithRowtimeAttribute(): Unit = {
val util = streamTestUtil()
// aliased field does not exist
- util.addTable[(Int, Long, String)]('_1, ('newnew as 'new).rowtime, '_3)
+ util.addTable[(Int, Long, String)]('_1, 'newnew.rowtime as 'new, '_3)
}
@Test(expected = classOf[TableException])
def testInvalidAliasWithRowtimeAttribute2(): Unit = {
val util = streamTestUtil()
// aliased field has wrong type
- util.addTable[(Int, Long, String)]('_1, ('_3 as 'new).rowtime, '_2)
+ util.addTable[(Int, Long, String)]('_1, '_3.rowtime as 'new, '_2)
}
}
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/AggregateStringExpressionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/AggregateStringExpressionTest.scala
index ec57436..0833c24 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/AggregateStringExpressionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/AggregateStringExpressionTest.scala
@@ -20,6 +20,7 @@ package org.apache.flink.table.api.stream.table.stringexpr
import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.java.{Tumble => JTumble}
import org.apache.flink.table.functions.aggfunctions.CountAggFunction
import org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.{WeightedAvg, WeightedAvgWithMergeAndReset}
import org.apache.flink.table.utils.TableTestBase
@@ -128,4 +129,50 @@ class AggregateStringExpressionTest extends TableTestBase {
verifyTableEquals(resJava, resScala)
}
+
+ @Test
+ def testProctimeRename(): Unit = {
+ val util = streamTestUtil()
+ val t = util.addTable[(Int, Long, String)]('int, 'long, 'string, 'p.proctime as 'proctime)
+
+ // Expression / Scala API
+ val resScala = t
+ .window(Tumble over 50.milli on 'proctime as 'w1)
+ .groupBy('w1, 'string)
+ .select('w1.proctime as 'proctime, 'w1.start as 'start, 'w1.end as 'end, 'string, 'int.count)
+
+ // String / Java API
+ val resJava = t
+ .window(JTumble.over("50.milli").on("proctime").as("w1"))
+ .groupBy("w1, string")
+ .select("w1.proctime as proctime, w1.start as start, w1.end as end, string, int.count")
+
+ verifyTableEquals(resJava, resScala)
+ }
+
+ @Test
+ def testRowtimeRename(): Unit = {
+ val util = streamTestUtil()
+ val t = util.addTable[TestPojo]('int, 'long.rowtime as 'rowtime, 'string)
+
+ // Expression / Scala API
+ val resScala = t
+ .window(Tumble over 50.milli on 'rowtime as 'w1)
+ .groupBy('w1, 'string)
+ .select('w1.rowtime as 'rowtime, 'string, 'int.count)
+
+ // String / Java API
+ val resJava = t
+ .window(JTumble.over("50.milli").on("rowtime").as("w1"))
+ .groupBy("w1, string")
+ .select("w1.rowtime as rowtime, string, int.count")
+
+ verifyTableEquals(resJava, resScala)
+ }
+}
+
+class TestPojo() {
+ var int: Int = _
+ var long: Long = _
+ var string: String = _
}
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
index 1706fc8..21680e8 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
@@ -549,7 +549,7 @@ class TimeAttributesITCase extends AbstractTestBase {
.fromElements(p1, p2)
.assignTimestampsAndWatermarks(new TimestampWithEqualWatermarkPojo)
// use aliases, swap all attributes, and skip b2
- val table = stream.toTable(tEnv, ('b as 'b).rowtime, 'c as 'c, 'a as 'a)
+ val table = stream.toTable(tEnv, 'b.rowtime as 'b, 'c as 'c, 'a as 'a)
// no aliases, no swapping
val table2 = stream.toTable(tEnv, 'a, 'b.rowtime, 'c)
// use proctime, no skipping
@@ -560,7 +560,7 @@ class TimeAttributesITCase extends AbstractTestBase {
// use aliases, swap all attributes, and skip b2
val table4 = stream.toTable(
tEnv,
- ExpressionParser.parseExpressionList("(b as b).rowtime, c as c, a as a"): _*)
+ ExpressionParser.parseExpressionList("b.rowtime as b, c as c, a as a"): _*)
// no aliases, no swapping
val table5 = stream.toTable(
tEnv,