Posted to commits@flink.apache.org by fh...@apache.org on 2016/10/26 21:11:26 UTC

[4/5] flink git commit: [FLINK-4691] [table] Add group-windows for streaming tables to Table API.

[FLINK-4691] [table] Add group-windows for streaming tables to Table API.

This closes #2562.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/44f3977e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/44f3977e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/44f3977e

Branch: refs/heads/master
Commit: 44f3977e7fee44a3601b651a523c4ab37ed5a15d
Parents: baf057a
Author: twalthr <tw...@apache.org>
Authored: Thu Aug 25 09:19:53 2016 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed Oct 26 23:03:37 2016 +0200

----------------------------------------------------------------------
 docs/dev/table_api.md                           | 444 +++++++++++++-
 .../flink/api/java/table/groupWindows.scala     |  84 +++
 .../flink/api/scala/table/expressionDsl.scala   |  30 +-
 .../flink/api/scala/table/groupWindows.scala    |  85 +++
 .../flink/api/table/BatchTableEnvironment.scala |   4 +-
 .../flink/api/table/FlinkRelBuilder.scala       |  34 +-
 .../flink/api/table/FlinkTypeFactory.scala      |  10 +-
 .../api/table/StreamTableEnvironment.scala      |  45 +-
 .../flink/api/table/TableEnvironment.scala      |   8 +-
 .../org/apache/flink/api/table/Types.scala      |   6 +-
 .../flink/api/table/codegen/CodeGenUtils.scala  |  10 +-
 .../table/codegen/calls/ScalarOperators.scala   |  28 +-
 .../org/apache/flink/api/table/exceptions.scala |  13 +-
 .../api/table/expressions/Expression.scala      |   4 +-
 .../table/expressions/ExpressionParser.scala    |  36 +-
 .../api/table/expressions/ExpressionUtils.scala |  21 +-
 .../api/table/expressions/InputTypeSpec.scala   |   2 +-
 .../api/table/expressions/aggregations.scala    |   9 +-
 .../api/table/expressions/arithmetic.scala      |   8 +-
 .../flink/api/table/expressions/call.scala      |   6 +-
 .../flink/api/table/expressions/cast.scala      |   2 +-
 .../api/table/expressions/comparison.scala      |   6 +-
 .../api/table/expressions/fieldExpression.scala |  51 +-
 .../flink/api/table/expressions/literals.scala  |  17 +-
 .../flink/api/table/expressions/logic.scala     |   6 +-
 .../api/table/expressions/mathExpressions.scala |   6 +-
 .../flink/api/table/expressions/ordering.scala  |   2 +-
 .../table/expressions/stringExpressions.scala   |  12 +-
 .../flink/api/table/expressions/time.scala      |  18 +-
 .../table/expressions/windowProperties.scala    |  57 ++
 .../api/table/plan/ProjectionTranslator.scala   | 105 ++++
 .../api/table/plan/RexNodeTranslator.scala      |  88 ---
 .../api/table/plan/logical/LogicalNode.scala    |  14 +-
 .../api/table/plan/logical/LogicalWindow.scala  |  36 ++
 .../api/table/plan/logical/Resolvable.scala     |  43 ++
 .../api/table/plan/logical/groupWindows.scala   | 258 ++++++++
 .../api/table/plan/logical/operators.scala      | 144 ++++-
 .../logical/rel/LogicalWindowAggregate.scala    | 114 ++++
 .../api/table/plan/nodes/FlinkAggregate.scala   |  68 ++
 .../flink/api/table/plan/nodes/FlinkRel.scala   |  43 ++
 .../plan/nodes/dataset/DataSetAggregate.scala   |  60 +-
 .../table/plan/nodes/dataset/DataSetRel.scala   |  36 --
 .../nodes/datastream/DataStreamAggregate.scala  | 300 +++++++++
 .../api/table/plan/rules/FlinkRuleSets.scala    |   1 +
 .../rules/dataSet/DataSetAggregateRule.scala    |   4 +-
 .../datastream/DataStreamAggregateRule.scala    |  78 +++
 .../aggregate/AggregateAllWindowFunction.scala  |  53 ++
 .../AggregateReduceCombineFunction.scala        | 122 ++++
 .../AggregateReduceGroupFunction.scala          | 108 +---
 .../table/runtime/aggregate/AggregateUtil.scala |  36 +-
 .../aggregate/AggregateWindowFunction.scala     |  59 ++
 .../runtime/aggregate/PropertyCollector.scala   |  42 ++
 .../table/runtime/aggregate/WindowEndRead.scala |  38 ++
 .../runtime/aggregate/WindowPropertyRead.scala  |  33 +
 .../runtime/aggregate/WindowStartRead.scala     |  38 ++
 .../org/apache/flink/api/table/table.scala      | 132 +++-
 .../api/table/typeutils/InternalTypeInfo.scala  |  81 +++
 .../api/table/typeutils/IntervalTypeInfo.scala  | 109 ----
 .../table/typeutils/RowIntervalTypeInfo.scala   |  36 ++
 .../table/typeutils/TimeIntervalTypeInfo.scala  | 113 ++++
 .../api/table/typeutils/TypeCheckUtils.scala    |   8 +-
 .../api/table/typeutils/TypeCoercion.scala      |  12 +-
 .../table/validate/ExprValidationResult.scala   |  41 --
 .../api/table/validate/ValidationResult.scala   |  53 ++
 .../org/apache/flink/api/table/windows.scala    | 390 ++++++++++++
 .../api/java/batch/TableEnvironmentITCase.java  |   6 +-
 .../scala/batch/TableEnvironmentITCase.scala    |   4 +-
 .../api/scala/batch/table/CalcITCase.scala      |   6 +-
 .../scala/stream/table/AggregationsITCase.scala | 198 ++++++
 .../scala/stream/table/GroupWindowTest.scala    | 614 +++++++++++++++++++
 .../scala/stream/table/UnsupportedOpsTest.scala |   8 -
 .../flink/api/table/utils/TableTestBase.scala   |  25 +-
 72 files changed, 4198 insertions(+), 623 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/44f3977e/docs/dev/table_api.md
----------------------------------------------------------------------
diff --git a/docs/dev/table_api.md b/docs/dev/table_api.md
index 57a2b5c..6da2807 100644
--- a/docs/dev/table_api.md
+++ b/docs/dev/table_api.md
@@ -965,7 +965,9 @@ composite = suffixed | atom ;
 
 suffixed = interval | cast | as | aggregation | if | functionCall ;
 
-interval = composite , "." , ("year" | "years" | "month" | "months" | "day" | "days" | "hour" | "hours" | "minute" | "minutes" | "second" | "seconds" | "milli" | "millis") ;
+timeInterval = composite , "." , ("year" | "years" | "month" | "months" | "day" | "days" | "hour" | "hours" | "minute" | "minutes" | "second" | "seconds" | "milli" | "millis") ;
+
+rowInterval = composite , "." , "rows" ;
 
 cast = composite , ".cast(" , dataType , ")" ;
 
@@ -973,7 +975,7 @@ dataType = "BYTE" | "SHORT" | "INT" | "LONG" | "FLOAT" | "DOUBLE" | "BOOLEAN" |
 
 as = composite , ".as(" , fieldReference , ")" ;
 
-aggregation = composite , ( ".sum" | ".min" | ".max" | ".count" | ".avg" ) , [ "()" ] ;
+aggregation = composite , ( ".sum" | ".min" | ".max" | ".count" | ".avg" | ".start" | ".end" ) , [ "()" ] ;
 
 if = composite , ".?(" , expression , "," , expression , ")" ;
 
@@ -992,7 +994,7 @@ timePointUnit = "YEAR" | "MONTH" | "DAY" | "HOUR" | "MINUTE" | "SECOND" | "QUART
 {% endhighlight %}
 
 Here, `literal` is a valid Java literal, `fieldReference` specifies a column in the data (or all columns if `*` is used), and `functionIdentifier` specifies a supported scalar function. The
-column names and function names follow Java identifier syntax. Expressions specified as Strings can also use prefix notation instead of suffix notation to call operators and functions.
+column names and function names follow Java identifier syntax. The column name `rowtime` is a reserved logical attribute in streaming environments. Expressions specified as Strings can also use prefix notation instead of suffix notation to call operators and functions.
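+
+For example (a sketch based on the grammar above), `"10.minutes"` parses as a `timeInterval`, `"10.rows"` as a `rowInterval`, and the new `start`/`end` suffixes read window properties:
+
+{% highlight scala %}
+// sketch: window properties in an expression string; assumes a window
+// aliased "myWin" has been defined via as() (see the Windows section below)
+windowedTable.select("a, myWin.start, myWin.end, b.count")
+{% endhighlight %}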
 
 If working with exact numeric values or large decimals is required, the Table API also supports Java's BigDecimal type. In the Scala Table API decimals can be defined by `BigDecimal("123456")` and in Java by appending a "p" for precise e.g. `123456p`.
 
@@ -1002,6 +1004,266 @@ Temporal intervals can be represented as number of months (`Types.INTERVAL_MONTH
 
 {% top %}
 
+### Windows
+
+The Table API is a declarative API to define queries on batch and streaming tables. Projection, selection, and union operations can be applied both on streaming and batch tables without additional semantics. Aggregations on (possibly) infinite streaming tables, however, can only be computed on finite groups of records. Group-window aggregates group rows into finite groups based on time or row-count intervals and evaluate aggregation functions once per group. For batch tables, group-windows are a convenient shortcut to group records by time intervals.
+
+Group-windows are defined using the `window(w: GroupWindow)` clause. The following example shows how to define a group-window aggregation on a table.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+Table table = input
+  .window(w: GroupWindow) // define window
+  .select("b.sum");       // aggregate
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val table = input
+  .window(w: GroupWindow) // define window
+  .select('b.sum)         // aggregate
+{% endhighlight %}
+</div>
+</div>
+
+In streaming environments, group-window aggregates can only be computed in parallel if they are *keyed*, i.e., if there is an additional `groupBy` attribute. Group-window aggregates without an additional `groupBy`, such as in the example above, can only be evaluated in a single, non-parallel task. The following example shows how to define a keyed group-window aggregation on a table.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+Table table = input
+  .groupBy("a")
+  .window(w: GroupWindow) // define window
+  .select("a, b.sum");    // aggregate
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val table = input
+  .groupBy('a)
+  .window(w: GroupWindow) // define window
+  .select('a, 'b.sum)     // aggregate
+{% endhighlight %}
+</div>
+</div>
+
+The `GroupWindow` parameter defines how rows are mapped to windows. `GroupWindow` is not an interface that users can implement. Instead, the Table API provides a set of predefined `GroupWindow` classes with specific semantics, which are translated into underlying `DataStream` or `DataSet` operations. The supported window definitions are listed below. 
+By assigning the group-window an alias using `as`, properties such as the start and end timestamp of a time window can be accessed in the `select` statement.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+Table table = input
+  .groupBy("a")
+  .window(XXX.as("myWin"))                       // define window alias
+  .select("a, myWin.start, myWin.end, b.count"); // aggregate
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val table = input
+  .groupBy('a)
+  .window(XXX as 'myWin)                          // define window alias
+  .select('a, 'myWin.start, 'myWin.end, 'b.count) // aggregate
+{% endhighlight %}
+</div>
+</div>
+
+#### Tumble (Tumbling Windows)
+
+A tumbling window assigns rows to non-overlapping, continuous windows of fixed length. For example, a tumbling window of 5 minutes groups rows into 5-minute intervals. Tumbling windows can be defined on event-time, processing-time, or on a row-count.
+
+Tumbling windows are defined by using the `Tumble` class as follows:
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 20%">Method</th>
+      <th class="text-left" style="width: 20%">Required?</th>
+      <th class="text-left">Description</th>
+    </tr>
+  </thead>
+
+  <tbody>
+    <tr>
+      <td><code>over</code></td>
+      <td>Required.</td>
+      <td>Defines the length of the window, either as time or row-count interval.</td>
+    </tr>
+    <tr>
+      <td><code>on</code></td>
+      <td>Required for streaming event-time windows and windows on batch tables.</td>
+      <td>Defines the time mode for streaming tables (<code>rowtime</code> is a logical system attribute); for batch tables, the time attribute on which records are grouped.</td>
+    </tr>
+    <tr>
+      <td><code>as</code></td>
+      <td>Optional.</td>
+      <td>Assigns an alias to the window that can be used in the following <code>select()</code> clause to access window properties such as window start or end time.</td>
+    </tr>
+  </tbody>
+</table>
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+// Tumbling Event-time Window
+.window(Tumble.over("10.minutes").on("rowtime").as("w"))
+
+// Tumbling Processing-time Window
+.window(Tumble.over("10.minutes").as("w"))
+
+// Tumbling Row-count Window
+.window(Tumble.over("10.rows").as("w"))
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+// Tumbling Event-time Window
+.window(Tumble over 10.minutes on 'rowtime as 'w)
+
+// Tumbling Processing-time Window
+.window(Tumble over 10.minutes as 'w)
+
+// Tumbling Row-count Window
+.window(Tumble over 10.rows as 'w)
+{% endhighlight %}
+</div>
+</div>
+
+#### Slide (Sliding Windows)
+
+A sliding window has a fixed size and slides by a specified slide interval. If the slide interval is smaller than the window size, sliding windows overlap and rows can be assigned to multiple windows. For example, a sliding window of 15 minutes size and a 5 minute slide interval assigns each row to 3 different windows of 15 minutes size, which are evaluated every 5 minutes. Sliding windows can be defined on event-time, processing-time, or on a row-count.
+
+Sliding windows are defined by using the `Slide` class as follows:
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 20%">Method</th>
+      <th class="text-left" style="width: 20%">Required?</th>
+      <th class="text-left">Description</th>
+    </tr>
+  </thead>
+
+  <tbody>
+    <tr>
+      <td><code>over</code></td>
+      <td>Required.</td>
+      <td>Defines the length of the window, either as time or row-count interval.</td>
+    </tr>
+    <tr>
+      <td><code>every</code></td>
+      <td>Required.</td>
+      <td>Defines the slide interval, either as time or row-count interval. The slide interval must be of the same type as the size interval.</td>
+    </tr>
+    <tr>
+      <td><code>on</code></td>
+      <td>Required for event-time windows and windows on batch tables.</td>
+      <td>Defines the time mode for streaming tables (<code>rowtime</code> is a logical system attribute); for batch tables, the time attribute on which records are grouped.</td>
+    </tr>
+    <tr>
+      <td><code>as</code></td>
+      <td>Optional.</td>
+      <td>Assigns an alias to the window that can be used in the following <code>select()</code> clause to access window properties such as window start or end time.</td>
+    </tr>
+  </tbody>
+</table>
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+// Sliding Event-time Window
+.window(Slide.over("10.minutes").every("5.minutes").on("rowtime").as("w"))
+
+// Sliding Processing-time window
+.window(Slide.over("10.minutes").every("5.minutes").as("w"))
+
+// Sliding Row-count window
+.window(Slide.over("10.rows").every("5.rows").as("w"))
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+// Sliding Event-time Window
+.window(Slide over 10.minutes every 5.minutes on 'rowtime as 'w)
+
+// Sliding Processing-time window
+.window(Slide over 10.minutes every 5.minutes as 'w)
+
+// Sliding Row-count window
+.window(Slide over 10.rows every 5.rows as 'w)
+{% endhighlight %}
+</div>
+</div>
+
+#### Session (Session Windows)
+
+Session windows do not have a fixed size, but their bounds are defined by an interval of inactivity, i.e., a session window is closed if no event appears for a defined gap period. For example, a session window with a 30 minute gap starts when a row is observed after 30 minutes of inactivity (otherwise the row would be added to an existing window) and is closed if no row is added within 30 minutes. Session windows can work on event-time or processing-time.
+
+A session window is defined by using the `Session` class as follows:
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 20%">Method</th>
+      <th class="text-left" style="width: 20%">Required?</th>
+      <th class="text-left">Description</th>
+    </tr>
+  </thead>
+
+  <tbody>
+    <tr>
+      <td><code>withGap</code></td>
+      <td>Required.</td>
+      <td>Defines the gap between two windows as time interval.</td>
+    </tr>
+    <tr>
+      <td><code>on</code></td>
+      <td>Required for event-time windows and windows on batch tables.</td>
+      <td>Defines the time mode for streaming tables (<code>rowtime</code> is a logical system attribute); for batch tables, the time attribute on which records are grouped.</td>
+    </tr>
+    <tr>
+      <td><code>as</code></td>
+      <td>Optional.</td>
+      <td>Assigns an alias to the window that can be used in the following <code>select()</code> clause to access window properties such as window start or end time.</td>
+    </tr>
+  </tbody>
+</table>
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+// Session Event-time Window
+.window(Session.withGap("10.minutes").on("rowtime").as("w"))
+
+// Session Processing-time Window
+.window(Session.withGap("10.minutes").as("w"))
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+// Session Event-time Window
+.window(Session withGap 10.minutes on 'rowtime as 'w)
+
+// Session Processing-time Window
+.window(Session withGap 10.minutes as 'w)
+{% endhighlight %}
+</div>
+</div>
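+
+Putting the pieces together, a complete keyed group-window aggregation on a streaming table might look as follows (a sketch; it assumes a streaming table `input` with fields `a` and `b`):
+
+{% highlight scala %}
+val result = input
+  .groupBy('a)                                      // key by attribute a
+  .window(Tumble over 10.minutes on 'rowtime as 'w) // tumbling event-time window
+  .select('a, 'w.start, 'w.end, 'b.sum)             // window properties and aggregate
+{% endhighlight %}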
+
+#### Limitations
+
+Currently, the following features are not supported:
+
+- Row-count windows on event-time
+- Windows on batch tables
 
 SQL
 ----
@@ -1549,6 +1811,83 @@ STRING.toTimestamp()
     <tr>
       <td>
         {% highlight java %}
+NUMERIC.year
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Creates an interval of months for a given number of years.</p>
+      </td>
+    </tr>
+
+    <tr>
+      <td>
+        {% highlight java %}
+NUMERIC.month
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Creates an interval of months for a given number of months.</p>
+      </td>
+    </tr>
+
+    <tr>
+      <td>
+        {% highlight java %}
+NUMERIC.day
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Creates an interval of milliseconds for a given number of days.</p>
+      </td>
+    </tr>
+
+    <tr>
+      <td>
+        {% highlight java %}
+NUMERIC.hour
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Creates an interval of milliseconds for a given number of hours.</p>
+      </td>
+    </tr>
+
+    <tr>
+      <td>
+        {% highlight java %}
+NUMERIC.minute
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Creates an interval of milliseconds for a given number of minutes.</p>
+      </td>
+    </tr>
+
+    <tr>
+      <td>
+        {% highlight java %}
+NUMERIC.second
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Creates an interval of milliseconds for a given number of seconds.</p>
+      </td>
+    </tr>
+
+    <tr>
+      <td>
+        {% highlight java %}
+NUMERIC.milli
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Creates an interval of milliseconds.</p>
+      </td>
+    </tr>
+
+    <tr>
+      <td>
+        {% highlight java %}
 TEMPORAL.extract(TIMEINTERVALUNIT)
 {% endhighlight %}
       </td>
@@ -1656,6 +1995,17 @@ temporalOverlaps(TIMEPOINT, TEMPORAL, TIMEPOINT, TEMPORAL)
       </td>
     </tr>
 
+    <tr>
+      <td>
+        {% highlight java %}
+NUMERIC.rows
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Creates an interval of rows.</p>
+      </td>
+    </tr>
+
   </tbody>
 </table>
 
@@ -1989,6 +2339,83 @@ STRING.toTimestamp
     <tr>
       <td>
         {% highlight scala %}
+NUMERIC.year
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Creates an interval of months for a given number of years.</p>
+      </td>
+    </tr>
+
+    <tr>
+      <td>
+        {% highlight scala %}
+NUMERIC.month
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Creates an interval of months for a given number of months.</p>
+      </td>
+    </tr>
+
+    <tr>
+      <td>
+        {% highlight scala %}
+NUMERIC.day
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Creates an interval of milliseconds for a given number of days.</p>
+      </td>
+    </tr>
+
+    <tr>
+      <td>
+        {% highlight scala %}
+NUMERIC.hour
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Creates an interval of milliseconds for a given number of hours.</p>
+      </td>
+    </tr>
+
+    <tr>
+      <td>
+        {% highlight scala %}
+NUMERIC.minute
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Creates an interval of milliseconds for a given number of minutes.</p>
+      </td>
+    </tr>
+
+    <tr>
+      <td>
+        {% highlight scala %}
+NUMERIC.second
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Creates an interval of milliseconds for a given number of seconds.</p>
+      </td>
+    </tr>
+
+    <tr>
+      <td>
+        {% highlight scala %}
+NUMERIC.milli
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Creates an interval of milliseconds.</p>
+      </td>
+    </tr>
+
+    <tr>
+      <td>
+        {% highlight scala %}
 TEMPORAL.extract(TimeIntervalUnit)
 {% endhighlight %}
       </td>
@@ -2096,6 +2523,17 @@ temporalOverlaps(TIMEPOINT, TEMPORAL, TIMEPOINT, TEMPORAL)
       </td>
     </tr>
 
+    <tr>
+      <td>
+        {% highlight scala %}
+NUMERIC.rows
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Creates an interval of rows.</p>
+      </td>
+    </tr>
+
   </tbody>
 </table>
 </div>

http://git-wip-us.apache.org/repos/asf/flink/blob/44f3977e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/groupWindows.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/groupWindows.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/groupWindows.scala
new file mode 100644
index 0000000..3bbe753
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/groupWindows.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.table
+
+import org.apache.flink.api.table.{SessionWindow, SlideWithSize, TumblingWindow}
+
+/**
+  * Helper class for creating a tumbling window. Tumbling windows are consecutive, non-overlapping
+  * windows of a specified fixed length. For example, a tumbling window of 5 minutes groups
+  * elements into 5-minute intervals.
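+  *
+  * A window definition sketch: `Tumble.over("10.minutes")`.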
+  */
+object Tumble {
+
+  /**
+    * Creates a tumbling window. Tumbling windows are consecutive, non-overlapping
+    * windows of a specified fixed length. For example, a tumbling window of 5 minutes groups
+    * elements into 5-minute intervals.
+    *
+    * @param size the size of the window as time or row-count interval.
+    * @return a tumbling window
+    */
+  def over(size: String): TumblingWindow = new TumblingWindow(size)
+}
+
+/**
+  * Helper class for creating a sliding window. Sliding windows have a fixed size and slide by
+  * a specified slide interval. If the slide interval is smaller than the window size, sliding
+  * windows are overlapping. Thus, an element can be assigned to multiple windows.
+  *
+  * For example, a sliding window of 15 minutes size with a 5 minute slide interval groups elements
+  * of 15 minutes and is evaluated every 5 minutes. Each element is contained in three consecutive
+  * window evaluations.
+  */
+object Slide {
+
+  /**
+    * Creates a sliding window. Sliding windows have a fixed size and slide by
+    * a specified slide interval. If the slide interval is smaller than the window size, sliding
+    * windows are overlapping. Thus, an element can be assigned to multiple windows.
+    *
+    * For example, a sliding window of 15 minutes size with a 5 minute slide interval groups
+    * elements of 15 minutes and is evaluated every 5 minutes. Each element is contained in three
+    * consecutive window evaluations.
+    *
+    * @param size the size of the window as time or row-count interval
+    * @return a partially specified sliding window
+    */
+  def over(size: String): SlideWithSize = new SlideWithSize(size)
+}
+
+/**
+  * Helper class for creating a session window. The boundaries of session windows are defined by
+  * intervals of inactivity, i.e., a session window is closed if no event appears for a defined
+  * gap period.
+  */
+object Session {
+
+  /**
+    * Creates a session window. The boundaries of session windows are defined by
+    * intervals of inactivity, i.e., a session window is closed if no event appears for a defined
+    * gap period.
+    *
+    * @param gap specifies how long (as interval of milliseconds) to wait for new data before
+    *            closing the session window.
+    * @return a session window
+    */
+  def withGap(gap: String): SessionWindow = new SessionWindow(gap)
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/44f3977e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
index 5df9175..836db3e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
@@ -21,7 +21,7 @@ import java.sql.{Date, Time, Timestamp}
 
 import org.apache.calcite.avatica.util.DateTimeUtils._
 import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation}
-import org.apache.flink.api.table.expressions.ExpressionUtils.{toMilliInterval, toMonthInterval}
+import org.apache.flink.api.table.expressions.ExpressionUtils.{toMilliInterval, toMonthInterval, toRowInterval}
 import org.apache.flink.api.table.expressions.TimeIntervalUnit.TimeIntervalUnit
 import org.apache.flink.api.table.expressions._
 
@@ -103,6 +103,16 @@ trait ImplicitExpressionOperations {
   def desc = Desc(expr)
 
   /**
+    * Returns the start time of a window when applied on a window reference.
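+    *
+    * For example (sketch): `'w.start`, where `'w` is a window alias defined via `as`.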
+    */
+  def start = WindowStart(expr)
+
+  /**
+    * Returns the end time of a window when applied on a window reference.
+    */
+  def end = WindowEnd(expr)
+
+  /**
     * Ternary conditional operator that decides which of two other expressions should be evaluated
     * based on a evaluated boolean condition.
     *
@@ -356,7 +366,7 @@ trait ImplicitExpressionOperations {
     */
   def days = day
 
-    /**
+  /**
     * Creates an interval of the given number of hours.
     *
     * @return interval of milliseconds
@@ -370,7 +380,7 @@ trait ImplicitExpressionOperations {
     */
   def hours = hour
 
-    /**
+  /**
     * Creates an interval of the given number of minutes.
     *
     * @return interval of milliseconds
@@ -384,7 +394,7 @@ trait ImplicitExpressionOperations {
     */
   def minutes = minute
 
-    /**
+  /**
     * Creates an interval of the given number of seconds.
     *
     * @return interval of milliseconds
@@ -398,7 +408,7 @@ trait ImplicitExpressionOperations {
     */
   def seconds = second
 
-    /**
+  /**
     * Creates an interval of the given number of milliseconds.
     *
     * @return interval of milliseconds
@@ -411,6 +421,16 @@ trait ImplicitExpressionOperations {
     * @return interval of milliseconds
     */
   def millis = milli
+
+  // row interval type
+
+  /**
+    * Creates an interval of rows.
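+    * For example (sketch): `10.rows` specifies a row-count interval of ten rows.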
+    *
+    * @return interval of rows
+    */
+  def rows = toRowInterval(expr)
+
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/flink/blob/44f3977e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/groupWindows.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/groupWindows.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/groupWindows.scala
new file mode 100644
index 0000000..16fda5b
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/groupWindows.scala
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.table
+
+import org.apache.flink.api.table.expressions.Expression
+import org.apache.flink.api.table.{SessionWindow, SlideWithSize, TumblingWindow}
+
+/**
+  * Helper object for creating a tumbling window. Tumbling windows are consecutive, non-overlapping
+  * windows of a specified fixed length. For example, a tumbling window of 5 minutes groups
+  * elements into 5-minute intervals.
+  */
+object Tumble {
+
+  /**
+    * Creates a tumbling window. Tumbling windows are fixed-size, consecutive, non-overlapping
+    * windows. For example, a tumbling window of 5 minutes groups
+    * elements into 5-minute intervals.
+    *
+    * @param size the size of the window as time or row-count interval.
+    * @return a tumbling window
+    */
+  def over(size: Expression): TumblingWindow = new TumblingWindow(size)
+}
+
+/**
+  * Helper object for creating a sliding window. Sliding windows have a fixed size and slide by
+  * a specified slide interval. If the slide interval is smaller than the window size, sliding
+  * windows are overlapping. Thus, an element can be assigned to multiple windows.
+  *
+  * For example, a sliding window of 15 minutes size with a 5 minute slide interval groups elements
+  * of 15 minutes and is evaluated every 5 minutes. Each element is contained in three consecutive
+  * window evaluations.
+  */
+object Slide {
+
+  /**
+    * Creates a sliding window. Sliding windows have a fixed size and slide by
+    * a specified slide interval. If the slide interval is smaller than the window size, sliding
+    * windows are overlapping. Thus, an element can be assigned to multiple windows.
+    *
+    * For example, a sliding window of 15 minutes size with a 5 minute slide interval groups
+    * elements of 15 minutes and is evaluated every 5 minutes. Each element is contained in three
+    * consecutive window evaluations.
+    *
+    * @param size the size of the window as time or row-count interval
+    * @return a partially specified sliding window
+    */
+  def over(size: Expression): SlideWithSize = new SlideWithSize(size)
+}
+
+/**
+  * Helper object for creating a session window. The boundaries of session windows are defined by
+  * intervals of inactivity, i.e., a session window is closed if no event appears for a defined
+  * gap period.
+  */
+object Session {
+
+  /**
+    * Creates a session window. The boundaries of session windows are defined by
+    * intervals of inactivity, i.e., a session window is closed if no event appears for a defined
+    * gap period.
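+    *
+    * A window definition sketch: `Session withGap 10.minutes`.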
+    *
+    * @param gap specifies how long (as interval of milliseconds) to wait for new data before
+    *            closing the session window.
+    * @return a session window
+    */
+  def withGap(gap: Expression): SessionWindow = new SessionWindow(gap)
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/44f3977e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala
index 1d34777..24b385c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala
@@ -73,7 +73,7 @@ abstract class BatchTableEnvironment(
     val m = internalNamePattern.findFirstIn(name)
     m match {
       case Some(_) =>
-        throw new ValidationException(s"Illegal Table name. " +
+        throw new TableException(s"Illegal Table name. " +
           s"Please choose a name that does not contain the pattern $internalNamePattern")
       case None =>
     }
@@ -96,7 +96,7 @@ abstract class BatchTableEnvironment(
     if (isRegistered(tableName)) {
       new Table(this, CatalogNode(tableName, getRowType(tableName)))
     } else {
-      throw new ValidationException(s"Table \'$tableName\' was not found in the registry.")
+      throw new TableException(s"Table \'$tableName\' was not found in the registry.")
     }
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/44f3977e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkRelBuilder.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkRelBuilder.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkRelBuilder.scala
index 1215806..ea4eed0 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkRelBuilder.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkRelBuilder.scala
@@ -20,12 +20,20 @@ package org.apache.flink.api.table
 
 import java.util.Collections
 
+import org.apache.calcite.plan.volcano.VolcanoPlanner
+import java.lang.Iterable
+
 import org.apache.calcite.jdbc.CalciteSchema
 import org.apache.calcite.plan._
-import org.apache.calcite.plan.volcano.VolcanoPlanner
 import org.apache.calcite.prepare.CalciteCatalogReader
+import org.apache.calcite.rel.logical.LogicalAggregate
 import org.apache.calcite.rex.RexBuilder
+import org.apache.calcite.tools.RelBuilder.{AggCall, GroupKey}
 import org.apache.calcite.tools.{FrameworkConfig, RelBuilder}
+import org.apache.flink.api.table.FlinkRelBuilder.NamedWindowProperty
+import org.apache.flink.api.table.expressions.WindowProperty
+import org.apache.flink.api.table.plan.logical.LogicalWindow
+import org.apache.flink.api.table.plan.logical.rel.LogicalWindowAggregate
 
 /**
   * Flink specific [[RelBuilder]] that changes the default type factory to a [[FlinkTypeFactory]].
@@ -41,10 +49,25 @@ class FlinkRelBuilder(
 
   def getPlanner: RelOptPlanner = cluster.getPlanner
 
-  def getCluster = cluster
+  def getCluster: RelOptCluster = relOptCluster
 
   override def getTypeFactory: FlinkTypeFactory =
     super.getTypeFactory.asInstanceOf[FlinkTypeFactory]
+
+  def aggregate(
+      window: LogicalWindow,
+      groupKey: GroupKey,
+      namedProperties: Seq[NamedWindowProperty],
+      aggCalls: Iterable[AggCall])
+    : RelBuilder = {
+    // build logical aggregate
+    val aggregate = super.aggregate(groupKey, aggCalls).build().asInstanceOf[LogicalAggregate]
+
+    // build logical window aggregate from it
+    push(LogicalWindowAggregate.create(window, namedProperties, aggregate))
+    this
+  }
+
 }
 
 object FlinkRelBuilder {
@@ -69,4 +92,11 @@ object FlinkRelBuilder {
     new FlinkRelBuilder(config.getContext, cluster, relOptSchema)
   }
 
+  /**
+    * Information necessary to create a window aggregate.
+    *
+    * Similar to [[RelBuilder.AggCall]] or [[RelBuilder.GroupKey]].
+    */
+  case class NamedWindowProperty(name: String, property: WindowProperty)
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/44f3977e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeFactory.scala
index 77eb907..1f607e4 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeFactory.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeFactory.scala
@@ -30,7 +30,7 @@ import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation}
 import org.apache.flink.api.java.typeutils.ValueTypeInfo._
 import org.apache.flink.api.table.FlinkTypeFactory.typeInfoToSqlTypeName
 import org.apache.flink.api.table.plan.schema.GenericRelDataType
-import org.apache.flink.api.table.typeutils.IntervalTypeInfo
+import org.apache.flink.api.table.typeutils.TimeIntervalTypeInfo
 import org.apache.flink.api.table.typeutils.TypeCheckUtils.isSimple
 
 import scala.collection.mutable
@@ -106,8 +106,8 @@ object FlinkTypeFactory {
       case SqlTimeTypeInfo.DATE => DATE
       case SqlTimeTypeInfo.TIME => TIME
       case SqlTimeTypeInfo.TIMESTAMP => TIMESTAMP
-      case IntervalTypeInfo.INTERVAL_MONTHS => INTERVAL_YEAR_MONTH
-      case IntervalTypeInfo.INTERVAL_MILLIS => INTERVAL_DAY_SECOND
+      case TimeIntervalTypeInfo.INTERVAL_MONTHS => INTERVAL_YEAR_MONTH
+      case TimeIntervalTypeInfo.INTERVAL_MILLIS => INTERVAL_DAY_SECOND
 
       case CHAR_TYPE_INFO | CHAR_VALUE_TYPE_INFO =>
         throw TableException("Character type is not supported.")
@@ -131,8 +131,8 @@ object FlinkTypeFactory {
     case DATE => SqlTimeTypeInfo.DATE
     case TIME => SqlTimeTypeInfo.TIME
     case TIMESTAMP => SqlTimeTypeInfo.TIMESTAMP
-    case typeName if YEAR_INTERVAL_TYPES.contains(typeName) => IntervalTypeInfo.INTERVAL_MONTHS
-    case typeName if DAY_INTERVAL_TYPES.contains(typeName) => IntervalTypeInfo.INTERVAL_MILLIS
+    case typeName if YEAR_INTERVAL_TYPES.contains(typeName) => TimeIntervalTypeInfo.INTERVAL_MONTHS
+    case typeName if DAY_INTERVAL_TYPES.contains(typeName) => TimeIntervalTypeInfo.INTERVAL_MILLIS
 
     case NULL =>
       throw TableException("Type NULL is not supported. " +

http://git-wip-us.apache.org/repos/asf/flink/blob/44f3977e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
index ac21834..b9e889d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
@@ -81,6 +81,47 @@ abstract class StreamTableEnvironment(
   protected def createUniqueTableName(): String = "_DataStreamTable_" + nameCntr.getAndIncrement()
 
   /**
+    * Returns field names and field positions for a given [[TypeInformation]].
+    *
+    * Field names are automatically extracted for
+    * [[org.apache.flink.api.common.typeutils.CompositeType]].
+    * The method fails if inputType is not a
+    * [[org.apache.flink.api.common.typeutils.CompositeType]].
+    *
+    * @param inputType The TypeInformation to extract the field names and positions from.
+    * @tparam A The type of the TypeInformation.
+    * @return A tuple of two arrays holding the field names and corresponding field positions.
+    */
+  override protected[flink] def getFieldInfo[A](inputType: TypeInformation[A])
+    : (Array[String], Array[Int]) = {
+    val fieldInfo = super.getFieldInfo(inputType)
+    if (fieldInfo._1.contains("rowtime")) {
+      throw new TableException("'rowtime' ia a reserved field name in stream environment.")
+    }
+    fieldInfo
+  }
+
+  /**
+    * Returns field names and field positions for a given [[TypeInformation]] and [[Array]] of
+    * [[Expression]].
+    *
+    * @param inputType The [[TypeInformation]] against which the [[Expression]]s are evaluated.
+    * @param exprs     The expressions that define the field names.
+    * @tparam A The type of the TypeInformation.
+    * @return A tuple of two arrays holding the field names and corresponding field positions.
+    */
+  override protected[flink] def getFieldInfo[A](
+      inputType: TypeInformation[A],
+      exprs: Array[Expression])
+    : (Array[String], Array[Int]) = {
+    val fieldInfo = super.getFieldInfo(inputType, exprs)
+    if (fieldInfo._1.contains("rowtime")) {
+      throw new TableException("'rowtime' is a reserved field name in stream environment.")
+    }
+    fieldInfo
+  }
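+
+  // For example (a sketch): converting a DataStream with a user field named "rowtime",
+  // e.g. fromDataStream(stream, 'rowtime, 'name) in the Scala API, would fail with the
+  // TableException above, because "rowtime" is reserved as the logical time attribute.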
+
+  /**
     * Ingests a registered table and returns the resulting [[Table]].
     *
     * The table to ingest must be registered in the [[TableEnvironment]]'s catalog.
@@ -226,11 +267,11 @@ abstract class StreamTableEnvironment(
     }
     catch {
       case e: CannotPlanException =>
-        throw new TableException(
+        throw TableException(
           s"Cannot generate a valid execution plan for the given query: \n\n" +
             s"${RelOptUtil.toString(relNode)}\n" +
             s"This exception indicates that the query uses an unsupported SQL feature.\n" +
-            s"Please check the documentation for the set of currently supported SQL features.")
+            s"Please check the documentation for the set of currently supported SQL features.", e)
     }
     dataStreamPlan
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/44f3977e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
index df97d2d..e8734f5 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
@@ -180,7 +180,7 @@ abstract class TableEnvironment(val config: TableConfig) {
 
     // check that table belongs to this table environment
     if (table.tableEnv != this) {
-      throw new ValidationException(
+      throw new TableException(
         "Only tables that belong to this TableEnvironment can be registered.")
     }
 
@@ -230,7 +230,7 @@ abstract class TableEnvironment(val config: TableConfig) {
     *
     * @param name The name under which the table is registered.
     * @param table The table to register in the catalog
-    * @throws ValidationException if another table is registered under the provided name.
+    * @throws TableException if another table is registered under the provided name.
     */
   @throws[TableException]
   protected def registerTableInternal(name: String, table: AbstractTable): Unit = {
@@ -330,7 +330,7 @@ abstract class TableEnvironment(val config: TableConfig) {
     val fieldIndexes = fieldNames.indices.toArray
 
     if (fieldNames.contains("*")) {
-      throw new ValidationException("Field name can not be '*'.")
+      throw new TableException("Field name can not be '*'.")
     }
 
     (fieldNames, fieldIndexes)
@@ -408,7 +408,7 @@ abstract class TableEnvironment(val config: TableConfig) {
     val (fieldIndexes, fieldNames) = indexedNames.unzip
 
     if (fieldNames.contains("*")) {
-      throw new ValidationException("Field name can not be '*'.")
+      throw new TableException("Field name can not be '*'.")
     }
 
     (fieldNames.toArray, fieldIndexes.toArray)

http://git-wip-us.apache.org/repos/asf/flink/blob/44f3977e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/Types.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/Types.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/Types.scala
index 64d4612..a988152 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/Types.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/Types.scala
@@ -18,7 +18,7 @@
 package org.apache.flink.api.table
 
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo}
-import org.apache.flink.api.table.typeutils.IntervalTypeInfo
+import org.apache.flink.api.table.typeutils.TimeIntervalTypeInfo
 
 /**
   * This class enumerates all supported types of the Table API.
@@ -39,7 +39,7 @@ object Types {
   val DATE = SqlTimeTypeInfo.DATE
   val TIME = SqlTimeTypeInfo.TIME
   val TIMESTAMP = SqlTimeTypeInfo.TIMESTAMP
-  val INTERVAL_MONTHS = IntervalTypeInfo.INTERVAL_MONTHS
-  val INTERVAL_MILLIS = IntervalTypeInfo.INTERVAL_MILLIS
+  val INTERVAL_MONTHS = TimeIntervalTypeInfo.INTERVAL_MONTHS
+  val INTERVAL_MILLIS = TimeIntervalTypeInfo.INTERVAL_MILLIS
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/44f3977e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenUtils.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenUtils.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenUtils.scala
index 76f9b02..b78012c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenUtils.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenUtils.scala
@@ -28,7 +28,7 @@ import org.apache.flink.api.common.typeinfo.{FractionalTypeInfo, SqlTimeTypeInfo
 import org.apache.flink.api.common.typeutils.CompositeType
 import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfo, TypeExtractor}
 import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
-import org.apache.flink.api.table.typeutils.{IntervalTypeInfo, RowTypeInfo, TypeCheckUtils}
+import org.apache.flink.api.table.typeutils.{TimeIntervalTypeInfo, RowTypeInfo, TypeCheckUtils}
 
 object CodeGenUtils {
 
@@ -71,8 +71,8 @@ object CodeGenUtils {
     case SqlTimeTypeInfo.TIMESTAMP => "long"
 
     // internal primitive representation of time intervals
-    case IntervalTypeInfo.INTERVAL_MONTHS => "int"
-    case IntervalTypeInfo.INTERVAL_MILLIS => "long"
+    case TimeIntervalTypeInfo.INTERVAL_MONTHS => "int"
+    case TimeIntervalTypeInfo.INTERVAL_MILLIS => "long"
 
     case _ =>
       tpe.getTypeClass.getCanonicalName
@@ -106,8 +106,8 @@ object CodeGenUtils {
     case CHAR_TYPE_INFO => "'\\0'"
     case SqlTimeTypeInfo.DATE | SqlTimeTypeInfo.TIME => "-1"
     case SqlTimeTypeInfo.TIMESTAMP => "-1L"
-    case IntervalTypeInfo.INTERVAL_MONTHS => "-1"
-    case IntervalTypeInfo.INTERVAL_MILLIS => "-1L"
+    case TimeIntervalTypeInfo.INTERVAL_MONTHS => "-1"
+    case TimeIntervalTypeInfo.INTERVAL_MILLIS => "-1L"
 
     case _ => "null"
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/44f3977e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarOperators.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarOperators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarOperators.scala
index 094a224..bf76015 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarOperators.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarOperators.scala
@@ -24,7 +24,7 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
 import org.apache.flink.api.common.typeinfo.{NumericTypeInfo, SqlTimeTypeInfo, TypeInformation}
 import org.apache.flink.api.table.codegen.CodeGenUtils._
 import org.apache.flink.api.table.codegen.{CodeGenException, GeneratedExpression}
-import org.apache.flink.api.table.typeutils.IntervalTypeInfo
+import org.apache.flink.api.table.typeutils.TimeIntervalTypeInfo
 import org.apache.flink.api.table.typeutils.TypeCheckUtils._
 
 object ScalarOperators {
@@ -432,7 +432,7 @@ object ScalarOperators {
       }
 
     // Interval Months -> String
-    case (IntervalTypeInfo.INTERVAL_MONTHS, STRING_TYPE_INFO) =>
+    case (TimeIntervalTypeInfo.INTERVAL_MONTHS, STRING_TYPE_INFO) =>
       val method = qualifyMethod(BuiltInMethod.INTERVAL_YEAR_MONTH_TO_STRING.method)
       val timeUnitRange = qualifyEnum(TimeUnitRange.YEAR_TO_MONTH)
       generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) {
@@ -440,7 +440,7 @@ object ScalarOperators {
       }
 
     // Interval Millis -> String
-    case (IntervalTypeInfo.INTERVAL_MILLIS, STRING_TYPE_INFO) =>
+    case (TimeIntervalTypeInfo.INTERVAL_MILLIS, STRING_TYPE_INFO) =>
       val method = qualifyMethod(BuiltInMethod.INTERVAL_DAY_TIME_TO_STRING.method)
       val timeUnitRange = qualifyEnum(TimeUnitRange.DAY_TO_SECOND)
       generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) {
@@ -573,17 +573,17 @@ object ScalarOperators {
          (INT_TYPE_INFO, SqlTimeTypeInfo.DATE) |
          (INT_TYPE_INFO, SqlTimeTypeInfo.TIME) |
          (LONG_TYPE_INFO, SqlTimeTypeInfo.TIMESTAMP) |
-         (INT_TYPE_INFO, IntervalTypeInfo.INTERVAL_MONTHS) |
-         (LONG_TYPE_INFO, IntervalTypeInfo.INTERVAL_MILLIS) |
-         (IntervalTypeInfo.INTERVAL_MONTHS, INT_TYPE_INFO) |
-         (IntervalTypeInfo.INTERVAL_MILLIS, LONG_TYPE_INFO) =>
+         (INT_TYPE_INFO, TimeIntervalTypeInfo.INTERVAL_MONTHS) |
+         (LONG_TYPE_INFO, TimeIntervalTypeInfo.INTERVAL_MILLIS) |
+         (TimeIntervalTypeInfo.INTERVAL_MONTHS, INT_TYPE_INFO) |
+         (TimeIntervalTypeInfo.INTERVAL_MILLIS, LONG_TYPE_INFO) =>
       internalExprCasting(operand, targetType)
 
     // internal reinterpretation of temporal types
     // Date, Time, Interval Months -> Long
     case  (SqlTimeTypeInfo.DATE, LONG_TYPE_INFO)
         | (SqlTimeTypeInfo.TIME, LONG_TYPE_INFO)
-        | (IntervalTypeInfo.INTERVAL_MONTHS, LONG_TYPE_INFO) =>
+        | (TimeIntervalTypeInfo.INTERVAL_MONTHS, LONG_TYPE_INFO) =>
       internalExprCasting(operand, targetType)
 
     case (from, to) =>
@@ -659,11 +659,11 @@ object ScalarOperators {
     val operator = if (plus) "+" else "-"
 
     (left.resultType, right.resultType) match {
-      case (l: IntervalTypeInfo[_], r: IntervalTypeInfo[_]) if l == r =>
+      case (l: TimeIntervalTypeInfo[_], r: TimeIntervalTypeInfo[_]) if l == r =>
         generateArithmeticOperator(operator, nullCheck, l, left, right)
 
-      case (SqlTimeTypeInfo.DATE, IntervalTypeInfo.INTERVAL_MILLIS) |
-           (IntervalTypeInfo.INTERVAL_MILLIS, SqlTimeTypeInfo.DATE) =>
+      case (SqlTimeTypeInfo.DATE, TimeIntervalTypeInfo.INTERVAL_MILLIS) |
+           (TimeIntervalTypeInfo.INTERVAL_MILLIS, SqlTimeTypeInfo.DATE) =>
         generateOperatorIfNotNull(nullCheck, SqlTimeTypeInfo.DATE, left, right) {
           if (isTimePoint(left.resultType)) {
             (leftTerm, rightTerm) => s"$leftTerm + ((int) ($rightTerm / ${MILLIS_PER_DAY}L))"
@@ -672,8 +672,8 @@ object ScalarOperators {
           }
         }
 
-      case (SqlTimeTypeInfo.TIME, IntervalTypeInfo.INTERVAL_MILLIS) |
-           (IntervalTypeInfo.INTERVAL_MILLIS, SqlTimeTypeInfo.TIME) =>
+      case (SqlTimeTypeInfo.TIME, TimeIntervalTypeInfo.INTERVAL_MILLIS) |
+           (TimeIntervalTypeInfo.INTERVAL_MILLIS, SqlTimeTypeInfo.TIME) =>
         generateOperatorIfNotNull(nullCheck, SqlTimeTypeInfo.TIME, left, right) {
           if (isTimePoint(left.resultType)) {
             (leftTerm, rightTerm) => s"$leftTerm + ((int) ($rightTerm))"
@@ -682,7 +682,7 @@ object ScalarOperators {
           }
         }
 
-      case (SqlTimeTypeInfo.TIMESTAMP, IntervalTypeInfo.INTERVAL_MILLIS) =>
+      case (SqlTimeTypeInfo.TIMESTAMP, TimeIntervalTypeInfo.INTERVAL_MILLIS) =>
         generateOperatorIfNotNull(nullCheck, SqlTimeTypeInfo.TIMESTAMP, left, right) {
           (leftTerm, rightTerm) => s"$leftTerm + $rightTerm"
         }

http://git-wip-us.apache.org/repos/asf/flink/blob/44f3977e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/exceptions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/exceptions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/exceptions.scala
index 773e256..0a100dd 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/exceptions.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/exceptions.scala
@@ -38,7 +38,18 @@ case class SqlParserException(
 /**
   * General Exception for all errors during table handling.
   */
-case class TableException(msg: String) extends RuntimeException(msg)
+case class TableException(
+    msg: String,
+    cause: Throwable)
+  extends RuntimeException(msg, cause) {
+
+  def this(msg: String) = this(msg, null)
+
+}
+
+object TableException {
+  def apply(msg: String): TableException = new TableException(msg)
+}
 
 /**
   * Exception for all errors occurring during validation phase.

http://git-wip-us.apache.org/repos/asf/flink/blob/44f3977e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/Expression.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/Expression.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/Expression.scala
index 9a824e8..c284bd3 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/Expression.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/Expression.scala
@@ -22,7 +22,7 @@ import org.apache.calcite.tools.RelBuilder
 
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.table.trees.TreeNode
-import org.apache.flink.api.table.validate.{ExprValidationResult, ValidationSuccess}
+import org.apache.flink.api.table.validate.{ValidationResult, ValidationSuccess}
 
 abstract class Expression extends TreeNode[Expression] {
   /**
@@ -44,7 +44,7 @@ abstract class Expression extends TreeNode[Expression] {
     * or `ValidationFailure` with supplement message explaining the error.
     * Note: we should only call this method until `childrenValid == true`
     */
-  private[flink] def validateInput(): ExprValidationResult = ValidationSuccess
+  private[flink] def validateInput(): ValidationResult = ValidationSuccess
 
   /**
     * Convert Expression to its counterpart in Calcite, i.e. RexNode

http://git-wip-us.apache.org/repos/asf/flink/blob/44f3977e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala
index 424d86c..a438c1c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala
@@ -24,7 +24,7 @@ import org.apache.flink.api.table.expressions.ExpressionUtils.{toMilliInterval,
 import org.apache.flink.api.table.expressions.TimeIntervalUnit.TimeIntervalUnit
 import org.apache.flink.api.table.expressions.TimePointUnit.TimePointUnit
 import org.apache.flink.api.table.expressions.TrimMode.TrimMode
-import org.apache.flink.api.table.typeutils.IntervalTypeInfo
+import org.apache.flink.api.table.typeutils.TimeIntervalTypeInfo
 
 import scala.language.implicitConversions
 import scala.util.parsing.combinator.{JavaTokenParsers, PackratParsers}
@@ -54,6 +54,8 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
   lazy val MIN: Keyword = Keyword("min")
   lazy val MAX: Keyword = Keyword("max")
   lazy val SUM: Keyword = Keyword("sum")
+  lazy val START: Keyword = Keyword("start")
+  lazy val END: Keyword = Keyword("end")
   lazy val CAST: Keyword = Keyword("cast")
   lazy val NULL: Keyword = Keyword("Null")
   lazy val IF: Keyword = Keyword("?")
@@ -80,11 +82,12 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
   lazy val SECOND: Keyword = Keyword("second")
   lazy val MILLIS: Keyword = Keyword("millis")
   lazy val MILLI: Keyword = Keyword("milli")
+  lazy val ROWS: Keyword = Keyword("rows")
   lazy val STAR: Keyword = Keyword("*")
 
   def functionIdent: ExpressionParser.Parser[String] =
     not(AS) ~ not(COUNT) ~ not(AVG) ~ not(MIN) ~ not(MAX) ~
-      not(SUM) ~ not(CAST) ~ not(NULL) ~
+      not(SUM) ~ not(START) ~ not(END) ~ not(CAST) ~ not(NULL) ~
       not(IF) ~> super.ident
 
   // symbols
@@ -107,10 +110,10 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
     "BYTE" ^^ { ti => BasicTypeInfo.BYTE_TYPE_INFO } |
       "SHORT" ^^ { ti => BasicTypeInfo.SHORT_TYPE_INFO } |
       "INTERVAL_MONTHS" ^^ {
-        ti => IntervalTypeInfo.INTERVAL_MONTHS.asInstanceOf[TypeInformation[_]]
+        ti => TimeIntervalTypeInfo.INTERVAL_MONTHS.asInstanceOf[TypeInformation[_]]
       } |
       "INTERVAL_MILLIS" ^^ {
-        ti => IntervalTypeInfo.INTERVAL_MILLIS.asInstanceOf[TypeInformation[_]]
+        ti => TimeIntervalTypeInfo.INTERVAL_MILLIS.asInstanceOf[TypeInformation[_]]
       } |
       "INT" ^^ { ti => BasicTypeInfo.INT_TYPE_INFO } |
       "LONG" ^^ { ti => BasicTypeInfo.LONG_TYPE_INFO } |
@@ -187,6 +190,12 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
   lazy val suffixAvg: PackratParser[Expression] =
     composite <~ "." ~ AVG ~ opt("()") ^^ { e => Avg(e) }
 
+  lazy val suffixStart: PackratParser[Expression] =
+    composite <~ "." ~ START ~ opt("()") ^^ { e => WindowStart(e) }
+
+  lazy val suffixEnd: PackratParser[Expression] =
+    composite <~ "." ~ END ~ opt("()") ^^ { e => WindowEnd(e) }
+
   lazy val suffixCast: PackratParser[Expression] =
     composite ~ "." ~ CAST ~ "(" ~ dataType ~ ")" ^^ {
     case e ~ _ ~ _ ~ _ ~ dt ~ _ => Cast(e, dt)
@@ -265,11 +274,14 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
    case expr ~ _ ~ (MILLIS.key | MILLI.key) => toMilliInterval(expr, 1)
   }
 
+  lazy val suffixRowInterval: PackratParser[Expression] =
+    composite <~ "." ~ ROWS ^^ { e => ExpressionUtils.toRowInterval(e) }
+
   lazy val suffixed: PackratParser[Expression] =
-    suffixTimeInterval | suffixSum | suffixMin | suffixMax |
-      suffixCount | suffixAvg | suffixCast | suffixAs | suffixTrim | suffixTrimWithoutArgs |
-      suffixIf | suffixAsc | suffixDesc | suffixToDate | suffixToTimestamp | suffixToTime |
-      suffixExtract | suffixFloor | suffixCeil |
+    suffixTimeInterval | suffixRowInterval | suffixSum | suffixMin | suffixMax | suffixStart |
+      suffixEnd | suffixCount | suffixAvg | suffixCast | suffixAs | suffixTrim |
+      suffixTrimWithoutArgs | suffixIf | suffixAsc | suffixDesc | suffixToDate |
+      suffixToTimestamp | suffixToTime | suffixExtract | suffixFloor | suffixCeil |
       suffixFunctionCall | suffixFunctionCallOneArg // function call must always be at the end
 
   // prefix operators
@@ -289,6 +301,12 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
   lazy val prefixAvg: PackratParser[Expression] =
     AVG ~ "(" ~> expression <~ ")" ^^ { e => Avg(e) }
 
+  lazy val prefixStart: PackratParser[Expression] =
+    START ~ "(" ~> expression <~ ")" ^^ { e => WindowStart(e) }
+
+  lazy val prefixEnd: PackratParser[Expression] =
+    END ~ "(" ~> expression <~ ")" ^^ { e => WindowEnd(e) }
+
   lazy val prefixCast: PackratParser[Expression] =
     CAST ~ "(" ~ expression ~ "," ~ dataType ~ ")" ^^ {
     case _ ~ _ ~ e ~ _ ~ dt ~ _ => Cast(e, dt)
@@ -333,7 +351,7 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
   }
 
   lazy val prefixed: PackratParser[Expression] =
-    prefixSum | prefixMin | prefixMax | prefixCount | prefixAvg |
+    prefixSum | prefixMin | prefixMax | prefixCount | prefixAvg | prefixStart | prefixEnd |
       prefixCast | prefixAs | prefixTrim | prefixTrimWithoutArgs | prefixIf | prefixExtract |
       prefixFloor | prefixCeil |
       prefixFunctionCall | prefixFunctionCallOneArg // function call must always be at the end
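
For reference, the new keywords make strings like "w.start", "end(w)", and
"10.rows" parse. A small sketch, assuming parseExpression is the parser's
public entry point (object name is hypothetical):

import org.apache.flink.api.table.expressions.ExpressionParser

object GroupWindowParsingSketch {
  def main(args: Array[String]): Unit = {
    // suffix and prefix forms of the new window properties
    println(ExpressionParser.parseExpression("w.start"))  // a WindowStart expression
    println(ExpressionParser.parseExpression("end(w)"))   // a WindowEnd expression
    // ".rows" yields a row-count interval literal for count windows
    println(ExpressionParser.parseExpression("10.rows"))
  }
}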

http://git-wip-us.apache.org/repos/asf/flink/blob/44f3977e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionUtils.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionUtils.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionUtils.scala
index 6ac9f58..c071c59 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionUtils.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionUtils.scala
@@ -25,22 +25,33 @@ import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rex.{RexBuilder, RexNode}
 import org.apache.calcite.sql.fun.SqlStdOperatorTable
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo
-import org.apache.flink.api.table.typeutils.IntervalTypeInfo
+import org.apache.flink.api.table.typeutils.{RowIntervalTypeInfo, TimeIntervalTypeInfo}
 
 object ExpressionUtils {
 
   private[flink] def toMonthInterval(expr: Expression, multiplier: Int): Expression = expr match {
     case Literal(value: Int, BasicTypeInfo.INT_TYPE_INFO) =>
-      Literal(value * multiplier, IntervalTypeInfo.INTERVAL_MONTHS)
+      Literal(value * multiplier, TimeIntervalTypeInfo.INTERVAL_MONTHS)
     case _ =>
-      Cast(Mul(expr, Literal(multiplier)), IntervalTypeInfo.INTERVAL_MONTHS)
+      Cast(Mul(expr, Literal(multiplier)), TimeIntervalTypeInfo.INTERVAL_MONTHS)
   }
 
   private[flink] def toMilliInterval(expr: Expression, multiplier: Long): Expression = expr match {
     case Literal(value: Int, BasicTypeInfo.INT_TYPE_INFO) =>
-      Literal(value * multiplier, IntervalTypeInfo.INTERVAL_MILLIS)
+      Literal(value * multiplier, TimeIntervalTypeInfo.INTERVAL_MILLIS)
+    case Literal(value: Long, BasicTypeInfo.LONG_TYPE_INFO) =>
+      Literal(value * multiplier, TimeIntervalTypeInfo.INTERVAL_MILLIS)
     case _ =>
-      Cast(Mul(expr, Literal(multiplier)), IntervalTypeInfo.INTERVAL_MILLIS)
+      Cast(Mul(expr, Literal(multiplier)), TimeIntervalTypeInfo.INTERVAL_MILLIS)
+  }
+
+  private[flink] def toRowInterval(expr: Expression): Expression = expr match {
+    case Literal(value: Int, BasicTypeInfo.INT_TYPE_INFO) =>
+      Literal(value.toLong, RowIntervalTypeInfo.INTERVAL_ROWS)
+    case Literal(value: Long, BasicTypeInfo.LONG_TYPE_INFO) =>
+      Literal(value, RowIntervalTypeInfo.INTERVAL_ROWS)
+    case _ =>
+      throw new IllegalArgumentException("Invalid value for row interval literal.")
   }
 
   // ----------------------------------------------------------------------------------------------
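
A sketch of the new toRowInterval conversion; the helper is private[flink], so
the snippet would have to live under the org.apache.flink namespace (package
placement and object name are hypothetical):

package org.apache.flink.api.table.expressions

import org.apache.flink.api.common.typeinfo.BasicTypeInfo

object RowIntervalSketch {
  def main(args: Array[String]): Unit = {
    // an Int literal is widened to a Long-valued row-count interval
    val rows = ExpressionUtils.toRowInterval(Literal(10, BasicTypeInfo.INT_TYPE_INFO))
    println(rows)  // renders as "10.rows" with the new Literal.toString
    // anything but an Int or Long literal fails fast:
    // toRowInterval(Literal("ten", BasicTypeInfo.STRING_TYPE_INFO))
    //   => IllegalArgumentException("Invalid value for row interval literal.")
  }
}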

http://git-wip-us.apache.org/repos/asf/flink/blob/44f3977e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/InputTypeSpec.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/InputTypeSpec.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/InputTypeSpec.scala
index 7df46c2..f545d13 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/InputTypeSpec.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/InputTypeSpec.scala
@@ -38,7 +38,7 @@ trait InputTypeSpec extends Expression {
     */
   private[flink] def expectedTypes: Seq[TypeInformation[_]]
 
-  override private[flink] def validateInput(): ExprValidationResult = {
+  override private[flink] def validateInput(): ValidationResult = {
     val typeMismatches = mutable.ArrayBuffer.empty[String]
     children.zip(expectedTypes).zipWithIndex.foreach { case ((e, tpe), i) =>
       if (e.resultType != tpe) {

http://git-wip-us.apache.org/repos/asf/flink/blob/44f3977e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/aggregations.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/aggregations.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/aggregations.scala
index 8cc56d8..259f7e5 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/aggregations.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/aggregations.scala
@@ -21,7 +21,6 @@ import org.apache.calcite.rex.RexNode
 import org.apache.calcite.sql.fun.SqlStdOperatorTable
 import org.apache.calcite.tools.RelBuilder
 import org.apache.calcite.tools.RelBuilder.AggCall
-
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo
 import org.apache.flink.api.table.typeutils.TypeCheckUtils
 
@@ -47,7 +46,7 @@ case class Sum(child: Expression) extends Aggregation {
 
   override private[flink] def resultType = child.resultType
 
-  override private[flink] def validateInput =
+  override private[flink] def validateInput() =
     TypeCheckUtils.assertNumericExpr(child.resultType, "sum")
 }
 
@@ -60,7 +59,7 @@ case class Min(child: Expression) extends Aggregation {
 
   override private[flink] def resultType = child.resultType
 
-  override private[flink] def validateInput =
+  override private[flink] def validateInput() =
     TypeCheckUtils.assertOrderableExpr(child.resultType, "min")
 }
 
@@ -73,7 +72,7 @@ case class Max(child: Expression) extends Aggregation {
 
   override private[flink] def resultType = child.resultType
 
-  override private[flink] def validateInput =
+  override private[flink] def validateInput() =
     TypeCheckUtils.assertOrderableExpr(child.resultType, "max")
 }
 
@@ -96,6 +95,6 @@ case class Avg(child: Expression) extends Aggregation {
 
   override private[flink] def resultType = child.resultType
 
-  override private[flink] def validateInput =
+  override private[flink] def validateInput() =
     TypeCheckUtils.assertNumericExpr(child.resultType, "avg")
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/44f3977e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/arithmetic.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/arithmetic.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/arithmetic.scala
index 4a7978a..cf4f82a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/arithmetic.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/arithmetic.scala
@@ -44,7 +44,7 @@ abstract class BinaryArithmetic extends BinaryExpression {
     }
 
   // TODO: tighten this rule once we implemented type coercion rules during validation
-  override private[flink] def validateInput(): ExprValidationResult = {
+  override private[flink] def validateInput(): ValidationResult = {
     if (!isNumeric(left.resultType) || !isNumeric(right.resultType)) {
       ValidationFailure(s"$this requires both operands Numeric, get " +
         s"$left : ${left.resultType} and $right : ${right.resultType}")
@@ -81,7 +81,7 @@ case class Plus(left: Expression, right: Expression) extends BinaryArithmetic {
     }
   }
 
-  override private[flink] def validateInput(): ExprValidationResult = {
+  override private[flink] def validateInput(): ValidationResult = {
     if (isString(left.resultType) || isString(right.resultType)) {
       ValidationSuccess
     } else if (isTimeInterval(left.resultType) && left.resultType == right.resultType) {
@@ -110,7 +110,7 @@ case class UnaryMinus(child: Expression) extends UnaryExpression {
 
   override private[flink] def resultType = child.resultType
 
-  override private[flink] def validateInput(): ExprValidationResult = {
+  override private[flink] def validateInput(): ValidationResult = {
     if (isNumeric(child.resultType)) {
       ValidationSuccess
     } else if (isTimeInterval(child.resultType)) {
@@ -126,7 +126,7 @@ case class Minus(left: Expression, right: Expression) extends BinaryArithmetic {
 
   private[flink] val sqlOperator = SqlStdOperatorTable.MINUS
 
-  override private[flink] def validateInput(): ExprValidationResult = {
+  override private[flink] def validateInput(): ValidationResult = {
     if (isTimeInterval(left.resultType) && left.resultType == right.resultType) {
       ValidationSuccess
     } else {
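
The relaxed checks admit interval arithmetic in the expression DSL; a
compile-level sketch, assuming the minutes/hours/days interval suffixes from
the Scala DSL (object name is hypothetical):

import org.apache.flink.api.scala.table._
import org.apache.flink.api.table.expressions.Expression

object IntervalArithmeticSketch {
  def main(args: Array[String]): Unit = {
    val gap: Expression = 30.minutes + 2.hours  // interval + interval is valid
    val negated: Expression = -(5.days)         // unary minus on an interval is valid
    val diff: Expression = 2.hours - 30.minutes // interval - interval is valid
    println(Seq(gap, negated, diff).mkString(", "))
  }
}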

http://git-wip-us.apache.org/repos/asf/flink/blob/44f3977e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/call.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/call.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/call.scala
index 2c64136..39367be 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/call.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/call.scala
@@ -21,7 +21,7 @@ import org.apache.calcite.rex.RexNode
 import org.apache.calcite.tools.RelBuilder
 import org.apache.flink.api.table.functions.ScalarFunction
 import org.apache.flink.api.table.functions.utils.UserDefinedFunctionUtils.{getResultType, getSignature, signatureToString, signaturesToString}
-import org.apache.flink.api.table.validate.{ExprValidationResult, ValidationFailure, ValidationSuccess}
+import org.apache.flink.api.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess}
 import org.apache.flink.api.table.{FlinkTypeFactory, UnresolvedException}
 
 /**
@@ -41,7 +41,7 @@ case class Call(functionName: String, args: Seq[Expression]) extends Expression
   override private[flink] def resultType =
     throw UnresolvedException(s"calling resultType on UnresolvedFunction $functionName")
 
-  override private[flink] def validateInput(): ExprValidationResult =
+  override private[flink] def validateInput(): ValidationResult =
     ValidationFailure(s"Unresolved function call: $functionName")
 }
 
@@ -71,7 +71,7 @@ case class ScalarFunctionCall(
 
   override private[flink] def resultType = getResultType(scalarFunction, foundSignature.get)
 
-  override private[flink] def validateInput(): ExprValidationResult = {
+  override private[flink] def validateInput(): ValidationResult = {
     val signature = children.map(_.resultType)
     // look for a signature that matches the input types
     foundSignature = getSignature(scalarFunction, signature)

http://git-wip-us.apache.org/repos/asf/flink/blob/44f3977e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/cast.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/cast.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/cast.scala
index f65dd5b..2232a91 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/cast.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/cast.scala
@@ -43,7 +43,7 @@ case class Cast(child: Expression, resultType: TypeInformation[_]) extends Unary
     copy(child, resultType).asInstanceOf[this.type]
   }
 
-  override private[flink] def validateInput(): ExprValidationResult = {
+  override private[flink] def validateInput(): ValidationResult = {
     if (TypeCoercion.canCast(child.resultType, resultType)) {
       ValidationSuccess
     } else {

http://git-wip-us.apache.org/repos/asf/flink/blob/44f3977e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/comparison.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/comparison.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/comparison.scala
index babb677..d5244d0 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/comparison.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/comparison.scala
@@ -37,7 +37,7 @@ abstract class BinaryComparison extends BinaryExpression {
   override private[flink] def resultType = BOOLEAN_TYPE_INFO
 
   // TODO: tighten this rule once we implemented type coercion rules during validation
-  override private[flink] def validateInput(): ExprValidationResult =
+  override private[flink] def validateInput(): ValidationResult =
     (left.resultType, right.resultType) match {
       case (lType, rType) if isNumeric(lType) && isNumeric(rType) => ValidationSuccess
       case (lType, rType) if isComparable(lType) && lType == rType => ValidationSuccess
@@ -53,7 +53,7 @@ case class EqualTo(left: Expression, right: Expression) extends BinaryComparison
 
   private[flink] val sqlOperator: SqlOperator = SqlStdOperatorTable.EQUALS
 
-  override private[flink] def validateInput(): ExprValidationResult =
+  override private[flink] def validateInput(): ValidationResult =
     (left.resultType, right.resultType) match {
       case (lType, rType) if isNumeric(lType) && isNumeric(rType) => ValidationSuccess
       // TODO widen this rule once we support custom objects as types (FLINK-3916)
@@ -68,7 +68,7 @@ case class NotEqualTo(left: Expression, right: Expression) extends BinaryCompari
 
   private[flink] val sqlOperator: SqlOperator = SqlStdOperatorTable.NOT_EQUALS
 
-  override private[flink] def validateInput(): ExprValidationResult =
+  override private[flink] def validateInput(): ValidationResult =
     (left.resultType, right.resultType) match {
       case (lType, rType) if isNumeric(lType) && isNumeric(rType) => ValidationSuccess
       // TODO widen this rule once we support custom objects as types (FLINK-3916)

http://git-wip-us.apache.org/repos/asf/flink/blob/44f3977e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/fieldExpression.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/fieldExpression.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/fieldExpression.scala
index 5f20751..91efd08 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/fieldExpression.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/fieldExpression.scala
@@ -19,11 +19,9 @@ package org.apache.flink.api.table.expressions
 
 import org.apache.calcite.rex.RexNode
 import org.apache.calcite.tools.RelBuilder
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.table.UnresolvedException
-import org.apache.flink.api.table.validate.{ValidationSuccess, ExprValidationResult,
-ValidationFailure}
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.table.{UnresolvedException, ValidationException}
+import org.apache.flink.api.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess}
 
 trait NamedExpression extends Expression {
   private[flink] def name: String
@@ -44,10 +42,10 @@ case class UnresolvedFieldReference(name: String) extends Attribute {
     UnresolvedFieldReference(newName)
 
   override private[flink] def resultType: TypeInformation[_] =
-    throw new UnresolvedException(s"calling resultType on ${this.getClass}")
+    throw UnresolvedException(s"Calling resultType on ${this.getClass}.")
 
-  override private[flink] def validateInput(): ExprValidationResult =
-    ValidationFailure(s"Unresolved reference $name")
+  override private[flink] def validateInput(): ValidationResult =
+    ValidationFailure(s"Unresolved reference $name.")
 }
 
 case class ResolvedFieldReference(
@@ -93,7 +91,7 @@ case class Alias(child: Expression, name: String)
     }
   }
 
-  override private[flink] def validateInput(): ExprValidationResult = {
+  override private[flink] def validateInput(): ValidationResult = {
     if (name == "*") {
       ValidationFailure("Alias can not accept '*' as name.")
     } else {
@@ -115,3 +113,38 @@ case class UnresolvedAlias(child: Expression) extends UnaryExpression with Named
 
   override private[flink] lazy val valid = false
 }
+
+case class RowtimeAttribute() extends Attribute {
+  override private[flink] def withName(newName: String): Attribute = {
+    if (newName == "rowtime") {
+      this
+    } else {
+      throw new ValidationException("Cannot rename streaming rowtime attribute.")
+    }
+  }
+
+  override private[flink] def name: String = "rowtime"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    throw new UnsupportedOperationException("A rowtime attribute can not be used solely.")
+  }
+
+  override private[flink] def resultType: TypeInformation[_] = BasicTypeInfo.LONG_TYPE_INFO
+}
+
+case class WindowReference(name: String) extends Attribute {
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode =
+    throw new UnsupportedOperationException("A window reference can not be used solely.")
+
+  override private[flink] def resultType: TypeInformation[_] =
+    throw new UnsupportedOperationException("A window reference has no result type.")
+
+  override private[flink] def withName(newName: String): Attribute = {
+    if (newName == name) {
+      this
+    } else {
+      throw new ValidationException("Cannot rename window reference.")
+    }
+  }
+}
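
How the new attributes surface at the API level: a hedged end-to-end sketch
using the group-window DSL this commit introduces ('rowtime resolves to
RowtimeAttribute, 'w to a WindowReference; source data and field names are
hypothetical):

import org.apache.flink.api.scala.table._
import org.apache.flink.api.table.TableEnvironment
import org.apache.flink.streaming.api.scala._

object GroupWindowSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    val sensors = env.fromElements((1, 22.5), (1, 23.0), (2, 19.5))
      .toTable(tEnv, 'id, 'temp)

    // tumbling event-time window; 'w.start / 'w.end use the new window properties
    val result = sensors
      .groupBy('id)
      .window(Tumble over 10.minutes on 'rowtime as 'w)
      .select('id, 'temp.avg, 'w.start, 'w.end)

    println(result)  // translation to a DataStream happens on conversion
  }
}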

http://git-wip-us.apache.org/repos/asf/flink/blob/44f3977e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/literals.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/literals.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/literals.scala
index 677160a..6382abe 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/literals.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/literals.scala
@@ -28,7 +28,7 @@ import org.apache.calcite.sql.parser.SqlParserPos
 import org.apache.calcite.tools.RelBuilder
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo, TypeInformation}
 import org.apache.flink.api.table.FlinkTypeFactory
-import org.apache.flink.api.table.typeutils.IntervalTypeInfo
+import org.apache.flink.api.table.typeutils.{RowIntervalTypeInfo, TimeIntervalTypeInfo}
 
 object Literal {
   private[flink] def apply(l: Any): Literal = l match {
@@ -50,7 +50,16 @@ object Literal {
 }
 
 case class Literal(value: Any, resultType: TypeInformation[_]) extends LeafExpression {
-  override def toString = s"$value"
+  override def toString = resultType match {
+    case _: BasicTypeInfo[_] => value.toString
+    case SqlTimeTypeInfo.DATE => value.toString + ".toDate"
+    case SqlTimeTypeInfo.TIME => value.toString + ".toTime"
+    case SqlTimeTypeInfo.TIMESTAMP => value.toString + ".toTimestamp"
+    case TimeIntervalTypeInfo.INTERVAL_MILLIS => value.toString + ".millis"
+    case TimeIntervalTypeInfo.INTERVAL_MONTHS => value.toString + ".months"
+    case RowIntervalTypeInfo.INTERVAL_ROWS => value.toString + ".rows"
+    case _ => s"Literal($value, $resultType)"
+  }
 
   override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
     resultType match {
@@ -67,7 +76,7 @@ case class Literal(value: Any, resultType: TypeInformation[_]) extends LeafExpre
       case SqlTimeTypeInfo.TIMESTAMP =>
         relBuilder.getRexBuilder.makeTimestampLiteral(dateToCalendar, 3)
 
-      case IntervalTypeInfo.INTERVAL_MONTHS =>
+      case TimeIntervalTypeInfo.INTERVAL_MONTHS =>
         val interval = java.math.BigDecimal.valueOf(value.asInstanceOf[Int])
         val intervalQualifier = new SqlIntervalQualifier(
           TimeUnit.YEAR,
@@ -75,7 +84,7 @@ case class Literal(value: Any, resultType: TypeInformation[_]) extends LeafExpre
           SqlParserPos.ZERO)
         relBuilder.getRexBuilder.makeIntervalLiteral(interval, intervalQualifier)
 
-      case IntervalTypeInfo.INTERVAL_MILLIS =>
+      case TimeIntervalTypeInfo.INTERVAL_MILLIS =>
         val interval = java.math.BigDecimal.valueOf(value.asInstanceOf[Long])
         val intervalQualifier = new SqlIntervalQualifier(
           TimeUnit.DAY,
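
The extended toString prints typed literals in the parser's own suffix
notation; a quick sketch (object name is hypothetical):

import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.api.table.expressions.Literal
import org.apache.flink.api.table.typeutils.TimeIntervalTypeInfo

object LiteralToStringSketch {
  def main(args: Array[String]): Unit = {
    println(Literal(42, BasicTypeInfo.INT_TYPE_INFO))              // 42
    println(Literal(5000L, TimeIntervalTypeInfo.INTERVAL_MILLIS))  // 5000.millis
    println(Literal(3, TimeIntervalTypeInfo.INTERVAL_MONTHS))      // 3.months
  }
}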

http://git-wip-us.apache.org/repos/asf/flink/blob/44f3977e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/logic.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/logic.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/logic.scala
index bf4dc85..9c8e279 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/logic.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/logic.scala
@@ -27,7 +27,7 @@ import org.apache.flink.api.table.validate._
 abstract class BinaryPredicate extends BinaryExpression {
   override private[flink] def resultType = BasicTypeInfo.BOOLEAN_TYPE_INFO
 
-  override private[flink] def validateInput(): ExprValidationResult = {
+  override private[flink] def validateInput(): ValidationResult = {
     if (left.resultType == BasicTypeInfo.BOOLEAN_TYPE_INFO &&
         right.resultType == BasicTypeInfo.BOOLEAN_TYPE_INFO) {
       ValidationSuccess
@@ -48,7 +48,7 @@ case class Not(child: Expression) extends UnaryExpression {
 
   override private[flink] def resultType = BasicTypeInfo.BOOLEAN_TYPE_INFO
 
-  override private[flink] def validateInput(): ExprValidationResult = {
+  override private[flink] def validateInput(): ValidationResult = {
     if (child.resultType == BasicTypeInfo.BOOLEAN_TYPE_INFO) {
       ValidationSuccess
     } else {
@@ -94,7 +94,7 @@ case class If(
     relBuilder.call(SqlStdOperatorTable.CASE, c, t, f)
   }
 
-  override private[flink] def validateInput(): ExprValidationResult = {
+  override private[flink] def validateInput(): ValidationResult = {
     if (condition.resultType == BasicTypeInfo.BOOLEAN_TYPE_INFO &&
         ifTrue.resultType == ifFalse.resultType) {
       ValidationSuccess

http://git-wip-us.apache.org/repos/asf/flink/blob/44f3977e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/mathExpressions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/mathExpressions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/mathExpressions.scala
index 80adf5f..e0f4691 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/mathExpressions.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/mathExpressions.scala
@@ -29,7 +29,7 @@ import org.apache.flink.api.table.validate._
 case class Abs(child: Expression) extends UnaryExpression {
   override private[flink] def resultType: TypeInformation[_] = child.resultType
 
-  override private[flink] def validateInput(): ExprValidationResult =
+  override private[flink] def validateInput(): ValidationResult =
     TypeCheckUtils.assertNumericExpr(child.resultType, "Abs")
 
   override def toString: String = s"abs($child)"
@@ -42,7 +42,7 @@ case class Abs(child: Expression) extends UnaryExpression {
 case class Ceil(child: Expression) extends UnaryExpression {
   override private[flink] def resultType: TypeInformation[_] = LONG_TYPE_INFO
 
-  override private[flink] def validateInput(): ExprValidationResult =
+  override private[flink] def validateInput(): ValidationResult =
     TypeCheckUtils.assertNumericExpr(child.resultType, "Ceil")
 
   override def toString: String = s"ceil($child)"
@@ -68,7 +68,7 @@ case class Exp(child: Expression) extends UnaryExpression with InputTypeSpec {
 case class Floor(child: Expression) extends UnaryExpression {
   override private[flink] def resultType: TypeInformation[_] = LONG_TYPE_INFO
 
-  override private[flink] def validateInput(): ExprValidationResult =
+  override private[flink] def validateInput(): ValidationResult =
     TypeCheckUtils.assertNumericExpr(child.resultType, "Floor")
 
   override def toString: String = s"floor($child)"

http://git-wip-us.apache.org/repos/asf/flink/blob/44f3977e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ordering.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ordering.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ordering.scala
index 85da69c..c15d462 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ordering.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ordering.scala
@@ -24,7 +24,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.table.validate._
 
 abstract class Ordering extends UnaryExpression {
-  override private[flink] def validateInput(): ExprValidationResult = {
+  override private[flink] def validateInput(): ValidationResult = {
     if (!child.isInstanceOf[NamedExpression]) {
       ValidationFailure(s"Sort should only based on field reference")
     } else {