You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/01/20 14:17:02 UTC

flink git commit: [FLINK-5386] [table] Refactor window clause.

Repository: flink
Updated Branches:
  refs/heads/master 8ccedd103 -> 6bf556e60


[FLINK-5386] [table] Refactor window clause.

- move window() before groupBy()
- make window alias mandatory
- groupBy() must include window alias

This closes #3046.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6bf556e6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6bf556e6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6bf556e6

Branch: refs/heads/master
Commit: 6bf556e60148917794f81088fa20c5cc7465823a
Parents: 8ccedd1
Author: Jincheng Sun <su...@gmail.com>
Authored: Thu Dec 15 10:31:33 2016 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Fri Jan 20 15:08:25 2017 +0100

----------------------------------------------------------------------
 docs/dev/table_api.md                           |  57 ++--
 .../org/apache/flink/table/api/table.scala      |  92 +++++--
 .../org/apache/flink/table/api/windows.scala    | 175 ++++--------
 .../flink/table/plan/logical/groupWindows.scala |  54 ++--
 .../scala/batch/table/FieldProjectionTest.scala |   5 +-
 .../api/scala/batch/table/GroupWindowTest.scala | 113 +++++---
 .../scala/stream/table/AggregationsITCase.scala |  15 +-
 .../scala/stream/table/GroupWindowTest.scala    | 273 ++++++++++++++-----
 .../dataset/DataSetWindowAggregateITCase.scala  |  31 ++-
 9 files changed, 501 insertions(+), 314 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6bf556e6/docs/dev/table_api.md
