Posted to commits@flink.apache.org by fh...@apache.org on 2018/12/13 04:06:08 UTC

[flink] branch master updated (ea8373e -> ff9b7f1)

This is an automated email from the ASF dual-hosted git repository.

fhueske pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from ea8373e  [FLINK-11085][build] Fix inheritance of shading filters
     new f670cac  [hotfix] [docs] Fix typos in Table and SQL docs
     new c8dc83c  [hotfix] [docs] Improve DataSet.partitionCustom() documentation.
     new ed0aefa  [FLINK-11136] [table] Fix the merge logic of DISTINCT aggregates.
     new ff9b7f1  [FLINK-11001] [table] Fix alias on window rowtime attribute in Java Table API.

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 docs/dev/batch/index.md                            | 15 ++++---
 docs/dev/table/sql.md                              |  2 +-
 docs/dev/table/streaming/dynamic_tables.md         |  6 +--
 docs/dev/table/streaming/joins.md                  |  4 +-
 docs/dev/table/streaming/temporal_tables.md        |  3 +-
 docs/dev/table/streaming/time_attributes.md        |  2 +-
 docs/dev/table/tableApi.md                         |  4 +-
 .../flink/table/api/StreamTableEnvironment.scala   |  5 ++-
 .../apache/flink/table/api/TableEnvironment.scala  |  4 +-
 .../table/codegen/AggregationCodeGenerator.scala   | 17 +++++++-
 .../flink/table/expressions/ExpressionParser.scala | 20 ++++-----
 .../flink/table/api/TableEnvironmentTest.scala     |  6 +--
 .../StreamTableEnvironmentValidationTest.scala     |  6 +--
 .../stringexpr/AggregateStringExpressionTest.scala | 47 ++++++++++++++++++++++
 .../runtime/stream/TimeAttributesITCase.scala      |  4 +-
 .../flink/table/runtime/stream/sql/SqlITCase.scala |  7 ++--
 16 files changed, 110 insertions(+), 42 deletions(-)


[flink] 01/04: [hotfix] [docs] Fix typos in Table and SQL docs

Posted by fh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

fhueske pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f670cac23a3dc3081763fa8dfc71fda080eb47c0
Author: Alexander Fedulov <al...@data-artisans.com>
AuthorDate: Wed Dec 12 19:58:05 2018 +0100

    [hotfix] [docs] Fix typos in Table and SQL docs
    
    This closes #7297.
---
 docs/dev/table/sql.md                       | 2 +-
 docs/dev/table/streaming/dynamic_tables.md  | 6 +++---
 docs/dev/table/streaming/joins.md           | 4 ++--
 docs/dev/table/streaming/temporal_tables.md | 3 +--
 docs/dev/table/streaming/time_attributes.md | 2 +-
 5 files changed, 8 insertions(+), 9 deletions(-)

diff --git a/docs/dev/table/sql.md b/docs/dev/table/sql.md
index 55c144a..1e7ef83 100644
--- a/docs/dev/table/sql.md
+++ b/docs/dev/table/sql.md
@@ -22,7 +22,7 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-SQL queries are specified with the `sqlQuery()` method of the `TableEnvironment`. The method returns the result of the SQL query as a `Table`. A `Table` can be used in [subsequent SQL and Table API queries](common.html#mixing-table-api-and-sql), be [converted into a DataSet or DataStream](common.html#integration-with-datastream-and-dataset-api), or [written to a TableSink](common.html#emit-a-table)). SQL and Table API queries can seamlessly mixed and are holistically optimized and transl [...]
+SQL queries are specified with the `sqlQuery()` method of the `TableEnvironment`. The method returns the result of the SQL query as a `Table`. A `Table` can be used in [subsequent SQL and Table API queries](common.html#mixing-table-api-and-sql), be [converted into a DataSet or DataStream](common.html#integration-with-datastream-and-dataset-api), or [written to a TableSink](common.html#emit-a-table)). SQL and Table API queries can be seamlessly mixed and are holistically optimized and tra [...]
 
 In order to access a table in a SQL query, it must be [registered in the TableEnvironment](common.html#register-tables-in-the-catalog). A table can be registered from a [TableSource](common.html#register-a-tablesource), [Table](common.html#register-a-table), [DataStream, or DataSet](common.html#register-a-datastream-or-dataset-as-table). Alternatively, users can also [register external catalogs in a TableEnvironment](common.html#register-an-external-catalog) to specify the location of th [...]
 
diff --git a/docs/dev/table/streaming/dynamic_tables.md b/docs/dev/table/streaming/dynamic_tables.md
index ada75a3..f8bcb94 100644
--- a/docs/dev/table/streaming/dynamic_tables.md
+++ b/docs/dev/table/streaming/dynamic_tables.md
@@ -53,12 +53,12 @@ The following table compares traditional relational algebra and stream processin
 	</tr>
 </table>
 
-Despite these differences, processing streams with relational queries and SQL is not impossible. Advanced relational database systems offer a feature called *Materialized Views*. A materialized view is defined as a SQL query, just like a regular virtual view. In contrast to a virtual view, a materialized view caches the result of the query such that the query does not need to be evaluated when the view is accessed. A common challenge for caching is to prevent a cache from serving outdate [...]
+Despite these differences, processing streams with relational queries and SQL is not impossible. Advanced relational database systems offer a feature called *Materialized Views*. A materialized view is defined as a SQL query, just like a regular virtual view. In contrast to a virtual view, a materialized view caches the result of the query such that the query does not need to be evaluated when the view is accessed. A common challenge for caching is to prevent a cache from serving outdate [...]
 
 The connection between eager view maintenance and SQL queries on streams becomes obvious if we consider the following:
 
 - A database table is the result of a *stream* of `INSERT`, `UPDATE`, and `DELETE` DML statements, often called *changelog stream*.
-- A materialized view is defined as a SQL query. In order to update the view, the query is continuously processes the changelog streams of the view's base relations.
+- A materialized view is defined as a SQL query. In order to update the view, the query continuously processes the changelog streams of the view's base relations.
 - The materialized view is the result of the streaming SQL query.
 
 With these points in mind, we introduce following concept of *Dynamic tables* in the next section.
@@ -177,7 +177,7 @@ When converting a dynamic table into a stream or writing it to an external syste
 </center>
 <br><br>
 
-* **Upsert stream:** An upsert stream is a stream with two types of messages, *upsert messages* and *delete messages*. A dynamic table that is converted into an upsert stream requires a (possibly composite) unique key. A dynamic table with unique key is converted into a dynamic table by encoding `INSERT` and `UPDATE` changes as upsert messages and `DELETE` changes as delete messages. The stream consuming operator needs to be aware of the unique key attribute in order to apply messages co [...]
+* **Upsert stream:** An upsert stream is a stream with two types of messages, *upsert messages* and *delete messages*. A dynamic table that is converted into an upsert stream requires a (possibly composite) unique key. A dynamic table with unique key is converted into a stream by encoding `INSERT` and `UPDATE` changes as upsert messages and `DELETE` changes as delete messages. The stream consuming operator needs to be aware of the unique key attribute in order to apply messages correctly [...]
 
 <center>
 <img alt="Dynamic tables" src="{{ site.baseurl }}/fig/table-streaming/redo-mode.png" width="85%">
diff --git a/docs/dev/table/streaming/joins.md b/docs/dev/table/streaming/joins.md
index f293406..508e8c7 100644
--- a/docs/dev/table/streaming/joins.md
+++ b/docs/dev/table/streaming/joins.md
@@ -143,7 +143,7 @@ WHERE r.currency = o.currency
 Each record from the probe side will be joined with the version of the build side table at the time of the correlated time attribute of the probe side record.
 In order to support updates (overwrites) of previous values on the build side table, the table must define a primary key.
 
-In our example, each record from `Orders` will be joined with the version of `Rates` at time `o.rowtime`. The `currency` field has been defined as the primary key of `Rates` before and is used to connect both tables in our example. If the query were using a processing-time notion, a newly appended order would always be joined with the most recent version of `Rates` when executing the operation. 
+In our example, each record from `Orders` will be joined with the version of `Rates` at time `o.rowtime`. The `currency` field has been defined as the primary key of `Rates` before and is used to connect both tables in our example. If the query were using a processing-time notion, a newly appended order would always be joined with the most recent version of `Rates` when executing the operation.
 
 In contrast to [regular joins](#regular-joins), this means that if there is a new record on the build side, it will not affect the previous results of the join.
 This again allows Flink to limit the number of elements that must be kept in the state.
@@ -199,7 +199,7 @@ By definition, it is always the current timestamp. Thus, invocations of a proces
 and any updates in the underlying history table will also immediately overwrite the current values.
 
 Only the latest versions (with respect to the defined primary key) of the build side records are kept in the state.
-New updates will have no effect on the previously results emitted/processed records from the probe side.
+Updates of the build side will have no effect on previously emitted join results.
 
 One can think about a processing-time temporal join as a simple `HashMap<K, V>` that stores all of the records from the build side.
 When a new record from the build side has the same key as some previous record, the old value is just simply overwritten.
diff --git a/docs/dev/table/streaming/temporal_tables.md b/docs/dev/table/streaming/temporal_tables.md
index b450527..4ebb4a6 100644
--- a/docs/dev/table/streaming/temporal_tables.md
+++ b/docs/dev/table/streaming/temporal_tables.md
@@ -114,7 +114,7 @@ Each query to `Rates(timeAttribute)` would return the state of the `Rates` for t
 **Note**: Currently, Flink doesn't support directly querying the temporal table functions with a constant time attribute parameter. At the moment, temporal table functions can only be used in joins.
 The example above was used to provide an intuition about what the function `Rates(timeAttribute)` returns.
 
-See also the [joining page for continuous queries](joins.html) for more information about how to join with a temporal table.
+See also the page about [joins for continuous queries](joins.html) for more information about how to join with a temporal table.
 
 ### Defining Temporal Table Function
 
@@ -171,7 +171,6 @@ val ratesHistory = env
   .fromCollection(ratesHistoryData)
   .toTable(tEnv, 'r_currency, 'r_rate, 'r_proctime.proctime)
 
-tEnv.registerTable("Orders", orders)
 tEnv.registerTable("RatesHistory", ratesHistory)
 
 // Create and register TemporalTableFunction.
diff --git a/docs/dev/table/streaming/time_attributes.md b/docs/dev/table/streaming/time_attributes.md
index 0165813..27208fb 100644
--- a/docs/dev/table/streaming/time_attributes.md
+++ b/docs/dev/table/streaming/time_attributes.md
@@ -40,7 +40,7 @@ Introduction to Time Attributes
 
 Time-based operations such as windows in both the [Table API]({{ site.baseurl }}/dev/table/tableApi.html#group-windows) and [SQL]({{ site.baseurl }}/dev/table/sql.html#group-windows) require information about the notion of time and its origin. Therefore, tables can offer *logical time attributes* for indicating time and accessing corresponding timestamps in table programs.
 
-Time attributes can be part of every table schema. They are defined when creating a table from a `DataStream` or are pre-defined when using a `TableSource`. Once a time attribute has been defined at the beginning, it can be referenced as a field and can used in time-based operations.
+Time attributes can be part of every table schema. They are defined when creating a table from a `DataStream` or are pre-defined when using a `TableSource`. Once a time attribute has been defined at the beginning, it can be referenced as a field and can be used in time-based operations.
 
 As long as a time attribute is not modified and is simply forwarded from one part of the query to another, it remains a valid time attribute. Time attributes behave like regular timestamps and can be accessed for calculations. If a time attribute is used in a calculation, it will be materialized and becomes a regular timestamp. Regular timestamps do not cooperate with Flink's time and watermarking system and thus can not be used for time-based operations anymore.
 


[flink] 04/04: [FLINK-11001] [table] Fix alias on window rowtime attribute in Java Table API.

Posted by fh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

fhueske pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ff9b7f1b60a4aeb1d925b236b9818002aad830de
Author: hequn8128 <ch...@gmail.com>
AuthorDate: Wed Dec 12 18:28:49 2018 +0800

    [FLINK-11001] [table] Fix alias on window rowtime attribute in Java Table API.
    
    This closes #7289.
---
 docs/dev/table/tableApi.md                         |  4 +-
 .../flink/table/api/StreamTableEnvironment.scala   |  5 ++-
 .../apache/flink/table/api/TableEnvironment.scala  |  4 +-
 .../flink/table/expressions/ExpressionParser.scala | 20 ++++-----
 .../flink/table/api/TableEnvironmentTest.scala     |  6 +--
 .../StreamTableEnvironmentValidationTest.scala     |  6 +--
 .../stringexpr/AggregateStringExpressionTest.scala | 47 ++++++++++++++++++++++
 .../runtime/stream/TimeAttributesITCase.scala      |  4 +-
 8 files changed, 73 insertions(+), 23 deletions(-)

diff --git a/docs/dev/table/tableApi.md b/docs/dev/table/tableApi.md
index f308aca..e44df24 100644
--- a/docs/dev/table/tableApi.md
+++ b/docs/dev/table/tableApi.md
@@ -1728,7 +1728,7 @@ This is the EBNF grammar for expressions:
 
 expressionList = expression , { "," , expression } ;
 
-expression = timeIndicator | overConstant | alias ;
+expression = overConstant | alias ;
 
 alias = logic | ( logic , "as" , fieldReference ) | ( logic , "as" , "(" , fieldReference , { "," , fieldReference } , ")" ) ;
 
@@ -1744,7 +1744,7 @@ unary = [ "!" | "-" | "+" ] , composite ;
 
 composite = over | suffixed | nullLiteral | prefixed | atom ;
 
-suffixed = interval | suffixAs | suffixCast | suffixIf | suffixDistinct | suffixFunctionCall ;
+suffixed = interval | suffixAs | suffixCast | suffixIf | suffixDistinct | suffixFunctionCall | timeIndicator ;
 
 prefixed = prefixAs | prefixCast | prefixIf | prefixDistinct | prefixFunctionCall ;
 
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
index 8c6a1e0..4fa501c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
@@ -669,12 +669,15 @@ abstract class StreamTableEnvironment(
       case (RowtimeAttribute(UnresolvedFieldReference(name)), idx) =>
         extractRowtime(idx, name, None)
 
-      case (RowtimeAttribute(Alias(UnresolvedFieldReference(origName), name, _)), idx) =>
+      case (Alias(RowtimeAttribute(UnresolvedFieldReference(origName)), name, _), idx) =>
         extractRowtime(idx, name, Some(origName))
 
       case (ProctimeAttribute(UnresolvedFieldReference(name)), idx) =>
         extractProctime(idx, name)
 
+      case (Alias(ProctimeAttribute(UnresolvedFieldReference(_)), name, _), idx) =>
+        extractProctime(idx, name)
+
       case (UnresolvedFieldReference(name), _) => fieldNames = name :: fieldNames
 
       case (Alias(UnresolvedFieldReference(_), name, _), _) => fieldNames = name :: fieldNames
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
index e28a471..ba78963 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
@@ -1089,7 +1089,7 @@ abstract class TableEnvironment(val config: TableConfig) {
             } else {
               referenceByName(origName, t).map((_, name))
             }
-          case (_: TimeAttribute, _) =>
+          case (_: TimeAttribute, _) | (Alias(_: TimeAttribute, _, _), _) =>
             None
           case _ => throw new TableException(
             "Field reference expression or alias on field expression expected.")
@@ -1101,7 +1101,7 @@ abstract class TableEnvironment(val config: TableConfig) {
             referenceByName(name, p).map((_, name))
           case Alias(UnresolvedFieldReference(origName), name: String, _) =>
             referenceByName(origName, p).map((_, name))
-          case _: TimeAttribute =>
+          case _: TimeAttribute | Alias(_: TimeAttribute, _, _) =>
             None
           case _ => throw new TableException(
             "Field reference expression or alias on field expression expected.")
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala
index 7fd9309..d5d64b4 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala
@@ -355,7 +355,9 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
     // expression with distinct suffix modifier
     suffixDistinct |
     // function call must always be at the end
-    suffixFunctionCall | suffixFunctionCallOneArg
+    suffixFunctionCall | suffixFunctionCallOneArg |
+    // rowtime or proctime
+    timeIndicator
 
   // prefix operators
 
@@ -525,15 +527,13 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
 
   lazy val timeIndicator: PackratParser[Expression] = proctime | rowtime
 
-  lazy val proctime: PackratParser[Expression] =
-    (aliasMapping | "(" ~> aliasMapping <~ ")" | fieldReference) ~ "." ~ PROCTIME ^^ {
-      case f ~ _ ~ _ => ProctimeAttribute(f)
-    }
+  lazy val proctime: PackratParser[Expression] = fieldReference ~ "." ~ PROCTIME ^^ {
+    case f ~ _ ~ _ => ProctimeAttribute(f)
+  }
 
-  lazy val rowtime: PackratParser[Expression] =
-    (aliasMapping | "(" ~> aliasMapping <~ ")" | fieldReference) ~ "." ~ ROWTIME ^^ {
-      case f ~ _ ~ _ => RowtimeAttribute(f)
-    }
+  lazy val rowtime: PackratParser[Expression] = fieldReference ~ "." ~ ROWTIME ^^ {
+    case f ~ _ ~ _ => RowtimeAttribute(f)
+  }
 
   // alias
 
@@ -547,7 +547,7 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
       case e ~ _ ~ name => Alias(e, name.name)
   }
 
-  lazy val expression: PackratParser[Expression] = timeIndicator | overConstant | alias |
+  lazy val expression: PackratParser[Expression] = overConstant | alias |
     failure("Invalid expression.")
 
   lazy val expressionList: Parser[List[Expression]] = rep1sep(expression, ",")
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
index 1c097d3..9107726 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
@@ -531,17 +531,17 @@ class TableEnvironmentTest extends TableTestBase {
 
     // case class
     util.verifySchema(
-      util.addTable[CClassWithTime]('cf1, ('cf2 as 'new).rowtime, 'cf3),
+      util.addTable[CClassWithTime]('cf1, 'cf2.rowtime as 'new, 'cf3),
       Seq("cf1" -> INT, "new" -> ROWTIME, "cf3" -> STRING))
 
     // row
     util.verifySchema(
-      util.addTable('rf1, ('rf2 as 'new).rowtime, 'rf3)(TEST_ROW_WITH_TIME),
+      util.addTable('rf1, 'rf2.rowtime as 'new, 'rf3)(TEST_ROW_WITH_TIME),
       Seq("rf1" -> INT, "new" -> ROWTIME, "rf3" -> STRING))
 
     // tuple
     util.verifySchema(
-      util.addTable[JTuple3[Int, Long, String]]('f0, ('f1 as 'new).rowtime, 'f2),
+      util.addTable[JTuple3[Int, Long, String]]('f0, 'f1.rowtime as 'new, 'f2),
       Seq("f0" -> INT, "new" -> ROWTIME, "f2" -> STRING))
   }
 }
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentValidationTest.scala
index bfa7bfa..e256ee8 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentValidationTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentValidationTest.scala
@@ -38,7 +38,7 @@ class StreamTableEnvironmentValidationTest extends TableTestBase {
   def testInvalidRowtimeAliasByPosition(): Unit = {
     val util = streamTestUtil()
     // don't allow aliasing by position
-    util.addTable[(Long, Int, String, Int, Long)](('a as 'b).rowtime, 'b, 'c, 'd, 'e)
+    util.addTable[(Long, Int, String, Int, Long)]('a.rowtime as 'b, 'b, 'c, 'd, 'e)
   }
 
   @Test(expected = classOf[TableException])
@@ -178,13 +178,13 @@ class StreamTableEnvironmentValidationTest extends TableTestBase {
   def testInvalidAliasWithRowtimeAttribute(): Unit = {
     val util = streamTestUtil()
     // aliased field does not exist
-    util.addTable[(Int, Long, String)]('_1, ('newnew as 'new).rowtime, '_3)
+    util.addTable[(Int, Long, String)]('_1, 'newnew.rowtime as 'new, '_3)
   }
 
   @Test(expected = classOf[TableException])
   def testInvalidAliasWithRowtimeAttribute2(): Unit = {
     val util = streamTestUtil()
     // aliased field has wrong type
-    util.addTable[(Int, Long, String)]('_1, ('_3 as 'new).rowtime, '_2)
+    util.addTable[(Int, Long, String)]('_1, '_3.rowtime as 'new, '_2)
   }
 }
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/AggregateStringExpressionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/AggregateStringExpressionTest.scala
index ec57436..0833c24 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/AggregateStringExpressionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/AggregateStringExpressionTest.scala
@@ -20,6 +20,7 @@ package org.apache.flink.table.api.stream.table.stringexpr
 
 import org.apache.flink.api.scala._
 import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.java.{Tumble => JTumble}
 import org.apache.flink.table.functions.aggfunctions.CountAggFunction
 import org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.{WeightedAvg, WeightedAvgWithMergeAndReset}
 import org.apache.flink.table.utils.TableTestBase
@@ -128,4 +129,50 @@ class AggregateStringExpressionTest extends TableTestBase {
 
     verifyTableEquals(resJava, resScala)
   }
+
+  @Test
+  def testProctimeRename(): Unit = {
+    val util = streamTestUtil()
+    val t = util.addTable[(Int, Long, String)]('int, 'long, 'string, 'p.proctime as 'proctime)
+
+    // Expression / Scala API
+    val resScala = t
+      .window(Tumble over 50.milli on 'proctime as 'w1)
+      .groupBy('w1, 'string)
+      .select('w1.proctime as 'proctime, 'w1.start as 'start, 'w1.end as 'end, 'string, 'int.count)
+
+    // String / Java API
+    val resJava = t
+      .window(JTumble.over("50.milli").on("proctime").as("w1"))
+      .groupBy("w1, string")
+      .select("w1.proctime as proctime, w1.start as start, w1.end as end, string, int.count")
+
+    verifyTableEquals(resJava, resScala)
+  }
+
+  @Test
+  def testRowtimeRename(): Unit = {
+    val util = streamTestUtil()
+    val t = util.addTable[TestPojo]('int, 'long.rowtime as 'rowtime, 'string)
+
+    // Expression / Scala API
+    val resScala = t
+      .window(Tumble over 50.milli on 'rowtime as 'w1)
+      .groupBy('w1, 'string)
+      .select('w1.rowtime as 'rowtime, 'string, 'int.count)
+
+    // String / Java API
+    val resJava = t
+      .window(JTumble.over("50.milli").on("rowtime").as("w1"))
+      .groupBy("w1, string")
+      .select("w1.rowtime as rowtime, string, int.count")
+
+    verifyTableEquals(resJava, resScala)
+  }
+}
+
+class TestPojo() {
+  var int: Int = _
+  var long: Long = _
+  var string: String = _
 }
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
index 1706fc8..21680e8 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
@@ -549,7 +549,7 @@ class TimeAttributesITCase extends AbstractTestBase {
       .fromElements(p1, p2)
       .assignTimestampsAndWatermarks(new TimestampWithEqualWatermarkPojo)
     // use aliases, swap all attributes, and skip b2
-    val table = stream.toTable(tEnv, ('b as 'b).rowtime, 'c as 'c, 'a as 'a)
+    val table = stream.toTable(tEnv, 'b.rowtime as 'b, 'c as 'c, 'a as 'a)
     // no aliases, no swapping
     val table2 = stream.toTable(tEnv, 'a, 'b.rowtime, 'c)
     // use proctime, no skipping
@@ -560,7 +560,7 @@ class TimeAttributesITCase extends AbstractTestBase {
     // use aliases, swap all attributes, and skip b2
     val table4 = stream.toTable(
       tEnv,
-      ExpressionParser.parseExpressionList("(b as b).rowtime, c as c, a as a"): _*)
+      ExpressionParser.parseExpressionList("b.rowtime as b, c as c, a as a"): _*)
     // no aliases, no swapping
     val table5 = stream.toTable(
       tEnv,
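
For illustration, the aliasing form that this parser change enables, mirroring the table4 test case above (field names are hypothetical; `stream` and `tEnv` are assumed to be a timestamped DataStream and a StreamTableEnvironment):

    import org.apache.flink.table.expressions.ExpressionParser

    // The alias now wraps the time indicator ("b.rowtime as rt") instead of
    // the previously required "(b as rt).rowtime".
    val fieldExprs = ExpressionParser.parseExpressionList("a, b.rowtime as rt, c")
    val table = stream.toTable(tEnv, fieldExprs: _*)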


[flink] 02/04: [hotfix] [docs] Improve DataSet.partitionCustom() documentation.

Posted by fh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

fhueske pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c8dc83c3a6b62d95dbd1b1197a2c874208b1e0b6
Author: KarmaGYZ <ka...@gmail.com>
AuthorDate: Mon Dec 10 18:26:21 2018 +0800

    [hotfix] [docs] Improve DataSet.partitionCustom() documentation.
    
    This closes #7282.
---
 docs/dev/batch/index.md | 15 +++++++++------
 1 file changed, 9 insertions(+), 6 deletions(-)

diff --git a/docs/dev/batch/index.md b/docs/dev/batch/index.md
index d004364..0a498df 100644
--- a/docs/dev/batch/index.md
+++ b/docs/dev/batch/index.md
@@ -401,12 +401,14 @@ DataSet<Integer> result = in.partitionByRange(0)
     <tr>
       <td><strong>Custom Partitioning</strong></td>
       <td>
-        <p>Manually specify a partitioning over the data.
+        <p>Assigns records based on a key to a specific partition using a custom Partitioner function. 
+          The key can be specified as position key, expression key, and key selector function.
           <br/>
-          <i>Note</i>: This method works only on single field keys.</p>
+          <i>Note</i>: This method only works with a single field key.</p>
 {% highlight java %}
 DataSet<Tuple2<String,Integer>> in = // [...]
-DataSet<Integer> result = in.partitionCustom(Partitioner<K> partitioner, key)
+DataSet<Integer> result = in.partitionCustom(partitioner, key)
+                            .mapPartition(new PartitionMapper());
 {% endhighlight %}
       </td>
     </tr>
@@ -703,13 +705,14 @@ val result = in.partitionByRange(0).mapPartition { ... }
     <tr>
       <td><strong>Custom Partitioning</strong></td>
       <td>
-        <p>Manually specify a partitioning over the data.
+        <p>Assigns records based on a key to a specific partition using a custom Partitioner function. 
+          The key can be specified as position key, expression key, and key selector function.
           <br/>
-          <i>Note</i>: This method works only on single field keys.</p>
+          <i>Note</i>: This method only works with a single field key.</p>
 {% highlight scala %}
 val in: DataSet[(Int, String)] = // [...]
 val result = in
-  .partitionCustom(partitioner: Partitioner[K], key)
+  .partitionCustom(partitioner, key).mapPartition { ... }
 {% endhighlight %}
       </td>
     </tr>
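
A minimal Scala sketch of the partitionCustom() usage pattern described in the updated docs, assuming a hypothetical Partitioner that routes by the first tuple field:

    import org.apache.flink.api.common.functions.Partitioner
    import org.apache.flink.api.scala._

    val env = ExecutionEnvironment.getExecutionEnvironment
    val in: DataSet[(Int, String)] = env.fromElements((1, "a"), (2, "b"), (3, "c"))

    // Illustrative partitioner: route each record by its first (Int) field.
    val byFirstField = new Partitioner[Int] {
      override def partition(key: Int, numPartitions: Int): Int = key % numPartitions
    }

    val result = in
      .partitionCustom(byFirstField, 0)            // single-field position key
      .mapPartition { it => Iterator(it.size) }    // e.g. count records per partition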


[flink] 03/04: [FLINK-11136] [table] Fix the merge logic of DISTINCT aggregates.

Posted by fh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

fhueske pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ed0aefa6775f655591b4c0fe46382446921b7155
Author: Dian Fu <fu...@alibaba-inc.com>
AuthorDate: Mon Dec 10 21:33:02 2018 +0800

    [FLINK-11136] [table] Fix the merge logic of DISTINCT aggregates.
    
    This closes #7284.
---
 .../flink/table/codegen/AggregationCodeGenerator.scala  | 17 ++++++++++++++++-
 .../flink/table/runtime/stream/sql/SqlITCase.scala      |  7 ++++---
 2 files changed, 20 insertions(+), 4 deletions(-)

diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
index 566e3d7..57cc815 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
@@ -142,6 +142,21 @@ class AggregationCodeGenerator(
       fields.mkString(", ")
     }
 
+    val parametersCodeForDistinctMerge = aggFields.map { inFields =>
+      val fields = inFields.filter(_ > -1).zipWithIndex.map { case (f, i) =>
+        // index to constant
+        if (f >= physicalInputTypes.length) {
+          constantFields(f - physicalInputTypes.length)
+        }
+        // index to input field
+        else {
+          s"(${CodeGenUtils.boxedTypeTermForTypeInfo(physicalInputTypes(f))}) k.getField($i)"
+        }
+      }
+
+      fields.mkString(", ")
+    }
+
     // get method signatures
     val classes = UserDefinedFunctionUtils.typeInfoToClass(physicalInputTypes)
     val constantClasses = UserDefinedFunctionUtils.typeInfoToClass(constantTypes)
@@ -643,7 +658,7 @@ class AggregationCodeGenerator(
                |          (${classOf[Row].getCanonicalName}) entry.getKey();
                |      Long v = (Long) entry.getValue();
                |      if (aDistinctAcc$i.add(k, v)) {
-               |        ${aggs(i)}.accumulate(aAcc$i, k);
+               |        ${aggs(i)}.accumulate(aAcc$i, ${parametersCodeForDistinctMerge(i)});
                |      }
                |    }
                |    a.setField($i, aDistinctAcc$i);
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
index 46dde8e..ddc2a68 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
@@ -78,6 +78,7 @@ class SqlITCase extends StreamingWithStateTestBase {
 
     val sqlQuery = "SELECT c, " +
       "  COUNT(DISTINCT b)," +
+      "  SUM(DISTINCT b)," +
       "  SESSION_END(rowtime, INTERVAL '0.005' SECOND) " +
       "FROM MyTable " +
       "GROUP BY SESSION(rowtime, INTERVAL '0.005' SECOND), c "
@@ -87,9 +88,9 @@ class SqlITCase extends StreamingWithStateTestBase {
     env.execute()
 
     val expected = Seq(
-      "Hello World,1,1970-01-01 00:00:00.014", // window starts at [9L] till {14L}
-      "Hello,1,1970-01-01 00:00:00.021",       // window starts at [16L] till {21L}, not merged
-      "Hello,3,1970-01-01 00:00:00.015"        // window starts at [1L,2L],
+      "Hello World,1,9,1970-01-01 00:00:00.014", // window starts at [9L] till {14L}
+      "Hello,1,16,1970-01-01 00:00:00.021",       // window starts at [16L] till {21L}, not merged
+      "Hello,3,6,1970-01-01 00:00:00.015"        // window starts at [1L,2L],
                                                //   merged with [8L,10L], by [4L], till {15L}
     )
     assertEquals(expected.sorted, StreamITCase.testResults.sorted)