You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2018/12/13 04:06:12 UTC

[flink] 04/04: [FLINK-11001] [table] Fix alias on window rowtime attribute in Java Table API.

This is an automated email from the ASF dual-hosted git repository.

fhueske pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ff9b7f1b60a4aeb1d925b236b9818002aad830de
Author: hequn8128 <ch...@gmail.com>
AuthorDate: Wed Dec 12 18:28:49 2018 +0800

    [FLINK-11001] [table] Fix alias on window rowtime attribute in Java Table API.
    
    This closes #7289.
---
 docs/dev/table/tableApi.md                         |  4 +-
 .../flink/table/api/StreamTableEnvironment.scala   |  5 ++-
 .../apache/flink/table/api/TableEnvironment.scala  |  4 +-
 .../flink/table/expressions/ExpressionParser.scala | 20 ++++-----
 .../flink/table/api/TableEnvironmentTest.scala     |  6 +--
 .../StreamTableEnvironmentValidationTest.scala     |  6 +--
 .../stringexpr/AggregateStringExpressionTest.scala | 47 ++++++++++++++++++++++
 .../runtime/stream/TimeAttributesITCase.scala      |  4 +-
 8 files changed, 73 insertions(+), 23 deletions(-)

diff --git a/docs/dev/table/tableApi.md b/docs/dev/table/tableApi.md
index f308aca..e44df24 100644
--- a/docs/dev/table/tableApi.md
+++ b/docs/dev/table/tableApi.md
@@ -1728,7 +1728,7 @@ This is the EBNF grammar for expressions:
 
 expressionList = expression , { "," , expression } ;
 
-expression = timeIndicator | overConstant | alias ;
+expression = overConstant | alias ;
 
 alias = logic | ( logic , "as" , fieldReference ) | ( logic , "as" , "(" , fieldReference , { "," , fieldReference } , ")" ) ;
 
@@ -1744,7 +1744,7 @@ unary = [ "!" | "-" | "+" ] , composite ;
 
 composite = over | suffixed | nullLiteral | prefixed | atom ;
 