----------------------------------------------------------------------
diff --git a/docs/dev/table_api.md b/docs/dev/table_api.md
index f2f398c..0efd258 100644
--- a/docs/dev/table_api.md
+++ b/docs/dev/table_api.md
@@ -1022,69 +1022,72 @@ Temporal intervals can be represented as number of months (`Types.INTERVAL_MONTH
 
 ### Windows
 
-The Table API is a declarative API to define queries on batch and streaming tables. Projection, selection, and union operations can be applied both on streaming and batch tables without additional semantics. Aggregations on (possibly) infinite streaming tables, however, can only be computed on finite groups of records. Group-window aggregates group rows into finite groups based on time or row-count intervals and evaluate aggregation functions once per group. For batch tables, group-windows are a convenient shortcut to group records by time intervals.
+The Table API is a declarative API to define queries on batch and streaming tables. Projection, selection, and union operations can be applied both on streaming and batch tables without additional semantics. Aggregations on (possibly) infinite streaming tables, however, can only be computed on finite groups of records. Window aggregates group rows into finite groups based on time or row-count intervals and evaluate aggregation functions once per group. For batch tables, windows are a convenient shortcut to group records by time intervals.
 
-Group-windows are defined using the `window(w: GroupWindow)` clause. The following example shows how to define a group-window aggregation on a table.
+Windows are defined using the `window(w: Window)` clause and require an alias, which is specified using the `as` clause. In order to group a table by a window, the window alias must be referenced in the `groupBy(...)` clause like a regular grouping attribute. 
+The following example shows how to define a window aggregation on a table.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
 Table table = input
-  .window(GroupWindow w)  // define window
-  .select("b.sum")        // aggregate
+  .window([Window w].as("w"))  // define window with alias w
+  .groupBy("w")  // group the table by window w
+  .select("b.sum")  // aggregate
 {% endhighlight %}
 </div>
 
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
 val table = input
-  .window(w: GroupWindow) // define window
-  .select('b.sum)         // aggregate
+  .window([w: Window] as 'w)  // define window with alias w
+  .groupBy('w)   // group the table by window w
+  .select('b.sum)  // aggregate
 {% endhighlight %}
 </div>
 </div>
 
-In streaming environments, group-window aggregates can only be computed in parallel, if they are *keyed*, i.e., there is an additional `groupBy` attribute. Group-window aggregates without additional `groupBy`, such as in the example above, can only be evaluated in a single, non-parallel task. The following example shows how to define a keyed group-window aggregation on a table. 
+In streaming environments, window aggregates can only be computed in parallel if they group on one or more attributes in addition to the window, i.e., the `groupBy(...)` clause references a window alias and at least one additional attribute. A `groupBy(...)` clause that only references a window alias (such as in the example above) can only be evaluated by a single, non-parallel task. 
+The following example shows how to define a window aggregation with additional grouping attributes.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
 Table table = input
-  .groupBy("a")
-  .window(GroupWindow w)  // define window
-  .select("a, b.sum")     // aggregate
+  .window([Window w].as("w"))  // define window with alias w
+  .groupBy("w, a")  // group the table by attribute a and window w 
+  .select("a, b.sum")  // aggregate
 {% endhighlight %}
 </div>
 
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
 val table = input
-  .groupBy('a)
-  .window(w: GroupWindow) // define window
-  .select('a, 'b.sum)     // aggregate
+  .window([w: Window] as 'w) // define window with alias w
+  .groupBy('w, 'a)  // group the table by attribute a and window w 
+  .select('a, 'b.sum)  // aggregate
 {% endhighlight %}
 </div>
 </div>
 
-The `GroupWindow` parameter defines how rows are mapped to windows. `GroupWindow` is not an interface that users can implement. Instead, the Table API provides a set of predefined `GroupWindow` classes with specific semantics, which are translated into underlying `DataStream` or `DataSet` operations. The supported window definitions are listed below. 
-By assigning the group-window an alias using `as`, properties such as the start and end timestamp of a time window can be accessed in the `select` statement.
+The `Window` parameter defines how rows are mapped to windows. `Window` is not an interface that users can implement. Instead, the Table API provides a set of predefined `Window` classes with specific semantics, which are translated into underlying `DataStream` or `DataSet` operations. The supported window definitions are listed below. Window properties such as the start and end timestamp of a time window can be added in the select statement as a property of the window alias as `w.start` and `w.end`, respectively.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
 Table table = input
-  .groupBy("a")
-  .window(XXX.as("myWin"))                      // define window alias
-  .select("a, myWin.start, myWin.end, b.count") // aggregate
+  .window([Window w].as("w"))  // define window with alias w
+  .groupBy("w, a")  // group the table by attribute a and window w 
+  .select("a, w.start, w.end, b.count") // aggregate and add window start and end timestamps
 {% endhighlight %}
 </div>
 
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
 val table = input
-  .groupBy('a)
-  .window(XXX as 'myWin)                          // define window alias
-  .select('a, 'myWin.start, 'myWin.end, 'b.count) // aggregate
+  .window([w: Window] as 'w)  // define window with alias w
+  .groupBy('w, 'a)  // group the table by attribute a and window w 
+  .select('a, 'w.start, 'w.end, 'b.count) // aggregate and add window start and end timestamps
 {% endhighlight %}
 </div>
 </div>
@@ -1117,8 +1120,8 @@ Tumbling windows are defined by using the `Tumble` class as follows:
     </tr>
     <tr>
       <td><code>as</code></td>
-      <td>Optional.</td>
-      <td>Assigns an alias to the window that can be used in the following <code>select()</code> clause to access window properties such as window start or end time.</td>
+      <td>Required.</td>
+      <td>Assigns an alias to the window. The alias is used to reference the window in the following <code>groupBy()</code> clause and optionally to select window properties such as window start or end time in the <code>select()</code> clause.</td>
     </tr>
   </tbody>
 </table>
@@ -1184,8 +1187,8 @@ Sliding windows are defined by using the `Slide` class as follows:
     </tr>
     <tr>
       <td><code>as</code></td>
-      <td>Optional.</td>
-      <td>Assigns an alias to the window that can be used in the following <code>select()</code> clause to access window properties such as window start or end time.</td>
+      <td>Required.</td>
+      <td>Assigns an alias to the window. The alias is used to reference the window in the following <code>groupBy()</code> clause and optionally to select window properties such as window start or end time in the <code>select()</code> clause.</td>
     </tr>
   </tbody>
 </table>
@@ -1246,8 +1249,8 @@ A session window is defined by using the `Session` class as follows:
     </tr>
     <tr>
       <td><code>as</code></td>
-      <td>Optional.</td>
-      <td>Assigns an alias to the window that can be used in the following <code>select()</code> clause to access window properties such as window start or end time.</td>
+      <td>Required.</td>
+      <td>Assigns an alias to the window. The alias is used to reference the window in the following <code>groupBy()</code> clause and optionally to select window properties such as window start or end time in the <code>select()</code> clause.</td>
     </tr>
   </tbody>
 </table>

http://git-wip-us.apache.org/repos/asf/flink/blob/6bf556e6/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
index 957f4c5..a70b41d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
@@ -795,14 +795,19 @@ class Table(
     * For batch tables of finite size, windowing essentially provides shortcuts for time-based
     * groupBy.
     *
-    * __Note__: window on non-grouped streaming table is a non-parallel operation, i.e., all data
-    * will be processed by a single operator.
+    * __Note__: Computing windowed aggregates on a streaming table is only a parallel operation
+    * if additional grouping attributes are added to the `groupBy(...)` clause.
+    * If the `groupBy(...)` only references a window alias, the streamed table will be processed
+    * by a single task, i.e., with parallelism 1.
     *
-    * @param groupWindow group-window that specifies how elements are grouped.
+    * @param window window that specifies how elements are grouped.
     * @return A windowed table.
     */
-  def window(groupWindow: GroupWindow): GroupWindowedTable = {
-    new GroupWindowedTable(this, Seq(), groupWindow)
+  def window(window: Window): WindowedTable = {
+    if (window.alias.isEmpty) {
+      throw new ValidationException("An alias must be specified for the window.")
+    }
+    new WindowedTable(this, window)
   }
 }
 
@@ -855,57 +860,96 @@ class GroupedTable(
     val fieldExprs = ExpressionParser.parseExpressionList(fields)
     select(fieldExprs: _*)
   }
+}
+
+class WindowedTable(
+    private[flink] val table: Table,
+    private[flink] val window: Window) {
 
   /**
-    * Groups the records of a table by assigning them to windows defined by a time or row interval.
+    * Groups the elements by a mandatory window and one or more optional grouping attributes.
+    * The window is specified by referring to its alias.
     *
-    * For streaming tables of infinite size, grouping into windows is required to define finite
-    * groups on which group-based aggregates can be computed.
+    * If no additional grouping attribute is specified and if the input is a streaming table,
+    * the aggregation will be performed by a single task, i.e., with parallelism 1.
     *
-    * For batch tables of finite size, windowing essentially provides shortcuts for time-based
-    * groupBy.
+    * Aggregations are performed per group and defined by a subsequent `select(...)` clause similar
+    * to SQL SELECT-GROUP-BY query.
     *
-    * @param groupWindow group-window that specifies how elements are grouped.
-    * @return A windowed table.
+    * Example:
+    *
+    * {{{
+    *   tab.window([window] as 'w)).groupBy('w, 'key).select('key, 'value.avg)
+    * }}}
+    */
+  def groupBy(fields: Expression*): WindowGroupedTable = {
+    val fieldsWithoutWindow = fields.filterNot(window.alias.get.equals(_))
+    if (fields.size != fieldsWithoutWindow.size + 1) {
+      throw new ValidationException("GroupBy must contain exactly one window alias.")
+    }
+
+    new WindowGroupedTable(table, fieldsWithoutWindow, window)
+  }
+
+  /**
+    * Groups the elements by a mandatory window and one or more optional grouping attributes.
+    * The window is specified by referring to its alias.
+    *
+    * If no additional grouping attribute is specified and if the input is a streaming table,
+    * the aggregation will be performed by a single task, i.e., with parallelism 1.
+    *
+    * Aggregations are performed per group and defined by a subsequent `select(...)` clause similar
+    * to SQL SELECT-GROUP-BY query.
+    *
+    * Example:
+    *
+    * {{{
+    *   tab.window([window].as("w")).groupBy("w, key").select("key, value.avg")
+    * }}}
     */
-  def window(groupWindow: GroupWindow): GroupWindowedTable = {
-    new GroupWindowedTable(table, groupKey, groupWindow)
+  def groupBy(fields: String): WindowGroupedTable = {
+    val fieldsExpr = ExpressionParser.parseExpressionList(fields)
+    groupBy(fieldsExpr: _*)
   }
+
 }
 
-class GroupWindowedTable(
+class WindowGroupedTable(
     private[flink] val table: Table,
-    private[flink] val groupKey: Seq[Expression],
-    private[flink] val window: GroupWindow) {
+    private[flink] val groupKeys: Seq[Expression],
+    private[flink] val window: Window) {
 
   /**
-    * Performs a selection operation on a windowed table. Similar to an SQL SELECT statement.
+    * Performs a selection operation on a window grouped table. Similar to an SQL SELECT statement.
     * The field expressions can contain complex expressions and aggregations.
     *
     * Example:
     *
     * {{{
-    *   groupWindowTable.select('key, 'window.start, 'value.avg + " The average" as 'average)
+    *   windowGroupedTable.select('key, 'window.start, 'value.avg as 'valavg)
     * }}}
     */
   def select(fields: Expression*): Table = {
+    // get group keys by removing window alias
+
     val (aggNames, propNames) = extractAggregationsAndProperties(fields, table.tableEnv)
+
     val projectsOnAgg = replaceAggregationsAndProperties(
       fields, table.tableEnv, aggNames, propNames)
 
     val projectFields = (table.tableEnv, window) match {
       // event time can be arbitrary field in batch environment
       case (_: BatchTableEnvironment, w: EventTimeWindow) =>
-        extractFieldReferences(fields ++ groupKey ++ Seq(w.timeField))
+        extractFieldReferences(fields ++ groupKeys ++ Seq(w.timeField))
       case (_, _) =>
-        extractFieldReferences(fields ++ groupKey)
+        extractFieldReferences(fields ++ groupKeys)
     }
 
     new Table(table.tableEnv,
       Project(
         projectsOnAgg,
         WindowAggregate(
-          groupKey,
+          groupKeys,
           window.toLogicalWindow,
           propNames.map(a => Alias(a._1, a._2)).toSeq,
           aggNames.map(a => Alias(a._1, a._2)).toSeq,
@@ -915,13 +959,13 @@ class GroupWindowedTable(
   }
 
   /**
-    * Performs a selection operation on a group-windows table. Similar to an SQL SELECT statement.
+    * Performs a selection operation on a window grouped  table. Similar to an SQL SELECT statement.
     * The field expressions can contain complex expressions and aggregations.
     *
     * Example:
     *
     * {{{
-    *   groupWindowTable.select("key, window.start, value.avg + ' The average' as average")
+    *   windowGroupedTable.select("key, window.start, value.avg as valavg")
     * }}}
     */
   def select(fields: String): Table = {

http://git-wip-us.apache.org/repos/asf/flink/blob/6bf556e6/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/windows.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/windows.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/windows.scala
index 7e4498d..5ba6934 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/windows.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/windows.scala
@@ -22,74 +22,73 @@ import org.apache.flink.table.expressions.{Expression, ExpressionParser}
 import org.apache.flink.table.plan.logical._
 
 /**
-  * A group-window specification.
+  * A window specification.
   *
-  * Group-windows group rows based on time or row-count intervals and is therefore essentially a
-  * special type of groupBy. Just like groupBy, group-windows allow to compute aggregates
-  * on groups of elements.
+  * Window groups rows based on time or row-count intervals. It is a general way to group the
+  * elements, which is very helpful for both groupby-aggregations and over-aggregations to
+  * compute aggregates on groups of elements.
   *
   * Infinite streaming tables can only be grouped into time or row intervals. Hence window grouping
   * is required to apply aggregations on streaming tables.
   *
-  * For finite batch tables, group-windows provide shortcuts for time-based groupBy.
+  * For finite batch tables, window provides shortcuts for time-based groupBy.
   *
   */
-trait GroupWindow {
+abstract class Window {
 
+  // The expression of alias for this Window
+  private[flink] var alias: Option[Expression] = None
   /**
     * Converts an API class to a logical window for planning.
     */
   private[flink] def toLogicalWindow: LogicalWindow
-}
-
-/**
-  * A group-window operating on event-time.
-  *
-  * @param timeField defines the time mode for streaming tables. For batch table it defines the
-  *                  time attribute on which is grouped.
-  */
-abstract class EventTimeWindow(val timeField: Expression) extends GroupWindow {
-
-  protected var name: Option[Expression] = None
 
   /**
-    * Assigns an alias for this window that the following `select()` clause can refer to in order
-    * to access window properties such as window start or end time.
+    * Assigns an alias for this window that the following `groupBy()` and `select()` clause can
+    * refer to. `select()` statement can access window properties such as window start or end time.
     *
     * @param alias alias for this window
     * @return this window
     */
-  def as(alias: Expression): EventTimeWindow = {
-    this.name = Some(alias)
+  def as(alias: Expression): Window = {
+    this.alias = Some(alias)
     this
   }
 
   /**
-    * Assigns an alias for this window that the following `select()` clause can refer to in order
-    * to access window properties such as window start or end time.
+    * Assigns an alias for this window that the following `groupBy()` and `select()` clause can
+    * refer to. `select()` statement can access window properties such as window start or end time.
     *
     * @param alias alias for this window
     * @return this window
     */
-  def as(alias: String): EventTimeWindow = as(ExpressionParser.parseExpression(alias))
+  def as(alias: String): Window = as(ExpressionParser.parseExpression(alias))
 }
 
+/**
+  * A window operating on event-time.
+  *
+  * @param timeField defines the time mode for streaming tables. For batch table it defines the
+  *                  time attribute on which is grouped.
+  */
+abstract class EventTimeWindow(val timeField: Expression) extends Window
+
 // ------------------------------------------------------------------------------------------------
-// Tumbling group-windows
+// Tumbling windows
 // ------------------------------------------------------------------------------------------------
 
 /**
-  * Tumbling group-window.
+  * Tumbling window.
   *
   * For streaming tables call [[on('rowtime)]] to specify grouping by event-time. Otherwise rows are
   * grouped by processing-time.
   *
   * @param size the size of the window either as time or row-count interval.
   */
-class TumblingWindow(size: Expression) extends GroupWindow {
+class TumblingWindow(size: Expression) extends Window {
 
   /**
-    * Tumbling group-window.
+    * Tumbling window.
     *
     * For streaming tables call [[on('rowtime)]] to specify grouping by event-time. Otherwise rows
     * are grouped by processing-time.
@@ -98,8 +97,6 @@ class TumblingWindow(size: Expression) extends GroupWindow {
     */
   def this(size: String) = this(ExpressionParser.parseExpression(size))
 
-  private var alias: Option[Expression] = None
-
   /**
     * Specifies the time attribute on which rows are grouped.
     *
@@ -109,10 +106,10 @@ class TumblingWindow(size: Expression) extends GroupWindow {
     * For batch tables, refer to a timestamp or long attribute.
     *
     * @param timeField time mode for streaming tables and time attribute for batch tables
-    * @return a tumbling group-window on event-time
+    * @return a tumbling window on event-time
     */
   def on(timeField: Expression): TumblingEventTimeWindow =
-    new TumblingEventTimeWindow(alias, timeField, size)
+    new TumblingEventTimeWindow(timeField, size)
 
   /**
     * Specifies the time attribute on which rows are grouped.
@@ -123,51 +120,29 @@ class TumblingWindow(size: Expression) extends GroupWindow {
     * For batch tables, refer to a timestamp or long attribute.
     *
     * @param timeField time mode for streaming tables and time attribute for batch tables
-    * @return a tumbling group-window on event-time
+    * @return a tumbling window on event-time
     */
   def on(timeField: String): TumblingEventTimeWindow =
     on(ExpressionParser.parseExpression(timeField))
 
-  /**
-    * Assigns an alias for this window that the following `select()` clause can refer to in order
-    * to access window properties such as window start or end time.
-    *
-    * @param alias alias for this window
-    * @return this window
-    */
-  def as(alias: Expression): TumblingWindow = {
-    this.alias = Some(alias)
-    this
-  }
-
-  /**
-    * Assigns an alias for this window that the following `select()` clause can refer to in order
-    * to access window properties such as window start or end time.
-    *
-    * @param alias alias for this window
-    * @return this window
-    */
-  def as(alias: String): TumblingWindow = as(ExpressionParser.parseExpression(alias))
-
   override private[flink] def toLogicalWindow: LogicalWindow =
     ProcessingTimeTumblingGroupWindow(alias, size)
 }
 
 /**
-  * Tumbling group-window on event-time.
+  * Tumbling window on event-time.
   */
 class TumblingEventTimeWindow(
-    alias: Option[Expression],
     time: Expression,
     size: Expression)
   extends EventTimeWindow(time) {
 
   override private[flink] def toLogicalWindow: LogicalWindow =
-    EventTimeTumblingGroupWindow(name.orElse(alias), time, size)
+    EventTimeTumblingGroupWindow(alias, time, size)
 }
 
 // ------------------------------------------------------------------------------------------------
-// Sliding group windows
+// Sliding windows
 // ------------------------------------------------------------------------------------------------
 
 /**
@@ -195,7 +170,7 @@ class SlideWithSize(size: Expression) {
     * windows.
     *
     * @param slide the slide of the window either as time or row-count interval.
-    * @return a sliding group-window
+    * @return a sliding window
     */
   def every(slide: Expression): SlidingWindow = new SlidingWindow(size, slide)
 
@@ -210,13 +185,13 @@ class SlideWithSize(size: Expression) {
     * windows.
     *
     * @param slide the slide of the window either as time or row-count interval.
-    * @return a sliding group-window
+    * @return a sliding window
     */
   def every(slide: String): SlidingWindow = every(ExpressionParser.parseExpression(slide))
 }
 
 /**
-  * Sliding group-window.
+  * Sliding window.
   *
   * For streaming tables call [[on('rowtime)]] to specify grouping by event-time. Otherwise rows are
   * grouped by processing-time.
@@ -226,9 +201,7 @@ class SlideWithSize(size: Expression) {
 class SlidingWindow(
     size: Expression,
     slide: Expression)
-  extends GroupWindow {
-
-  private var alias: Option[Expression] = None
+  extends Window {
 
   /**
     * Specifies the time attribute on which rows are grouped.
@@ -239,10 +212,10 @@ class SlidingWindow(
     * For batch tables, refer to a timestamp or long attribute.
     *
     * @param timeField time mode for streaming tables and time attribute for batch tables
-    * @return a sliding group-window on event-time
+    * @return a sliding window on event-time
     */
   def on(timeField: Expression): SlidingEventTimeWindow =
-    new SlidingEventTimeWindow(alias, timeField, size, slide)
+    new SlidingEventTimeWindow(timeField, size, slide)
 
   /**
     * Specifies the time attribute on which rows are grouped.
@@ -253,66 +226,44 @@ class SlidingWindow(
     * For batch tables, refer to a timestamp or long attribute.
     *
     * @param timeField time mode for streaming tables and time attribute for batch tables
-    * @return a sliding group-window on event-time
+    * @return a sliding window on event-time
     */
   def on(timeField: String): SlidingEventTimeWindow =
     on(ExpressionParser.parseExpression(timeField))
 
-  /**
-    * Assigns an alias for this window that the following `select()` clause can refer to in order
-    * to access window properties such as window start or end time.
-    *
-    * @param alias alias for this window
-    * @return this window
-    */
-  def as(alias: Expression): SlidingWindow = {
-    this.alias = Some(alias)
-    this
-  }
-
-  /**
-    * Assigns an alias for this window that the following `select()` clause can refer to in order
-    * to access window properties such as window start or end time.
-    *
-    * @param alias alias for this window
-    * @return this window
-    */
-  def as(alias: String): SlidingWindow = as(ExpressionParser.parseExpression(alias))
-
   override private[flink] def toLogicalWindow: LogicalWindow =
     ProcessingTimeSlidingGroupWindow(alias, size, slide)
 }
 
 /**
-  * Sliding group-window on event-time.
+  * Sliding window on event-time.
   */
 class SlidingEventTimeWindow(
-    alias: Option[Expression],
     timeField: Expression,
     size: Expression,
     slide: Expression)
   extends EventTimeWindow(timeField) {
 
   override private[flink] def toLogicalWindow: LogicalWindow =
-    EventTimeSlidingGroupWindow(name.orElse(alias), timeField, size, slide)
+    EventTimeSlidingGroupWindow(alias, timeField, size, slide)
 }
 
 // ------------------------------------------------------------------------------------------------
-// Session group windows
+// Session windows
 // ------------------------------------------------------------------------------------------------
 
 /**
-  * Session group-window.
+  * Session window.
   *
   * For streaming tables call [[on('rowtime)]] to specify grouping by event-time. Otherwise rows are
   * grouped by processing-time.
   *
   * @param gap the time interval of inactivity before a window is closed.
   */
-class SessionWindow(gap: Expression) extends GroupWindow {
+class SessionWindow(gap: Expression) extends Window {
 
   /**
-    * Session group-window.
+    * Session window.
     *
     * For streaming tables call [[on('rowtime)]] to specify grouping by event-time. Otherwise rows
     * are grouped by processing-time.
@@ -321,8 +272,6 @@ class SessionWindow(gap: Expression) extends GroupWindow {
     */
   def this(gap: String) = this(ExpressionParser.parseExpression(gap))
 
-  private var alias: Option[Expression] = None
-
   /**
     * Specifies the time attribute on which rows are grouped.
     *
@@ -332,10 +281,10 @@ class SessionWindow(gap: Expression) extends GroupWindow {
     * For batch tables, refer to a timestamp or long attribute.
     *
     * @param timeField time mode for streaming tables and time attribute for batch tables
-    * @return a session group-window on event-time
+    * @return a session window on event-time
     */
   def on(timeField: Expression): SessionEventTimeWindow =
-    new SessionEventTimeWindow(alias, timeField, gap)
+    new SessionEventTimeWindow(timeField, gap)
 
   /**
     * Specifies the time attribute on which rows are grouped.
@@ -346,45 +295,23 @@ class SessionWindow(gap: Expression) extends GroupWindow {
     * For batch tables, refer to a timestamp or long attribute.
     *
     * @param timeField time mode for streaming tables and time attribute for batch tables
-    * @return a session group-window on event-time
+    * @return a session window on event-time
     */
   def on(timeField: String): SessionEventTimeWindow =
     on(ExpressionParser.parseExpression(timeField))
 
-  /**
-    * Assigns an alias for this window that the following `select()` clause can refer to in order
-    * to access window properties such as window start or end time.
-    *
-    * @param alias alias for this window
-    * @return this window
-    */
-  def as(alias: Expression): SessionWindow = {
-    this.alias = Some(alias)
-    this
-  }
-
-  /**
-    * Assigns an alias for this window that the following `select()` clause can refer to in order
-    * to access window properties such as window start or end time.
-    *
-    * @param alias alias for this window
-    * @return this window
-    */
-  def as(alias: String): SessionWindow = as(ExpressionParser.parseExpression(alias))
-
   override private[flink] def toLogicalWindow: LogicalWindow =
     ProcessingTimeSessionGroupWindow(alias, gap)
 }
 
 /**
-  * Session group-window on event-time.
+  * Session window on event-time.
   */
 class SessionEventTimeWindow(
-    alias: Option[Expression],
     timeField: Expression,
     gap: Expression)
   extends EventTimeWindow(timeField) {
 
   override private[flink] def toLogicalWindow: LogicalWindow =
-    EventTimeSessionGroupWindow(name.orElse(alias), timeField, gap)
+    EventTimeSessionGroupWindow(alias, timeField, gap)
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6bf556e6/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/groupWindows.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/groupWindows.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/groupWindows.scala
index 0bf149c..f1f058e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/groupWindows.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/groupWindows.scala
@@ -25,9 +25,9 @@ import org.apache.flink.table.typeutils.{RowIntervalTypeInfo, TimeIntervalTypeIn
 import org.apache.flink.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess}
 
 abstract class EventTimeGroupWindow(
-    name: Option[Expression],
+    alias: Option[Expression],
     time: Expression)
-  extends LogicalWindow(name) {
+  extends LogicalWindow(alias) {
 
   override def validate(tableEnv: TableEnvironment): ValidationResult = {
     val valid = super.validate(tableEnv)
@@ -55,7 +55,7 @@ abstract class EventTimeGroupWindow(
   }
 }
 
-abstract class ProcessingTimeGroupWindow(name: Option[Expression]) extends LogicalWindow(name) {
+abstract class ProcessingTimeGroupWindow(alias: Option[Expression]) extends LogicalWindow(alias) {
   override def validate(tableEnv: TableEnvironment): ValidationResult = {
     val valid = super.validate(tableEnv)
     if (valid.isFailure) {
@@ -88,32 +88,32 @@ object TumblingGroupWindow {
 }
 
 case class ProcessingTimeTumblingGroupWindow(
-    name: Option[Expression],
+    override val alias: Option[Expression],
     size: Expression)
-  extends ProcessingTimeGroupWindow(name) {
+  extends ProcessingTimeGroupWindow(alias) {
 
   override def resolveExpressions(resolve: (Expression) => Expression): LogicalWindow =
     ProcessingTimeTumblingGroupWindow(
-      name.map(resolve),
+      alias.map(resolve),
       resolve(size))
 
   override def validate(tableEnv: TableEnvironment): ValidationResult =
     super.validate(tableEnv).orElse(TumblingGroupWindow.validate(tableEnv, size))
 
-  override def toString: String = s"ProcessingTimeTumblingGroupWindow($name, $size)"
+  override def toString: String = s"ProcessingTimeTumblingGroupWindow($alias, $size)"
 }
 
 case class EventTimeTumblingGroupWindow(
-    name: Option[Expression],
+    override val alias: Option[Expression],
     timeField: Expression,
     size: Expression)
   extends EventTimeGroupWindow(
-    name,
+    alias,
     timeField) {
 
   override def resolveExpressions(resolve: (Expression) => Expression): LogicalWindow =
     EventTimeTumblingGroupWindow(
-      name.map(resolve),
+      alias.map(resolve),
       resolve(timeField),
       resolve(size))
 
@@ -130,7 +130,7 @@ case class EventTimeTumblingGroupWindow(
           ValidationSuccess
       })
 
-  override def toString: String = s"EventTimeTumblingGroupWindow($name, $timeField, $size)"
+  override def toString: String = s"EventTimeTumblingGroupWindow($alias, $timeField, $size)"
 }
 
 // ------------------------------------------------------------------------------------------------
@@ -177,33 +177,33 @@ object SlidingGroupWindow {
 }
 
 case class ProcessingTimeSlidingGroupWindow(
-    name: Option[Expression],
+    override val alias: Option[Expression],
     size: Expression,
     slide: Expression)
-  extends ProcessingTimeGroupWindow(name) {
+  extends ProcessingTimeGroupWindow(alias) {
 
   override def resolveExpressions(resolve: (Expression) => Expression): LogicalWindow =
     ProcessingTimeSlidingGroupWindow(
-      name.map(resolve),
+      alias.map(resolve),
       resolve(size),
       resolve(slide))
 
   override def validate(tableEnv: TableEnvironment): ValidationResult =
     super.validate(tableEnv).orElse(SlidingGroupWindow.validate(tableEnv, size, slide))
 
-  override def toString: String = s"ProcessingTimeSlidingGroupWindow($name, $size, $slide)"
+  override def toString: String = s"ProcessingTimeSlidingGroupWindow($alias, $size, $slide)"
 }
 
 case class EventTimeSlidingGroupWindow(
-    name: Option[Expression],
+    override val alias: Option[Expression],
     timeField: Expression,
     size: Expression,
     slide: Expression)
-  extends EventTimeGroupWindow(name, timeField) {
+  extends EventTimeGroupWindow(alias, timeField) {
 
   override def resolveExpressions(resolve: (Expression) => Expression): LogicalWindow =
     EventTimeSlidingGroupWindow(
-      name.map(resolve),
+      alias.map(resolve),
       resolve(timeField),
       resolve(size),
       resolve(slide))
@@ -221,7 +221,7 @@ case class EventTimeSlidingGroupWindow(
           ValidationSuccess
       })
 
-  override def toString: String = s"EventTimeSlidingGroupWindow($name, $timeField, $size, $slide)"
+  override def toString: String = s"EventTimeSlidingGroupWindow($alias, $timeField, $size, $slide)"
 }
 
 // ------------------------------------------------------------------------------------------------
@@ -240,37 +240,37 @@ object SessionGroupWindow {
 }
 
 case class ProcessingTimeSessionGroupWindow(
-    name: Option[Expression],
+    override val alias: Option[Expression],
     gap: Expression)
-  extends ProcessingTimeGroupWindow(name) {
+  extends ProcessingTimeGroupWindow(alias) {
 
   override def resolveExpressions(resolve: (Expression) => Expression): LogicalWindow =
     ProcessingTimeSessionGroupWindow(
-      name.map(resolve),
+      alias.map(resolve),
       resolve(gap))
 
   override def validate(tableEnv: TableEnvironment): ValidationResult =
     super.validate(tableEnv).orElse(SessionGroupWindow.validate(tableEnv, gap))
 
-  override def toString: String = s"ProcessingTimeSessionGroupWindow($name, $gap)"
+  override def toString: String = s"ProcessingTimeSessionGroupWindow($alias, $gap)"
 }
 
 case class EventTimeSessionGroupWindow(
-    name: Option[Expression],
+    override val alias: Option[Expression],
     timeField: Expression,
     gap: Expression)
   extends EventTimeGroupWindow(
-    name,
+    alias,
     timeField) {
 
   override def resolveExpressions(resolve: (Expression) => Expression): LogicalWindow =
     EventTimeSessionGroupWindow(
-      name.map(resolve),
+      alias.map(resolve),
       resolve(timeField),
       resolve(gap))
 
   override def validate(tableEnv: TableEnvironment): ValidationResult =
     super.validate(tableEnv).orElse(SessionGroupWindow.validate(tableEnv, gap))
 
-  override def toString: String = s"EventTimeSessionGroupWindow($name, $timeField, $gap)"
+  override def toString: String = s"EventTimeSessionGroupWindow($alias, $timeField, $gap)"
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6bf556e6/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/FieldProjectionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/FieldProjectionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/FieldProjectionTest.scala
index 708e7f1..a7da5b5 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/FieldProjectionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/FieldProjectionTest.scala
@@ -228,6 +228,7 @@ class FieldProjectionTest extends TableTestBase {
     val sourceTable = streamUtil.addTable[(Int, Long, String, Double)]("MyTable", 'a, 'b, 'c, 'd)
     val resultTable = sourceTable
         .window(Tumble over 5.millis on 'rowtime as 'w)
+        .groupBy('w)
         .select(Upper('c).count, 'a.sum)
 
     val expected =
@@ -253,8 +254,8 @@ class FieldProjectionTest extends TableTestBase {
   def testSelectFromStreamingGroupedWindow(): Unit = {
     val sourceTable = streamUtil.addTable[(Int, Long, String, Double)]("MyTable", 'a, 'b, 'c, 'd)
     val resultTable = sourceTable
-        .groupBy('b)
         .window(Tumble over 5.millis on 'rowtime as 'w)
+        .groupBy('w, 'b)
         .select(Upper('c).count, 'a.sum, 'b)
 
     val expected = unaryNode(
@@ -287,7 +288,6 @@ class FieldProjectionTest extends TableTestBase {
       .groupBy('word)
       .select('word, 'frequency.sum as 'frequency)
       .filter('frequency === 2)
-
     val expected =
       unaryNode(
         "DataSetCalc",
@@ -303,6 +303,7 @@ class FieldProjectionTest extends TableTestBase {
 
     util.verifyTable(resultTable, expected)
   }
+
 }
 
 object FieldProjectionTest {

http://git-wip-us.apache.org/repos/asf/flink/blob/6bf556e6/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/GroupWindowTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/GroupWindowTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/GroupWindowTest.scala
index 73ae095..a778b40 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/GroupWindowTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/GroupWindowTest.scala
@@ -20,6 +20,7 @@ package org.apache.flink.table.api.scala.batch.table
 import org.apache.flink.api.scala._
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.expressions.WindowReference
 import org.apache.flink.table.plan.logical._
 import org.apache.flink.table.utils.TableTestBase
 import org.apache.flink.table.utils.TableTestUtil._
@@ -28,6 +29,35 @@ import org.junit.Test
 class GroupWindowTest extends TableTestBase {
 
   //===============================================================================================
+  // Common test
+  //===============================================================================================
+
+  @Test(expected = classOf[ValidationException])
+  def testGroupByWithoutWindowAlias(): Unit = {
+    val util = batchTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    table
+      .window(Tumble over 5.milli on 'long as 'w)
+      .groupBy('string)
+      .select('string, 'int.count)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testInvalidRowTimeRef(): Unit = {
+    val util = batchTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    table
+      .window(Tumble over 5.milli on 'long as 'w)
+      .groupBy('w, 'string)
+      .select('string, 'int.count)
+      .window(Slide over 5.milli every 1.milli on 'int as 'w2) // 'Int  does not exist in input.
+      .groupBy('w2)
+      .select('string)
+  }
+
+  //===============================================================================================
   // Tumbling Windows
   //===============================================================================================
 
@@ -37,8 +67,8 @@ class GroupWindowTest extends TableTestBase {
     val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
 
     table
-      .groupBy('string)
-      .window(Tumble over 50.milli)   // require a time attribute 
+      .window(Tumble over 50.milli as 'w)   // require a time attribute
+      .groupBy('w, 'string)
       .select('string, 'int.count)
   }
 
@@ -48,8 +78,8 @@ class GroupWindowTest extends TableTestBase {
     val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
 
     table
-      .groupBy('string)
-      .window(Tumble over 2.rows)   // require a time attribute
+      .window(Tumble over 2.rows as 'w)   // require a time attribute
+      .groupBy('w, 'string)
       .select('string, 'int.count)
   }
 
@@ -59,15 +89,15 @@ class GroupWindowTest extends TableTestBase {
     val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
 
     val windowedTable = table
-      .groupBy('string)
-      .window(Tumble over 2.rows on 'long)
+      .window(Tumble over 2.rows on 'long as 'w)
+      .groupBy('w, 'string)
       .select('string, 'int.count)
 
     val expected = unaryNode(
       "DataSetWindowAggregate",
       batchTableNode(0),
       term("groupBy", "string"),
-      term("window", EventTimeTumblingGroupWindow(None, 'long, 2.rows)),
+      term("window", EventTimeTumblingGroupWindow(Some(WindowReference("w")), 'long, 2.rows)),
       term("select", "string", "COUNT(int) AS TMP_0")
     )
 
@@ -80,15 +110,15 @@ class GroupWindowTest extends TableTestBase {
     val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
 
     val windowedTable = table
-      .groupBy('string)
-      .window(Tumble over 5.milli on 'long)
+      .window(Tumble over 5.milli on 'long as 'w)
+      .groupBy('w, 'string)
       .select('string, 'int.count)
 
     val expected = unaryNode(
       "DataSetWindowAggregate",
       batchTableNode(0),
       term("groupBy", "string"),
-      term("window", EventTimeTumblingGroupWindow(None, 'long, 5.milli)),
+      term("window", EventTimeTumblingGroupWindow(Some(WindowReference("w")), 'long, 5.milli)),
       term("select", "string", "COUNT(int) AS TMP_0")
     )
 
@@ -101,7 +131,8 @@ class GroupWindowTest extends TableTestBase {
     val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
 
     table
-      .window(Tumble over 50.milli)   // require a time attribute
+      .window(Tumble over 50.milli as 'w) // require a time attribute
+      .groupBy('w)
       .select('string, 'int.count)
   }
 
@@ -111,7 +142,8 @@ class GroupWindowTest extends TableTestBase {
     val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
 
     table
-      .window(Tumble over 2.rows)   // require a time attribute
+      .window(Tumble over 2.rows as 'w) // require a time attribute
+      .groupBy('w)
       .select('int.count)
   }
 
@@ -121,7 +153,8 @@ class GroupWindowTest extends TableTestBase {
     val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
 
     val windowedTable = table
-      .window(Tumble over 5.milli on 'long)
+      .window(Tumble over 5.milli on 'long as 'w)
+      .groupBy('w)
       .select('int.count)
 
     val expected = unaryNode(
@@ -131,7 +164,7 @@ class GroupWindowTest extends TableTestBase {
         batchTableNode(0),
         term("select", "int", "long")
       ),
-      term("window", EventTimeTumblingGroupWindow(None, 'long, 5.milli)),
+      term("window", EventTimeTumblingGroupWindow(Some(WindowReference("w")), 'long, 5.milli)),
       term("select", "COUNT(int) AS TMP_0")
     )
 
@@ -144,7 +177,8 @@ class GroupWindowTest extends TableTestBase {
     val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
 
     val windowedTable = table
-      .window(Tumble over 2.rows on 'long)
+      .window(Tumble over 2.rows on 'long as 'w)
+      .groupBy('w)
       .select('int.count)
 
     val expected = unaryNode(
@@ -154,7 +188,7 @@ class GroupWindowTest extends TableTestBase {
         batchTableNode(0),
         term("select", "int", "long")
       ),
-      term("window", EventTimeTumblingGroupWindow(None, 'long, 2.rows)),
+      term("window", EventTimeTumblingGroupWindow(Some(WindowReference("w")), 'long, 2.rows)),
       term("select", "COUNT(int) AS TMP_0")
     )
 
@@ -171,8 +205,8 @@ class GroupWindowTest extends TableTestBase {
     val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
 
     table
-      .groupBy('string)
-      .window(Slide over 50.milli every 50.milli) // require on a time attribute
+      .window(Slide over 50.milli every 50.milli as 'w) // require on a time attribute
+      .groupBy('w, 'string)
       .select('string, 'int.count)
   }
 
@@ -182,8 +216,8 @@ class GroupWindowTest extends TableTestBase {
     val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
 
     table
-      .groupBy('string)
-      .window(Slide over 10.rows every 5.rows) // require on a time attribute
+      .window(Slide over 10.rows every 5.rows as 'w) // require on a time attribute
+      .groupBy('w, 'string)
       .select('string, 'int.count)
   }
 
@@ -193,15 +227,16 @@ class GroupWindowTest extends TableTestBase {
     val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
 
     val windowedTable = table
-      .groupBy('string)
-      .window(Slide over 8.milli every 10.milli on 'long)
+      .window(Slide over 8.milli every 10.milli on 'long as 'w)
+      .groupBy('w, 'string)
       .select('string, 'int.count)
 
     val expected = unaryNode(
       "DataSetWindowAggregate",
       batchTableNode(0),
       term("groupBy", "string"),
-      term("window", EventTimeSlidingGroupWindow(None, 'long, 8.milli, 10.milli)),
+      term("window",
+        EventTimeSlidingGroupWindow(Some(WindowReference("w")), 'long, 8.milli, 10.milli)),
       term("select", "string", "COUNT(int) AS TMP_0")
     )
 
@@ -214,15 +249,16 @@ class GroupWindowTest extends TableTestBase {
     val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
 
     val windowedTable = table
-      .groupBy('string)
-      .window(Slide over 2.rows every 1.rows on 'long)
+      .window(Slide over 2.rows every 1.rows on 'long as 'w)
+      .groupBy('w, 'string)
       .select('string, 'int.count)
 
     val expected = unaryNode(
       "DataSetWindowAggregate",
       batchTableNode(0),
       term("groupBy", "string"),
-      term("window", EventTimeSlidingGroupWindow(None, 'long, 2.rows, 1.rows)),
+      term("window",
+        EventTimeSlidingGroupWindow(Some(WindowReference("w")), 'long, 2.rows, 1.rows)),
       term("select", "string", "COUNT(int) AS TMP_0")
     )
 
@@ -235,7 +271,8 @@ class GroupWindowTest extends TableTestBase {
     val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
 
     table
-      .window(Slide over 2.rows every 1.rows)   // require on a time attribute
+      .window(Slide over 2.rows every 1.rows as 'w) // require on a time attribute
+      .groupBy('w)
       .select('int.count)
   }
 
@@ -245,7 +282,8 @@ class GroupWindowTest extends TableTestBase {
     val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
 
     val windowedTable = table
-      .window(Slide over 8.milli every 10.milli on 'long)
+      .window(Slide over 8.milli every 10.milli on 'long as 'w)
+      .groupBy('w)
       .select('int.count)
 
     val expected = unaryNode(
@@ -255,7 +293,8 @@ class GroupWindowTest extends TableTestBase {
         batchTableNode(0),
         term("select", "int", "long")
       ),
-      term("window", EventTimeSlidingGroupWindow(None, 'long, 8.milli, 10.milli)),
+      term("window",
+        EventTimeSlidingGroupWindow(Some(WindowReference("w")), 'long, 8.milli, 10.milli)),
       term("select", "COUNT(int) AS TMP_0")
     )
 
@@ -268,7 +307,8 @@ class GroupWindowTest extends TableTestBase {
     val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
 
     val windowedTable = table
-      .window(Slide over 2.rows every 1.rows on 'long)
+      .window(Slide over 2.rows every 1.rows on 'long as 'w)
+      .groupBy('w)
       .select('int.count)
 
     val expected = unaryNode(
@@ -278,7 +318,8 @@ class GroupWindowTest extends TableTestBase {
         batchTableNode(0),
         term("select", "int", "long")
       ),
-      term("window", EventTimeSlidingGroupWindow(None, 'long, 2.rows, 1.rows)),
+      term("window",
+        EventTimeSlidingGroupWindow(Some(WindowReference("w")), 'long, 2.rows, 1.rows)),
       term("select", "COUNT(int) AS TMP_0")
     )
 
@@ -295,15 +336,15 @@ class GroupWindowTest extends TableTestBase {
     val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
 
     val windowedTable = table
-      .groupBy('string)
-      .window(Session withGap 7.milli on 'long)
+      .window(Session withGap 7.milli on 'long as 'w)
+      .groupBy('w, 'string)
       .select('string, 'int.count)
 
     val expected = unaryNode(
       "DataSetWindowAggregate",
       batchTableNode(0),
       term("groupBy", "string"),
-      term("window", EventTimeSessionGroupWindow(None, 'long, 7.milli)),
+      term("window", EventTimeSessionGroupWindow(Some(WindowReference("w")), 'long, 7.milli)),
       term("select", "string", "COUNT(int) AS TMP_0")
     )
 
@@ -316,8 +357,8 @@ class GroupWindowTest extends TableTestBase {
     val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
 
     val windowedTable = table
-      .groupBy('string)
-      .window(Session withGap 7.milli) // require on a time attribute
+      .window(Session withGap 7.milli as 'w) // require on a time attribute
+      .groupBy('string, 'w)
       .select('string, 'int.count)
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6bf556e6/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/AggregationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/AggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/AggregationsITCase.scala
index f41cae1..a243db7 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/AggregationsITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/AggregationsITCase.scala
@@ -58,8 +58,8 @@ class AggregationsITCase extends StreamingMultipleProgramsTestBase {
     val table = stream.toTable(tEnv, 'long, 'int, 'string)
 
     val windowedTable = table
-      .groupBy('string)
-      .window(Slide over 2.rows every 1.rows)
+      .window(Slide over 2.rows every 1.rows as 'w)
+      .groupBy('w, 'string)
       .select('string, 'int.count, 'int.avg)
 
     val results = windowedTable.toDataStream[Row]
@@ -83,8 +83,8 @@ class AggregationsITCase extends StreamingMultipleProgramsTestBase {
     val table = stream.toTable(tEnv, 'long, 'int, 'string)
 
     val windowedTable = table
-      .groupBy('string)
-      .window(Session withGap 7.milli on 'rowtime)
+      .window(Session withGap 7.milli on 'rowtime as 'w)
+      .groupBy('w, 'string)
       .select('string, 'int.count)
 
     val results = windowedTable.toDataStream[Row]
@@ -106,7 +106,8 @@ class AggregationsITCase extends StreamingMultipleProgramsTestBase {
     val table = stream.toTable(tEnv, 'long, 'int, 'string)
 
     val windowedTable = table
-      .window(Tumble over 2.rows)
+      .window(Tumble over 2.rows as 'w)
+      .groupBy('w)
       .select('int.count)
 
     val results = windowedTable.toDataStream[Row]
@@ -130,8 +131,8 @@ class AggregationsITCase extends StreamingMultipleProgramsTestBase {
     val table = stream.toTable(tEnv, 'long, 'int, 'string)
 
     val windowedTable = table
-      .groupBy('string)
       .window(Tumble over 5.milli on 'rowtime as 'w)
+      .groupBy('w, 'string)
       .select('string, 'int.count, 'int.avg, 'w.start, 'w.end)
 
     val results = windowedTable.toDataStream[Row]
@@ -159,8 +160,8 @@ class AggregationsITCase extends StreamingMultipleProgramsTestBase {
     val table = stream.toTable(tEnv, 'long, 'int, 'string)
 
     val windowedTable = table
-      .groupBy('string)
       .window(Slide over 10.milli every 5.milli on 'rowtime as 'w)
+      .groupBy('w, 'string)
       .select('string, 'int.count, 'w.start, 'w.end, 'w.start)
 
     val results = windowedTable.toDataStream[Row]

http://git-wip-us.apache.org/repos/asf/flink/blob/6bf556e6/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala
index cbd814a..8708649 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala
@@ -70,20 +70,45 @@ class GroupWindowTest extends TableTestBase {
     val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
 
     table
+    // only rowtime is a valid time attribute in a stream environment
+      .window(Tumble over 50.milli on 'string as 'w)
+      .groupBy('w, 'string)
+      .select('string, 'int.count)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testGroupByWithoutWindowAlias(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    table
+      .window(Tumble over 5.milli on 'long as 'w)
       .groupBy('string)
-      // only rowtime is a valid time attribute in a stream environment
-      .window(Tumble over 50.milli on 'string)
       .select('string, 'int.count)
   }
 
   @Test(expected = classOf[ValidationException])
+  def testInvalidRowTimeRef(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    table
+      .window(Tumble over 5.milli on 'long as 'w)
+      .groupBy('w, 'string)
+      .select('string, 'int.count)
+      .window(Slide over 5.milli every 1.milli on 'int as 'w2) // 'Int  does not exist in input.
+      .groupBy('w2)
+      .select('string)
+  }
+
+  @Test(expected = classOf[ValidationException])
   def testInvalidTumblingSize(): Unit = {
     val util = streamTestUtil()
     val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
 
     table
-      .groupBy('string)
-      .window(Tumble over "WRONG") // string is not a valid interval
+      .window(Tumble over "WRONG" as 'w) // string is not a valid interval
+      .groupBy('w, 'string)
       .select('string, 'int.count)
   }
 
@@ -93,8 +118,8 @@ class GroupWindowTest extends TableTestBase {
     val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
 
     table
-      .groupBy('string)
-      .window(Slide over "WRONG" every "WRONG") // string is not a valid interval
+      .window(Slide over "WRONG" every "WRONG" as 'w) // string is not a valid interval
+      .groupBy('w, 'string)
       .select('string, 'int.count)
   }
 
@@ -104,8 +129,8 @@ class GroupWindowTest extends TableTestBase {
     val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
 
     table
-      .groupBy('string)
-      .window(Slide over 12.rows every 1.minute) // row and time intervals may not be mixed
+      .window(Slide over 12.rows every 1.minute as 'w) // row and time intervals may not be mixed
+      .groupBy('w, 'string)
       .select('string, 'int.count)
   }
 
@@ -115,8 +140,8 @@ class GroupWindowTest extends TableTestBase {
     val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
 
     table
-      .groupBy('string)
-      .window(Session withGap 10.rows) // row interval is not valid for session windows
+      .window(Session withGap 10.rows as 'w) // row interval is not valid for session windows
+      .groupBy('w, 'string)
       .select('string, 'int.count)
   }
 
@@ -126,8 +151,8 @@ class GroupWindowTest extends TableTestBase {
     val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
 
     table
-      .groupBy('string)
       .window(Session withGap 100.milli as 1 + 1) // expression instead of a symbol
+      .groupBy('string)
       .select('string, 'int.count)
   }
 
@@ -137,19 +162,63 @@ class GroupWindowTest extends TableTestBase {
     val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
 
     table
-      .groupBy('string)
       .window(Session withGap 100.milli as 'string) // field name "string" is already present
+      .groupBy('string)
       .select('string, 'int.count)
   }
 
   @Test
+  def testMultiWindow(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    val windowedTable = table
+      .window(Tumble over 50.milli as 'w1)
+      .groupBy('w1, 'string)
+      .select('string, 'int.count)
+      .window(Slide over 20.milli every 10.milli as 'w2)
+      .groupBy('w2)
+      .select('string.count)
+
+    val expected = unaryNode(
+      "DataStreamAggregate",
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamAggregate",
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(0),
+            term("select", "string", "int")
+          ),
+          term("groupBy", "string"),
+          term(
+            "window",
+            ProcessingTimeTumblingGroupWindow(
+              Some(WindowReference("w1")),
+              50.milli)),
+          term("select", "string", "COUNT(int) AS TMP_0")
+        ),
+        term("select", "string")
+      ),
+      term(
+        "window",
+        ProcessingTimeSlidingGroupWindow(
+          Some(WindowReference("w2")),
+          20.milli, 10.milli)),
+      term("select", "COUNT(string) AS TMP_2")
+    )
+    util.verifyTable(windowedTable, expected)
+  }
+
+  @Test
   def testProcessingTimeTumblingGroupWindowOverTime(): Unit = {
     val util = streamTestUtil()
     val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
 
     val windowedTable = table
-      .groupBy('string)
-      .window(Tumble over 50.milli)
+      .window(Tumble over 50.milli as 'w)
+      .groupBy('w, 'string)
       .select('string, 'int.count)
 
     val expected = unaryNode(
@@ -160,7 +229,11 @@ class GroupWindowTest extends TableTestBase {
         term("select", "string", "int")
       ),
       term("groupBy", "string"),
-      term("window", ProcessingTimeTumblingGroupWindow(None, 50.milli)),
+      term(
+        "window",
+        ProcessingTimeTumblingGroupWindow(
+          Some(WindowReference("w")),
+          50.milli)),
       term("select", "string", "COUNT(int) AS TMP_0")
     )
 
@@ -173,8 +246,8 @@ class GroupWindowTest extends TableTestBase {
     val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
 
     val windowedTable = table
-      .groupBy('string)
-      .window(Tumble over 2.rows)
+      .window(Tumble over 2.rows as 'w)
+      .groupBy('w, 'string)
       .select('string, 'int.count)
 
     val expected = unaryNode(
@@ -185,7 +258,10 @@ class GroupWindowTest extends TableTestBase {
         term("select", "string", "int")
       ),
       term("groupBy", "string"),
-      term("window", ProcessingTimeTumblingGroupWindow(None, 2.rows)),
+      term(
+        "window",
+        ProcessingTimeTumblingGroupWindow(
+          Some(WindowReference("w")), 2.rows)),
       term("select", "string", "COUNT(int) AS TMP_0")
     )
 
@@ -198,8 +274,8 @@ class GroupWindowTest extends TableTestBase {
     val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
 
     val windowedTable = table
-      .groupBy('string)
-      .window(Tumble over 5.milli on 'rowtime)
+      .window(Tumble over 5.milli on 'rowtime as 'w)
+      .groupBy('w, 'string)
       .select('string, 'int.count)
 
     val expected = unaryNode(
@@ -210,7 +286,12 @@ class GroupWindowTest extends TableTestBase {
         term("select", "string", "int")
       ),
       term("groupBy", "string"),
-      term("window", EventTimeTumblingGroupWindow(None, RowtimeAttribute(), 5.milli)),
+      term(
+        "window",
+        EventTimeTumblingGroupWindow(
+          Some(WindowReference("w")),
+          RowtimeAttribute(),
+          5.milli)),
       term("select", "string", "COUNT(int) AS TMP_0")
     )
 
@@ -224,15 +305,19 @@ class GroupWindowTest extends TableTestBase {
     val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
 
     val windowedTable = table
-      .groupBy('string)
-      .window(Tumble over 2.rows on 'rowtime)
+      .window(Tumble over 2.rows on 'rowtime as 'w)
+      .groupBy('w, 'string)
       .select('string, 'int.count)
 
     val expected = unaryNode(
       "DataStreamAggregate",
       streamTableNode(0),
       term("groupBy", "string"),
-      term("window", EventTimeTumblingGroupWindow(None, RowtimeAttribute(), 2.rows)),
+      term(
+        "window",
+        EventTimeTumblingGroupWindow(
+          Some(WindowReference("w")),
+          RowtimeAttribute(), 2.rows)),
       term("select", "string", "COUNT(int) AS TMP_0")
     )
 
@@ -245,8 +330,8 @@ class GroupWindowTest extends TableTestBase {
     val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
 
     val windowedTable = table
-      .groupBy('string)
-      .window(Slide over 50.milli every 50.milli)
+      .window(Slide over 50.milli every 50.milli as 'w)
+      .groupBy('w, 'string)
       .select('string, 'int.count)
 
     val expected = unaryNode(
@@ -257,7 +342,11 @@ class GroupWindowTest extends TableTestBase {
         term("select", "string", "int")
       ),
       term("groupBy", "string"),
-      term("window", ProcessingTimeSlidingGroupWindow(None, 50.milli, 50.milli)),
+      term(
+        "window",
+        ProcessingTimeSlidingGroupWindow(
+          Some(WindowReference("w")),
+          50.milli, 50.milli)),
       term("select", "string", "COUNT(int) AS TMP_0")
     )
 
@@ -270,8 +359,8 @@ class GroupWindowTest extends TableTestBase {
     val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
 
     val windowedTable = table
-      .groupBy('string)
-      .window(Slide over 2.rows every 1.rows)
+      .window(Slide over 2.rows every 1.rows as 'w)
+      .groupBy('w, 'string)
       .select('string, 'int.count)
 
     val expected = unaryNode(
@@ -282,7 +371,11 @@ class GroupWindowTest extends TableTestBase {
         term("select", "string", "int")
       ),
       term("groupBy", "string"),
-      term("window", ProcessingTimeSlidingGroupWindow(None, 2.rows, 1.rows)),
+      term(
+        "window",
+        ProcessingTimeSlidingGroupWindow(
+          Some(WindowReference("w")),
+          2.rows, 1.rows)),
       term("select", "string", "COUNT(int) AS TMP_0")
     )
 
@@ -295,8 +388,8 @@ class GroupWindowTest extends TableTestBase {
     val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
 
     val windowedTable = table
-      .groupBy('string)
-      .window(Slide over 8.milli every 10.milli on 'rowtime)
+      .window(Slide over 8.milli every 10.milli on 'rowtime as 'w)
+      .groupBy('w, 'string)
       .select('string, 'int.count)
 
     val expected = unaryNode(
@@ -307,7 +400,11 @@ class GroupWindowTest extends TableTestBase {
         term("select", "string", "int")
       ),
       term("groupBy", "string"),
-      term("window", EventTimeSlidingGroupWindow(None, RowtimeAttribute(), 8.milli, 10.milli)),
+      term(
+        "window",
+        EventTimeSlidingGroupWindow(
+          Some(WindowReference("w")),
+          RowtimeAttribute(), 8.milli, 10.milli)),
       term("select", "string", "COUNT(int) AS TMP_0")
     )
 
@@ -321,15 +418,19 @@ class GroupWindowTest extends TableTestBase {
     val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
 
     val windowedTable = table
-      .groupBy('string)
-      .window(Slide over 2.rows every 1.rows on 'rowtime)
+      .window(Slide over 2.rows every 1.rows on 'rowtime as 'w)
+      .groupBy('w, 'string)
       .select('string, 'int.count)
 
     val expected = unaryNode(
       "DataStreamAggregate",
       streamTableNode(0),
       term("groupBy", "string"),
-      term("window", EventTimeSlidingGroupWindow(None, RowtimeAttribute(), 2.rows, 1.rows)),
+      term(
+        "window",
+        EventTimeSlidingGroupWindow(
+          Some(WindowReference("w")),
+          RowtimeAttribute(), 2.rows, 1.rows)),
       term("select", "string", "COUNT(int) AS TMP_0")
     )
 
@@ -342,8 +443,8 @@ class GroupWindowTest extends TableTestBase {
     val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
 
     val windowedTable = table
-      .groupBy('string)
-      .window(Session withGap 7.milli on 'rowtime)
+      .window(Session withGap 7.milli on 'rowtime as 'w)
+      .groupBy('w, 'string)
       .select('string, 'int.count)
 
     val expected = unaryNode(
@@ -354,7 +455,11 @@ class GroupWindowTest extends TableTestBase {
         term("select", "string", "int")
       ),
       term("groupBy", "string"),
-      term("window", EventTimeSessionGroupWindow(None, RowtimeAttribute(), 7.milli)),
+      term(
+        "window",
+        EventTimeSessionGroupWindow(
+          Some(WindowReference("w")),
+          RowtimeAttribute(), 7.milli)),
       term("select", "string", "COUNT(int) AS TMP_0")
     )
 
@@ -367,8 +472,8 @@ class GroupWindowTest extends TableTestBase {
     val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
 
     val windowedTable = table
-      .groupBy('string)
-      .window(Tumble over 50.milli)
+      .window(Tumble over 50.milli as 'w)
+      .groupBy('w, 'string)
       .select('string, 'int.count)
 
     val expected = unaryNode(
@@ -379,7 +484,11 @@ class GroupWindowTest extends TableTestBase {
         term("select", "string", "int")
       ),
       term("groupBy", "string"),
-      term("window", ProcessingTimeTumblingGroupWindow(None, 50.milli)),
+      term(
+        "window",
+        ProcessingTimeTumblingGroupWindow(
+          Some(WindowReference("w")),
+          50.milli)),
       term("select", "string", "COUNT(int) AS TMP_0")
     )
 
@@ -392,7 +501,8 @@ class GroupWindowTest extends TableTestBase {
     val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
 
     val windowedTable = table
-      .window(Tumble over 2.rows)
+      .window(Tumble over 2.rows as 'w)
+      .groupBy('w)
       .select('int.count)
 
     val expected = unaryNode(
@@ -402,7 +512,11 @@ class GroupWindowTest extends TableTestBase {
         streamTableNode(0),
         term("select", "int")
       ),
-      term("window", ProcessingTimeTumblingGroupWindow(None, 2.rows)),
+      term(
+        "window",
+        ProcessingTimeTumblingGroupWindow(
+          Some(WindowReference("w")),
+          2.rows)),
       term("select", "COUNT(int) AS TMP_0")
     )
 
@@ -415,7 +529,8 @@ class GroupWindowTest extends TableTestBase {
     val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
 
     val windowedTable = table
-      .window(Tumble over 5.milli on 'rowtime)
+      .window(Tumble over 5.milli on 'rowtime as 'w)
+      .groupBy('w)
       .select('int.count)
 
     val expected = unaryNode(
@@ -425,7 +540,11 @@ class GroupWindowTest extends TableTestBase {
         streamTableNode(0),
         term("select", "int")
       ),
-      term("window", EventTimeTumblingGroupWindow(None, RowtimeAttribute(), 5.milli)),
+      term(
+        "window",
+        EventTimeTumblingGroupWindow(
+          Some(WindowReference("w")),
+          RowtimeAttribute(), 5.milli)),
       term("select", "COUNT(int) AS TMP_0")
     )
 
@@ -439,7 +558,8 @@ class GroupWindowTest extends TableTestBase {
     val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
 
     val windowedTable = table
-      .window(Tumble over 2.rows on 'rowtime)
+      .window(Tumble over 2.rows on 'rowtime as 'w)
+      .groupBy('w)
       .select('int.count)
 
     val expected = unaryNode(
@@ -449,7 +569,11 @@ class GroupWindowTest extends TableTestBase {
         streamTableNode(0),
         term("select", "int")
       ),
-      term("window", EventTimeTumblingGroupWindow(None, RowtimeAttribute(), 2.rows)),
+      term(
+        "window",
+        EventTimeTumblingGroupWindow(
+          Some(WindowReference("w")),
+          RowtimeAttribute(), 2.rows)),
       term("select", "COUNT(int) AS TMP_0")
     )
 
@@ -463,7 +587,8 @@ class GroupWindowTest extends TableTestBase {
     val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
 
     val windowedTable = table
-      .window(Slide over 50.milli every 50.milli)
+      .window(Slide over 50.milli every 50.milli as 'w)
+      .groupBy('w)
       .select('int.count)
 
     val expected = unaryNode(
@@ -473,7 +598,11 @@ class GroupWindowTest extends TableTestBase {
         streamTableNode(0),
         term("select", "int")
       ),
-      term("window", ProcessingTimeSlidingGroupWindow(None, 50.milli, 50.milli)),
+      term(
+        "window",
+        ProcessingTimeSlidingGroupWindow(
+          Some(WindowReference("w")),
+          50.milli, 50.milli)),
       term("select", "COUNT(int) AS TMP_0")
     )
 
@@ -486,7 +615,8 @@ class GroupWindowTest extends TableTestBase {
     val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
 
     val windowedTable = table
-      .window(Slide over 2.rows every 1.rows)
+      .window(Slide over 2.rows every 1.rows as 'w)
+      .groupBy('w)
       .select('int.count)
 
     val expected = unaryNode(
@@ -496,7 +626,11 @@ class GroupWindowTest extends TableTestBase {
         streamTableNode(0),
         term("select", "int")
       ),
-      term("window", ProcessingTimeSlidingGroupWindow(None, 2.rows, 1.rows)),
+      term(
+        "window",
+        ProcessingTimeSlidingGroupWindow(
+          Some(WindowReference("w")),
+          2.rows, 1.rows)),
       term("select", "COUNT(int) AS TMP_0")
     )
 
@@ -509,7 +643,8 @@ class GroupWindowTest extends TableTestBase {
     val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
 
     val windowedTable = table
-      .window(Slide over 8.milli every 10.milli on 'rowtime)
+      .window(Slide over 8.milli every 10.milli on 'rowtime as 'w)
+      .groupBy('w)
       .select('int.count)
 
     val expected = unaryNode(
@@ -519,7 +654,11 @@ class GroupWindowTest extends TableTestBase {
         streamTableNode(0),
         term("select", "int")
       ),
-      term("window", EventTimeSlidingGroupWindow(None, RowtimeAttribute(), 8.milli, 10.milli)),
+      term(
+        "window",
+        EventTimeSlidingGroupWindow(
+          Some(WindowReference("w")),
+          RowtimeAttribute(), 8.milli, 10.milli)),
       term("select", "COUNT(int) AS TMP_0")
     )
 
@@ -533,7 +672,8 @@ class GroupWindowTest extends TableTestBase {
     val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
 
     val windowedTable = table
-      .window(Slide over 2.rows every 1.rows on 'rowtime)
+      .window(Slide over 2.rows every 1.rows on 'rowtime as 'w)
+      .groupBy('w)
       .select('int.count)
 
     val expected = unaryNode(
@@ -543,7 +683,11 @@ class GroupWindowTest extends TableTestBase {
         streamTableNode(0),
         term("select", "int")
       ),
-      term("window", EventTimeSlidingGroupWindow(None, RowtimeAttribute(), 2.rows, 1.rows)),
+      term(
+        "window",
+        EventTimeSlidingGroupWindow(
+          Some(WindowReference("w")),
+          RowtimeAttribute(), 2.rows, 1.rows)),
       term("select", "COUNT(int) AS TMP_0")
     )
 
@@ -556,7 +700,8 @@ class GroupWindowTest extends TableTestBase {
     val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
 
     val windowedTable = table
-      .window(Session withGap 7.milli on 'rowtime)
+      .window(Session withGap 7.milli on 'rowtime as 'w)
+      .groupBy('w)
       .select('int.count)
 
     val expected = unaryNode(
@@ -566,7 +711,11 @@ class GroupWindowTest extends TableTestBase {
         streamTableNode(0),
         term("select", "int")
       ),
-      term("window", EventTimeSessionGroupWindow(None, RowtimeAttribute(), 7.milli)),
+      term(
+        "window",
+        EventTimeSessionGroupWindow(
+          Some(WindowReference("w")),
+          RowtimeAttribute(), 7.milli)),
       term("select", "COUNT(int) AS TMP_0")
     )
 
@@ -579,8 +728,8 @@ class GroupWindowTest extends TableTestBase {
     val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
 
     val windowedTable = table
-      .groupBy('string)
       .window(Tumble over 5.milli on 'rowtime as 'w)
+      .groupBy('w, 'string)
       .select('string, 'int.count, 'w.start, 'w.end)
 
     val expected = unaryNode(
@@ -612,8 +761,8 @@ class GroupWindowTest extends TableTestBase {
     val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
 
     val windowedTable = table
-      .groupBy('string)
       .window(Slide over 10.milli every 5.milli on 'rowtime as 'w)
+      .groupBy('w, 'string)
       .select('string, 'int.count, 'w.start, 'w.end)
 
     val expected = unaryNode(
@@ -646,8 +795,8 @@ class GroupWindowTest extends TableTestBase {
     val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
 
     val windowedTable = table
-      .groupBy('string)
       .window(Session withGap 3.milli on 'rowtime as 'w)
+      .groupBy('w, 'string)
       .select('w.end as 'we1, 'string, 'int.count as 'cnt, 'w.start as 'ws, 'w.end as 'we2)
 
     val expected = unaryNode(
@@ -683,8 +832,8 @@ class GroupWindowTest extends TableTestBase {
     val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
 
     val windowedTable = table
-      .groupBy('string)
       .window(Tumble over 5.millis on 'rowtime as 'w)
+      .groupBy('w, 'string)
       .select('string, 'int.sum + 1 as 's1, 'int.sum + 3 as 's2, 'w.start as 'x, 'w.start as 'x2,
         'w.end as 'x3, 'w.end)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6bf556e6/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetWindowAggregateITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetWindowAggregateITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetWindowAggregateITCase.scala
index aa27d80..f13f350 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetWindowAggregateITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetWindowAggregateITCase.scala
@@ -28,6 +28,7 @@ import org.apache.flink.types.Row
 import org.junit._
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
+import org.apache.flink.table.api.ValidationException
 
 import scala.collection.JavaConverters._
 
@@ -53,7 +54,8 @@ class DataSetWindowAggregateITCase(configMode: TableConfigMode)
 
     // Count tumbling non-grouping window on event-time are currently not supported
     table
-      .window(Tumble over 2.rows on 'long)
+      .window(Tumble over 2.rows on 'long as 'w)
+      .groupBy('w)
       .select('int.count)
       .toDataSet[Row]
   }
@@ -66,8 +68,8 @@ class DataSetWindowAggregateITCase(configMode: TableConfigMode)
     val table = env.fromCollection(data).toTable(tEnv, 'long, 'int, 'string)
 
     val windowedTable = table
-      .groupBy('string)
-      .window(Tumble over 2.rows on 'long)
+      .window(Tumble over 2.rows on 'long as 'w)
+      .groupBy('w, 'string)
       .select('string, 'int.sum)
 
     val expected = "Hello,7\n" + "Hello world,7\n"
@@ -83,8 +85,8 @@ class DataSetWindowAggregateITCase(configMode: TableConfigMode)
     val table = env.fromCollection(data).toTable(tEnv, 'long, 'int, 'string)
 
     val windowedTable = table
-      .groupBy('string)
       .window(Tumble over 5.milli on 'long as 'w)
+      .groupBy('w, 'string)
       .select('string, 'int.sum, 'w.start, 'w.end)
 
     val expected = "Hello world,3,1970-01-01 00:00:00.005,1970-01-01 00:00:00.01\n" +
@@ -107,6 +109,7 @@ class DataSetWindowAggregateITCase(configMode: TableConfigMode)
 
     val windowedTable = table
       .window(Tumble over 5.milli on 'long as 'w)
+      .groupBy('w)
       .select('int.sum, 'w.start, 'w.end)
 
     val expected = "10,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005\n" +
@@ -124,8 +127,8 @@ class DataSetWindowAggregateITCase(configMode: TableConfigMode)
 
     val table = env.fromCollection(data).toTable(tEnv, 'long, 'int, 'string)
     val windowedTable = table
-      .groupBy('string)
       .window(Session withGap 7.milli on 'long as 'w)
+      .groupBy('string, 'w)
       .select('string, 'string.count, 'w.start, 'w.end)
 
     val results = windowedTable.toDataSet[Row].collect()
@@ -146,6 +149,24 @@ class DataSetWindowAggregateITCase(configMode: TableConfigMode)
     val table = env.fromCollection(data).toTable(tEnv, 'long, 'int, 'string)
     val windowedTable =table
       .window(Session withGap 7.milli on 'long as 'w)
+      .groupBy('w)
       .select('string.count).toDataSet[Row].collect()
   }
+
+  @Test(expected = classOf[ValidationException])
+  def testMultiGroupWindow(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val table = env.fromCollection(data).toTable(tEnv, 'long, 'int, 'string)
+    table
+      .window(Tumble over 5.milli on 'long as 'w)
+      .groupBy('w, 'string)
+      .select('string, 'int.count)
+      .window( Slide over 5.milli every 1.milli on 'int as 'w2)
+      .groupBy('w2)
+      .select('string)
+      .toDataSet[Row]
+  }
+
 }