-suffixed = interval | suffixAs | suffixCast | suffixIf | suffixDistinct | suffixFunctionCall ;
+suffixed = interval | suffixAs | suffixCast | suffixIf | suffixDistinct | suffixFunctionCall | timeIndicator ;
 
 prefixed = prefixAs | prefixCast | prefixIf | prefixDistinct | prefixFunctionCall ;
 
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
index 8c6a1e0..4fa501c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
@@ -669,12 +669,15 @@ abstract class StreamTableEnvironment(
       case (RowtimeAttribute(UnresolvedFieldReference(name)), idx) =>
         extractRowtime(idx, name, None)
 
-      case (RowtimeAttribute(Alias(UnresolvedFieldReference(origName), name, _)), idx) =>
+      case (Alias(RowtimeAttribute(UnresolvedFieldReference(origName)), name, _), idx) =>
         extractRowtime(idx, name, Some(origName))
 
       case (ProctimeAttribute(UnresolvedFieldReference(name)), idx) =>
         extractProctime(idx, name)
 
+      case (Alias(ProctimeAttribute(UnresolvedFieldReference(_)), name, _), idx) =>
+        extractProctime(idx, name)
+
       case (UnresolvedFieldReference(name), _) => fieldNames = name :: fieldNames
 
       case (Alias(UnresolvedFieldReference(_), name, _), _) => fieldNames = name :: fieldNames
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
index e28a471..ba78963 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
@@ -1089,7 +1089,7 @@ abstract class TableEnvironment(val config: TableConfig) {
             } else {
               referenceByName(origName, t).map((_, name))
             }
-          case (_: TimeAttribute, _) =>
+          case (_: TimeAttribute, _) | (Alias(_: TimeAttribute, _, _), _) =>
             None
           case _ => throw new TableException(
             "Field reference expression or alias on field expression expected.")
@@ -1101,7 +1101,7 @@ abstract class TableEnvironment(val config: TableConfig) {
             referenceByName(name, p).map((_, name))
           case Alias(UnresolvedFieldReference(origName), name: String, _) =>
             referenceByName(origName, p).map((_, name))
-          case _: TimeAttribute =>
+          case _: TimeAttribute | Alias(_: TimeAttribute, _, _) =>
             None
           case _ => throw new TableException(
             "Field reference expression or alias on field expression expected.")
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala
index 7fd9309..d5d64b4 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala
@@ -355,7 +355,9 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
     // expression with distinct suffix modifier
     suffixDistinct |
     // function call must always be at the end
-    suffixFunctionCall | suffixFunctionCallOneArg
+    suffixFunctionCall | suffixFunctionCallOneArg |
+    // rowtime or proctime
+    timeIndicator
 
   // prefix operators
 
@@ -525,15 +527,13 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
 
   lazy val timeIndicator: PackratParser[Expression] = proctime | rowtime
 
-  lazy val proctime: PackratParser[Expression] =
-    (aliasMapping | "(" ~> aliasMapping <~ ")" | fieldReference) ~ "." ~ PROCTIME ^^ {
-      case f ~ _ ~ _ => ProctimeAttribute(f)
-    }
+  lazy val proctime: PackratParser[Expression] = fieldReference ~ "." ~ PROCTIME ^^ {
+    case f ~ _ ~ _ => ProctimeAttribute(f)
+  }
 
-  lazy val rowtime: PackratParser[Expression] =
-    (aliasMapping | "(" ~> aliasMapping <~ ")" | fieldReference) ~ "." ~ ROWTIME ^^ {
-      case f ~ _ ~ _ => RowtimeAttribute(f)
-    }
+  lazy val rowtime: PackratParser[Expression] = fieldReference ~ "." ~ ROWTIME ^^ {
+    case f ~ _ ~ _ => RowtimeAttribute(f)
+  }
 
   // alias
 
@@ -547,7 +547,7 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
       case e ~ _ ~ name => Alias(e, name.name)
   }
 
-  lazy val expression: PackratParser[Expression] = timeIndicator | overConstant | alias |
+  lazy val expression: PackratParser[Expression] = overConstant | alias |
     failure("Invalid expression.")
 
   lazy val expressionList: Parser[List[Expression]] = rep1sep(expression, ",")
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
index 1c097d3..9107726 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
@@ -531,17 +531,17 @@ class TableEnvironmentTest extends TableTestBase {
 
     // case class
     util.verifySchema(
-      util.addTable[CClassWithTime]('cf1, ('cf2 as 'new).rowtime, 'cf3),
+      util.addTable[CClassWithTime]('cf1, 'cf2.rowtime as 'new, 'cf3),
       Seq("cf1" -> INT, "new" -> ROWTIME, "cf3" -> STRING))
 
     // row
     util.verifySchema(
-      util.addTable('rf1, ('rf2 as 'new).rowtime, 'rf3)(TEST_ROW_WITH_TIME),
+      util.addTable('rf1, 'rf2.rowtime as 'new, 'rf3)(TEST_ROW_WITH_TIME),
       Seq("rf1" -> INT, "new" -> ROWTIME, "rf3" -> STRING))
 
     // tuple
     util.verifySchema(
-      util.addTable[JTuple3[Int, Long, String]]('f0, ('f1 as 'new).rowtime, 'f2),
+      util.addTable[JTuple3[Int, Long, String]]('f0, 'f1.rowtime as 'new, 'f2),
       Seq("f0" -> INT, "new" -> ROWTIME, "f2" -> STRING))
   }
 }
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentValidationTest.scala
index bfa7bfa..e256ee8 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentValidationTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentValidationTest.scala
@@ -38,7 +38,7 @@ class StreamTableEnvironmentValidationTest extends TableTestBase {
   def testInvalidRowtimeAliasByPosition(): Unit = {
     val util = streamTestUtil()
     // don't allow aliasing by position
-    util.addTable[(Long, Int, String, Int, Long)](('a as 'b).rowtime, 'b, 'c, 'd, 'e)
+    util.addTable[(Long, Int, String, Int, Long)]('a.rowtime as 'b, 'b, 'c, 'd, 'e)
   }
 
   @Test(expected = classOf[TableException])
@@ -178,13 +178,13 @@ class StreamTableEnvironmentValidationTest extends TableTestBase {
   def testInvalidAliasWithRowtimeAttribute(): Unit = {
     val util = streamTestUtil()
     // aliased field does not exist
-    util.addTable[(Int, Long, String)]('_1, ('newnew as 'new).rowtime, '_3)
+    util.addTable[(Int, Long, String)]('_1, 'newnew.rowtime as 'new, '_3)
   }
 
   @Test(expected = classOf[TableException])
   def testInvalidAliasWithRowtimeAttribute2(): Unit = {
     val util = streamTestUtil()
     // aliased field has wrong type
-    util.addTable[(Int, Long, String)]('_1, ('_3 as 'new).rowtime, '_2)
+    util.addTable[(Int, Long, String)]('_1, '_3.rowtime as 'new, '_2)
   }
 }
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/AggregateStringExpressionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/AggregateStringExpressionTest.scala
index ec57436..0833c24 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/AggregateStringExpressionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/AggregateStringExpressionTest.scala
@@ -20,6 +20,7 @@ package org.apache.flink.table.api.stream.table.stringexpr
 
 import org.apache.flink.api.scala._
 import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.java.{Tumble => JTumble}
 import org.apache.flink.table.functions.aggfunctions.CountAggFunction
 import org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.{WeightedAvg, WeightedAvgWithMergeAndReset}
 import org.apache.flink.table.utils.TableTestBase
@@ -128,4 +129,50 @@ class AggregateStringExpressionTest extends TableTestBase {
 
     verifyTableEquals(resJava, resScala)
   }
+
+  @Test
+  def testProctimeRename(): Unit = {
+    val util = streamTestUtil()
+    val t = util.addTable[(Int, Long, String)]('int, 'long, 'string, 'p.proctime as 'proctime)
+
+    // Expression / Scala API
+    val resScala = t
+      .window(Tumble over 50.milli on 'proctime as 'w1)
+      .groupBy('w1, 'string)
+      .select('w1.proctime as 'proctime, 'w1.start as 'start, 'w1.end as 'end, 'string, 'int.count)
+
+    // String / Java API
+    val resJava = t
+      .window(JTumble.over("50.milli").on("proctime").as("w1"))
+      .groupBy("w1, string")
+      .select("w1.proctime as proctime, w1.start as start, w1.end as end, string, int.count")
+
+    verifyTableEquals(resJava, resScala)
+  }
+
+  @Test
+  def testRowtimeRename(): Unit = {
+    val util = streamTestUtil()
+    val t = util.addTable[TestPojo]('int, 'long.rowtime as 'rowtime, 'string)
+
+    // Expression / Scala API
+    val resScala = t
+      .window(Tumble over 50.milli on 'rowtime as 'w1)
+      .groupBy('w1, 'string)
+      .select('w1.rowtime as 'rowtime, 'string, 'int.count)
+
+    // String / Java API
+    val resJava = t
+      .window(JTumble.over("50.milli").on("rowtime").as("w1"))
+      .groupBy("w1, string")
+      .select("w1.rowtime as rowtime, string, int.count")
+
+    verifyTableEquals(resJava, resScala)
+  }
+}
+
+class TestPojo() {
+  var int: Int = _
+  var long: Long = _
+  var string: String = _
 }
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
index 1706fc8..21680e8 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
@@ -549,7 +549,7 @@ class TimeAttributesITCase extends AbstractTestBase {
       .fromElements(p1, p2)
       .assignTimestampsAndWatermarks(new TimestampWithEqualWatermarkPojo)
     // use aliases, swap all attributes, and skip b2
-    val table = stream.toTable(tEnv, ('b as 'b).rowtime, 'c as 'c, 'a as 'a)
+    val table = stream.toTable(tEnv, 'b.rowtime as 'b, 'c as 'c, 'a as 'a)
     // no aliases, no swapping
     val table2 = stream.toTable(tEnv, 'a, 'b.rowtime, 'c)
     // use proctime, no skipping
@@ -560,7 +560,7 @@ class TimeAttributesITCase extends AbstractTestBase {
     // use aliases, swap all attributes, and skip b2
     val table4 = stream.toTable(
       tEnv,
-      ExpressionParser.parseExpressionList("(b as b).rowtime, c as c, a as a"): _*)
+      ExpressionParser.parseExpressionList("b.rowtime as b, c as c, a as a"): _*)
     // no aliases, no swapping
     val table5 = stream.toTable(
       tEnv,