Posted to commits@flink.apache.org by fh...@apache.org on 2017/06/15 09:44:10 UTC

[09/10] flink git commit: [FLINK-6748] [table] [docs] Reworked Table API Page

[FLINK-6748] [table] [docs] Reworked Table API Page

This closes #4093.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/23248157
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/23248157
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/23248157

Branch: refs/heads/master
Commit: 232481572bb48e82880afdb2f7237af08a8404b5
Parents: ddae51f
Author: twalthr <tw...@apache.org>
Authored: Fri Jun 9 06:54:22 2017 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Thu Jun 15 11:42:19 2017 +0200

----------------------------------------------------------------------
 docs/dev/table/index.md     |  82 ++++++
 docs/dev/table/tableApi.md  | 534 +++++++++++++++++++++++++++++----------
 docs/dev/tableApi.md        |  82 ------
 docs/redirects/table.md     |   2 +-
 docs/redirects/table_api.md |  24 ++
 5 files changed, 504 insertions(+), 220 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/23248157/docs/dev/table/index.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/index.md b/docs/dev/table/index.md
new file mode 100644
index 0000000..df2ccba
--- /dev/null
+++ b/docs/dev/table/index.md
@@ -0,0 +1,82 @@
+---
+title: "Table API & SQL"
+nav-id: tableapi
+nav-parent_id: dev
+is_beta: true
+nav-show_overview: true
+nav-pos: 35
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+Apache Flink features two relational APIs - the Table API and SQL - for unified stream and batch processing. The Table API is a language-integrated query API for Scala and Java that allows the composition of queries from relational operators such as selection, filter, and join in a very intuitive way. Flink's SQL support is based on [Apache Calcite](https://calcite.apache.org) which implements the SQL standard. Queries specified in either interface have the same semantics and specify the same result regardless whether the input is a batch input (DataSet) or a stream input (DataStream).
+
+The Table API and the SQL interfaces are tightly integrated with each other as well as Flink's DataStream and DataSet APIs. You can easily switch between all APIs and libraries which build upon the APIs. For instance, you can extract patterns from a DataStream using the [CEP library]({{ site.baseurl }}/dev/libs/cep.html) and later use the Table API to analyze the patterns, or you might scan, filter, and aggregate a batch table using a SQL query before running a [Gelly graph algorithm]({{ site.baseurl }}/dev/libs/gelly) on the preprocessed data.
+
+**Please note that the Table API and SQL are not yet feature complete and are being actively developed. Not all operations are supported by every combination of \[Table API, SQL\] and \[stream, batch\] input.**
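+
+For a first impression, the following sketch expresses the same query once with the Table API and once as SQL. This is a minimal, illustrative example: the registration of the `Orders` table and its fields `(a, b)` are assumed, and the SQL entry point follows the Flink 1.3 API.
+
+{% highlight java %}
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
+
+// assumption: a table "Orders" with fields (a, b) has been registered
+
+// Table API: a language-integrated query
+Table apiResult = tEnv.scan("Orders")
+  .groupBy("a")
+  .select("a, b.count as cnt");
+
+// SQL: the same query specified as a string
+Table sqlResult = tEnv.sql("SELECT a, COUNT(b) AS cnt FROM Orders GROUP BY a");
+{% endhighlight %}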
+
+Setup
+-----
+
+The Table API and SQL are bundled in the `flink-table` Maven artifact. 
+The following dependency must be added to your project in order to use the Table API and SQL:
+
+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-table{{ site.scala_version_suffix }}</artifactId>
+  <version>{{ site.version }}</version>
+</dependency>
+{% endhighlight %}
+
+In addition, you need to add a dependency for either Flink's Scala batch or streaming API. For a batch query you need to add:
+
+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-scala{{ site.scala_version_suffix }}</artifactId>
+  <version>{{ site.version }}</version>
+</dependency>
+{% endhighlight %}
+
+For a streaming query you need to add:
+
+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-streaming-scala{{ site.scala_version_suffix }}</artifactId>
+  <version>{{ site.version }}</version>
+</dependency>
+{% endhighlight %}
+
+**Note:** Due to an issue in Apache Calcite, which prevents the user classloaders from being garbage-collected, we do *not* recommend building a fat-jar that includes the `flink-table` dependency. Instead, we recommend configuring Flink to include the `flink-table` dependency in the system classloader. This can be done by copying the `flink-table.jar` file from the `./opt` folder to the `./lib` folder. See [these instructions]({{ site.baseurl }}/dev/linking.html) for further details.
+
+{% top %}
+
+Where to go next?
+-----------------
+
+* [Concepts & Common API]({{ site.baseurl }}/dev/table/common.html): Shared concepts and APIs of the Table API and SQL.
+* [Streaming Table API & SQL]({{ site.baseurl }}/dev/table/streaming.html): Streaming-specific documentation for the Table API or SQL such as configuration of time attributes and handling of updating results.
+* [Table API]({{ site.baseurl }}/dev/table/tableapi.html): Supported operations and API for the Table API.
+* [SQL]({{ site.baseurl }}/dev/table/sql.html): Supported operations and syntax for SQL.
+* [Table Sources & Sinks]({{ site.baseurl }}/dev/table/sourceSinks.html): Reading tables from and emitting tables to external storage systems.
+* [User-Defined Functions]({{ site.baseurl }}/dev/table/udfs.html): Definition and usage of user-defined functions.
+
+{% top %}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/23248157/docs/dev/table/tableApi.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/tableApi.md b/docs/dev/table/tableApi.md
index 25810d2..693b9bd 100644
--- a/docs/dev/table/tableApi.md
+++ b/docs/dev/table/tableApi.md
@@ -22,94 +22,129 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-The Table API is a language-integrated relational API for Scala and Java. The Table API is a unified API for stream and batch processing. 
+The Table API is a unified, relational API for stream and batch processing. Table API queries can be run on batch or streaming input without modifications. The Table API is a superset of the SQL language and is specially designed for working with Apache Flink. It is a language-integrated API for Scala and Java. Instead of specifying queries as String values, as is common with SQL, Table API queries are defined in a language-embedded style in Java or Scala with IDE support like autocompletion and syntax validation. 
 
-Please have a look at the [Common Concepts & API]({{ site.baseurl }}/dev/table/common.html) and the [Streaming Concepts]({{ site.baseurl }}/dev/table/streaming.html) if you work with streaming data.
+The Table API shares many concepts and parts of its API with Flink's SQL integration. Have a look at the [Common Concepts & API]({{ site.baseurl }}/dev/table/common.html) to learn how to register tables or to create a `Table` object. The [Streaming Concepts]({{ site.baseurl }}/dev/table/streaming.html) page discusses streaming specific concepts such as dynamic tables and time attributes.
 
-The following examples assume a registered table called `Orders` with attributes `a, b, c, rowtime`.
-
-**TODO: Extend**
+The following examples assume a registered table called `Orders` with attributes `(a, b, c, rowtime)`. The `rowtime` field is either a logical [time attribute](streaming.html#time-attributes) in streaming or a regular timestamp field in batch.
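+
+One way to register such a table from a `DataSet` is sketched below (the `rawOrders` data set and its field types are assumptions for illustration):
+
+{% highlight java %}
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
+
+// rawOrders is an assumed input DataSet with four fields
+DataSet<Tuple4<String, Long, String, Timestamp>> rawOrders = ...;
+
+// register the DataSet as table "Orders" with attributes (a, b, c, rowtime)
+tEnv.registerDataSet("Orders", rawOrders, "a, b, c, rowtime");
+{% endhighlight %}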
 
 * This will be replaced by the TOC
 {:toc}
 
-Table API Overview
-------------------
-
-The Table API is available for Scala and Java. The Scala Table API is based on Scala Expressions, the Java Table API on Strings which are parsed and converted into Expressions.
+Overview & Examples
+-----------------------------
 
-The following example shows the differences between the Scala and Java Table API. 
+The Table API is available for Scala and Java. The Scala Table API leverages Scala expressions; the Java Table API is based on strings, which are parsed and converted into equivalent expressions.
 
-**TODO: Extend**
+The following example shows the differences between the Scala and Java Table API. The table program is executed in a batch environment. It scans the `Orders` table, groups by field `a`, and counts the resulting rows per group. The result of the table program is converted into a `DataSet` of type `Row` and printed.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 
-The Java Table API is enabled by importing `org.apache.flink.table.api.java.*`. The following example shows how a Java Table API program is constructed.
+The Java Table API is enabled by importing `org.apache.flink.table.api.java.*`. The following example shows how a Java Table API program is constructed and how expressions are specified as strings.
 
 {% highlight java %}
-
+// environment configuration
 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
 
+// register Orders table in table environment
+// ...
+
+// specify table program
 Table orders = tEnv.scan("Orders"); // schema (a, b, c, rowtime)
 
 Table counts = orders
         .groupBy("a")
         .select("a, b.count as cnt");
 
-DataSet<Row> result = tableEnv.toDataSet(wordCounts, Row.class);
-{% endhighlight %}
-
-With Java, expressions must be specified by Strings. The embedded expression DSL is not supported.
-
-{% highlight java %}
-ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-
-// register the DataSet cust as table "Customers" with fields derived from the dataset
-tableEnv.registerDataSet("Customers", cust)
-
-// register the DataSet ord as table "Orders" with fields user, product, and amount
-tableEnv.registerDataSet("Orders", ord, "user, product, amount");
+// conversion to DataSet
+DataSet<Row> result = tEnv.toDataSet(counts, Row.class);
+result.print();
 {% endhighlight %}
 
 </div>
 
 <div data-lang="scala" markdown="1">
 
-The Scala Table API is enabled by importing `org.apache.flink.table.api.scala._`. The following example shows how a Scala Table API program is constructed.
+The Scala Table API is enabled by importing `org.apache.flink.api.scala._` and `org.apache.flink.table.api.scala._`.
+
+The following example shows how a Scala Table API program is constructed. Table attributes are referenced using [Scala Symbols](http://scala-lang.org/files/archive/spec/2.12/01-lexical-syntax.html#symbol-literals), which start with an apostrophe character (`'`).
 
 {% highlight scala %}
 import org.apache.flink.api.scala._
 import org.apache.flink.table.api.scala._
 
+// environment configuration
 val env = ExecutionEnvironment.getExecutionEnvironment
 val tEnv = TableEnvironment.getTableEnvironment(env)
 
+// register Orders table in table environment
+// ...
+
+// specify table program
 val orders = tEnv.scan("Orders") // schema (a, b, c, rowtime)
+
 val result = orders
                .groupBy('a)
                .select('a, 'b.count as 'cnt)
-               .toDataSet[Row]
+               .toDataSet[Row] // conversion to DataSet
+               .print()
 {% endhighlight %}
 
-The expression DSL uses Scala symbols to refer to field names and code generation to
-transform expressions to efficient runtime code. Please note that the conversion to and from
-Tables only works when using Scala case classes or Java POJOs. Please refer to the [Type Extraction and Serialization]({{ site.baseurl }}/internals/types_serialization.html) section
-to learn the characteristics of a valid POJO.
+</div>
+</div>
+
+The next example shows a more complex Table API program. The program again scans the `Orders` table. It filters out null values, normalizes the String field `a`, and computes the average billing amount `b` for each hour and product `a`.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+{% highlight java %}
+// environment configuration
+// ...
+
+// specify table program
+Table orders = tEnv.scan("Orders"); // schema (a, b, c, rowtime)
+
+Table result = orders
+        .filter("a.isNotNull && b.isNotNull && c.isNotNull")
+        .select("a.lowerCase(), b, rowtime")
+        .window(Tumble.over("1.hour").on("rowtime").as("hourlyWindow"))
+        .groupBy("hourlyWindow, a")
+        .select("a, hourlyWindow.end as hour, b.avg as avgBillingAmount");
+{% endhighlight %}
+
+</div>
+
+<div data-lang="scala" markdown="1">
+
+{% highlight scala %}
+// environment configuration
+// ...
+
+// specify table program
+val orders: Table = tEnv.scan("Orders") // schema (a, b, c, rowtime)
+
+val result: Table = orders
+        .filter('a.isNotNull && 'b.isNotNull && 'c.isNotNull)
+        .select('a.lowerCase(), 'b, 'rowtime)
+        .window(Tumble over 1.hour on 'rowtime as 'hourlyWindow)
+        .groupBy('hourlyWindow, 'a)
+        .select('a, 'hourlyWindow.end as 'hour, 'b.avg as 'avgBillingAmount)
+{% endhighlight %}
 
 </div>
 </div>
 
-**TODO**
+Since the Table API is a unified API for batch and streaming data, both example programs can be executed on batch and streaming inputs without any modification of the table program itself. In both cases, the program produces the same results, provided that streaming records are not late (see [Streaming Concepts](streaming.html) for details).
 
 {% top %}
 
 Operations
 ----------
 
-**TODO: Add Tags for Batch and Streaming support**
+The Table API supports the following operations. Please note that not all operations are available in both batch and streaming yet; they are tagged accordingly.
 
 ### Scan, Projection, and Filter
 
@@ -125,7 +160,10 @@ Operations
   </thead>
   <tbody>
   	<tr>
-  		<td><strong>Scan</strong></td>
+  		<td>
+        <strong>Scan</strong><br>
+        <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span>
+      </td>
   		<td>
         <p>Similar to the FROM clause in a SQL query. Performs a scan of a registered table.</p>
 {% highlight java %}
@@ -134,7 +172,10 @@ Table orders = tableEnv.scan("Orders");
       </td>
   	</tr>
     <tr>
-      <td><strong>Select</strong></td>
+      <td>
+        <strong>Select</strong><br>
+        <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span>
+      </td>
       <td>
         <p>Similar to a SQL SELECT statement. Performs a select operation.</p>
 {% highlight java %}
@@ -149,7 +190,10 @@ Table result = orders.select("*");
     </tr>
 
     <tr>
-      <td><strong>As</strong></td>
+      <td>
+        <strong>As</strong><br>
+        <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span>
+      </td>
       <td>
         <p>Renames fields.</p>
 {% highlight java %}
@@ -160,17 +204,20 @@ Table result = orders.as("x, y, z, t");
     </tr>
 
     <tr>
-      <td><strong>Where / Filter</strong></td>
+      <td>
+        <strong>Where / Filter</strong><br>
+        <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span>
+      </td>
       <td>
         <p>Similar to a SQL WHERE clause. Filters out rows that do not pass the filter predicate.</p>
 {% highlight java %}
 Table orders = tableEnv.scan("Orders");
-Table result = orders.where("b = 'red'");
+Table result = orders.where("b === 'red'");
 {% endhighlight %}
 or
 {% highlight java %}
 Table orders = tableEnv.scan("Orders");
-Table result = orders.filter("a % 2 = 0");
+Table result = orders.filter("a % 2 === 0");
 {% endhighlight %}
       </td>
     </tr>
@@ -189,7 +236,10 @@ Table result = orders.filter("a % 2 = 0");
   </thead>
   <tbody>
   	<tr>
-  		<td><strong>Scan</strong></td>
+  		<td>
+        <strong>Scan</strong><br>
+        <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span>
+      </td>
   		<td>
         <p>Similar to the FROM clause in a SQL query. Performs a scan of a registered table.</p>
 {% highlight scala %}
@@ -198,7 +248,10 @@ val orders: Table = tableEnv.scan("Orders")
       </td>
   	</tr>
   	<tr>
-      <td><strong>Select</strong></td>
+      <td>
+        <strong>Select</strong><br>
+        <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span>
+      </td>
       <td>
         <p>Similar to a SQL SELECT statement. Performs a select operation.</p>
 {% highlight scala %}
@@ -214,17 +267,23 @@ val result = orders.select('*)
     </tr>
 
     <tr>
-      <td><strong>As</strong></td>
+      <td>
+        <strong>As</strong><br>
+        <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span>
+      </td>
       <td>
         <p>Renames fields.</p>
 {% highlight scala %}
-val orders: Table = tableEnv.scan("Orders").as('x, 'y, 'z, 't')
+val orders: Table = tableEnv.scan("Orders").as('x, 'y, 'z, 't)
 {% endhighlight %}
       </td>
     </tr>
 
     <tr>
-      <td><strong>Where / Filter</strong></td>
+      <td>
+        <strong>Where / Filter</strong><br>
+        <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span>
+      </td>
       <td>
         <p>Similar to a SQL WHERE clause. Filters out rows that do not pass the filter predicate.</p>
 {% highlight scala %}
@@ -259,26 +318,63 @@ val result = orders.where('b === "red")
   </thead>
   <tbody>
     <tr>
-      <td><strong>GroupBy</strong></td>
       <td>
-        <p>Similar to a SQL GROUPBY clause. Groups the rows on the grouping keys, with a following aggregation
-        operator to aggregate rows group-wise.</p>
+        <strong>GroupBy Aggregation</strong><br>
+        <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span><br>
+        <span class="label label-info">Result Updating</span>
+      </td>
+      <td>
+        <p>Similar to a SQL GROUP BY clause. Groups the rows on the grouping keys and applies a running aggregation operator to aggregate rows group-wise.</p>
 {% highlight java %}
 Table orders = tableEnv.scan("Orders");
 Table result = orders.groupBy("a").select("a, b.sum as d");
 {% endhighlight %}
+        <p><b>Note:</b> For streaming queries the required state to compute the query result might grow infinitely depending on the type of aggregation and the number of distinct grouping keys. Please provide a query configuration with a valid retention interval to prevent excessive state size. See <a href="streaming.html">Streaming Concepts</a> for details.</p>
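+        <p>For streaming queries, such a configuration might look as follows (a hedged sketch; it assumes a <code>StreamTableEnvironment</code> and the Flink 1.3 <code>StreamQueryConfig</code> API, and the retention times are illustrative):</p>
+{% highlight java %}
+StreamQueryConfig qConfig = tableEnv.queryConfig();
+// state for a grouping key may be discarded once the key has been
+// inactive for a period between 12 and 24 hours
+qConfig.withIdleStateRetentionTime(Time.hours(12), Time.hours(24));
+{% endhighlight %}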
       </td>
     </tr>
     <tr>
-    	<td><strong>GroupBy Window</strong></td>
-    	<td>TODO</td>
+    	<td>
+        <strong>GroupBy Window Aggregation</strong><br>
+        <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span>
+      </td>
+    	<td>
+        <p>Groups and aggregates a table on a <a href="#group-windows">group window</a> and possibly one or more grouping keys.</p>
+{% highlight java %}
+Table orders = tableEnv.scan("Orders");
+Table result = orders
+    .window(Tumble.over("5.minutes").on("rowtime").as("w")) // define window
+    .groupBy("a, w") // group by key and window
+    .select("a, w.start, w.end, b.sum as d"); // access window properties and aggregate
+{% endhighlight %}
+      </td>
     </tr>
     <tr>
-    	<td><strong>Over Window</strong></td>
-    	<td>TODO</td>
+    	<td>
+        <strong>Over Window Aggregation</strong><br>
+        <span class="label label-primary">Streaming</span>
+      </td>
+      <td>
+       <p>Similar to a SQL OVER clause. Over window aggregates are computed for each row, based on a window (range) of preceding and succeeding rows. See the <a href="#over-windows">over windows section</a> for more details.</p>
+{% highlight java %}
+Table orders = tableEnv.scan("Orders");
+Table result = orders
+    // define window
+    .window(Over
+      .partitionBy("a")
+      .orderBy("rowtime")
+      .preceding("UNBOUNDED_RANGE")
+      .following("CURRENT_RANGE")
+      .as("w"))
+    .select("a, b.avg over w, b.max over w, b.min over w"); // sliding aggregate
+{% endhighlight %}
+       <p><b>Note:</b> All aggregates must be defined over the same window, i.e., same partitioning, sorting, and range. Currently, only windows with PRECEDING (UNBOUNDED and bounded) to CURRENT ROW range are supported. Ranges with FOLLOWING are not supported yet. ORDER BY must be specified on a single <a href="streaming.html#time-attributes">time attribute</a>.</p>
+      </td>
     </tr>
     <tr>
-      <td><strong>Distinct</strong></td>
+      <td>
+        <strong>Distinct</strong><br>
+        <span class="label label-primary">Batch</span>
+      </td>
       <td>
         <p>Similar to a SQL DISTINCT clause. Returns records with distinct value combinations.</p>
 {% highlight java %}
@@ -303,26 +399,63 @@ Table result = orders.distinct();
   <tbody>
 
     <tr>
-      <td><strong>GroupBy</strong></td>
       <td>
-        <p>Similar to a SQL GROUPBY clause. Groups rows on the grouping keys, with a following aggregation
-        operator to aggregate rows group-wise.</p>
+        <strong>GroupBy Aggregation</strong><br>
+        <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span><br>
+        <span class="label label-info">Result Updating</span>
+      </td>
+      <td>
+        <p>Similar to a SQL GROUP BY clause. Groups the rows on the grouping keys and applies a running aggregation operator to aggregate rows group-wise.</p>
 {% highlight scala %}
 val orders: Table = tableEnv.scan("Orders")
 val result = orders.groupBy('a).select('a, 'b.sum as 'd)
 {% endhighlight %}
+        <p><b>Note:</b> For streaming queries the required state to compute the query result might grow infinitely depending on the type of aggregation and the number of distinct grouping keys. Please provide a query configuration with a valid retention interval to prevent excessive state size. See <a href="streaming.html">Streaming Concepts</a> for details.</p>
       </td>
     </tr>
     <tr>
-    	<td><strong>GroupBy Window</strong></td>
-    	<td>TODO</td>
+    	<td>
+        <strong>GroupBy Window Aggregation</strong><br>
+        <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span>
+      </td>
+    	<td>
+        <p>Groups and aggregates a table on a <a href="#group-windows">group window</a> and possibly one or more grouping keys.</p>
+{% highlight scala %}
+val orders: Table = tableEnv.scan("Orders")
+val result: Table = orders
+    .window(Tumble over 5.minutes on 'rowtime as 'w) // define window
+    .groupBy('a, 'w) // group by key and window
+    .select('a, 'w.start, 'w.end, 'b.sum as 'd) // access window properties and aggregate
+{% endhighlight %}
+      </td>
     </tr>
     <tr>
-    	<td><strong>Over Window</strong></td>
-    	<td>TODO</td>
+    	<td>
+        <strong>Over Window Aggregation</strong><br>
+        <span class="label label-primary">Streaming</span>
+      </td>
+    	<td>
+       <p>Similar to a SQL OVER clause. Over window aggregates are computed for each row, based on a window (range) of preceding and succeeding rows. See the <a href="#over-windows">over windows section</a> for more details.</p>
+{% highlight scala %}
+val orders: Table = tableEnv.scan("Orders")
+val result: Table = orders
+    // define window
+    .window(Over
+      partitionBy 'a
+      orderBy 'rowtime
+      preceding UNBOUNDED_RANGE
+      following CURRENT_RANGE
+      as 'w)
+    .select('a, 'b.avg over 'w, 'b.max over 'w, 'b.min over 'w) // sliding aggregate
+{% endhighlight %}
+       <p><b>Note:</b> All aggregates must be defined over the same window, i.e., same partitioning, sorting, and range. Currently, only windows with PRECEDING (UNBOUNDED and bounded) to CURRENT ROW range are supported. Ranges with FOLLOWING are not supported yet. ORDER BY must be specified on a single <a href="streaming.html#time-attributes">time attribute</a>.</p>
+      </td>
     </tr>
     <tr>
-      <td><strong>Distinct</strong></td>
+      <td>
+        <strong>Distinct</strong><br>
+        <span class="label label-primary">Batch</span>
+      </td>
       <td>
         <p>Similar to a SQL DISTINCT clause. Returns records with distinct value combinations.</p>
 {% highlight scala %}
@@ -352,7 +485,10 @@ val result = orders.distinct()
   </thead>
   <tbody>
   	<tr>
-      <td><strong>Inner Join</strong></td>
+      <td>
+        <strong>Inner Join</strong><br>
+        <span class="label label-primary">Batch</span>
+      </td>
       <td>
        <p>Similar to a SQL JOIN clause. Joins two tables. Both tables must have distinct field names and at least one equality join predicate must be defined through the join operator or using a where or filter operator.</p>
 {% highlight java %}
@@ -364,7 +500,10 @@ Table result = left.join(right).where("a = d").select("a, b, e");
     </tr>
 
     <tr>
-      <td><strong>LeftOuterJoin</strong></td>
+      <td>
+        <strong>Left Outer Join</strong><br>
+        <span class="label label-primary">Batch</span>
+      </td>
       <td>
         <p>Similar to a SQL LEFT OUTER JOIN clause. Joins two tables. Both tables must have distinct field names and at least one equality join predicate must be defined.</p>
 {% highlight java %}
@@ -376,7 +515,10 @@ Table result = left.leftOuterJoin(right, "a = d").select("a, b, e");
     </tr>
 
     <tr>
-      <td><strong>RightOuterJoin</strong></td>
+      <td>
+        <strong>Right Outer Join</strong><br>
+        <span class="label label-primary">Batch</span>
+      </td>
       <td>
         <p>Similar to a SQL RIGHT OUTER JOIN clause. Joins two tables. Both tables must have distinct field names and at least one equality join predicate must be defined.</p>
 {% highlight java %}
@@ -388,7 +530,10 @@ Table result = left.rightOuterJoin(right, "a = d").select("a, b, e");
     </tr>
 
     <tr>
-      <td><strong>FullOuterJoin</strong></td>
+      <td>
+        <strong>Full Outer Join</strong><br>
+        <span class="label label-primary">Batch</span>
+      </td>
       <td>
         <p>Similar to a SQL FULL OUTER JOIN clause. Joins two tables. Both tables must have distinct field names and at least one equality join predicate must be defined.</p>
 {% highlight java %}
@@ -399,12 +544,46 @@ Table result = left.fullOuterJoin(right, "a = d").select("a, b, e");
       </td>
     </tr>
     <tr>
-    	<td><strong>TableFunction CrossJoin</strong></td>
-    	<td>TODO</td>
+    	<td>
+        <strong>TableFunction Join</strong><br>
+        <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span>
+      </td>
+    	<td>
+        <p>Joins a table with the results of a table function. Each row of the left (outer) table is joined with all rows produced by the corresponding call of the table function. A row of the left (outer) table is dropped if its table function call returns an empty result.
+        </p>
+{% highlight java %}
+// register function
+TableFunction<String> split = new MySplitUDTF();
+tableEnv.registerFunction("split", split);
+
+// join
+Table orders = tableEnv.scan("Orders");
+Table result = orders
+    .join(new Table(tableEnv, "split(c)").as("s", "t", "v"))
+    .select("a, b, s, t, v");
+{% endhighlight %}
+      </td>
     </tr>
     <tr>
-    	<td><strong>TableFunction LeftOuterJoin</strong></td>
-    	<td>TODO</td>
+    	<td>
+        <strong>TableFunction Left Outer Join</strong><br>
+        <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span>
+      </td>
+      <td>
+        <p>Joins a table with the results of a table function. Each row of the left (outer) table is joined with all rows produced by the corresponding call of the table function. If a table function call returns an empty result, the corresponding outer row is preserved and the result is padded with null values.
+        </p>
+{% highlight java %}
+// register function
+TableFunction<String> split = new MySplitUDTF();
+tableEnv.registerFunction("split", split);
+
+// join
+Table orders = tableEnv.scan("Orders");
+Table result = orders
+    .leftOuterJoin(new Table(tableEnv, "split(c)").as("s", "t", "v"))
+    .select("a, b, s, t, v");
+{% endhighlight %}
+      </td>
     </tr>
 
   </tbody>
@@ -423,7 +602,10 @@ Table result = left.fullOuterJoin(right, "a = d").select("a, b, e");
   <tbody>
 
   	<tr>
-      <td><strong>Join</strong></td>
+      <td>
+        <strong>Inner Join</strong><br>
+        <span class="label label-primary">Batch</span>
+      </td>
       <td>
         <p>Similar to a SQL JOIN clause. Joins two tables. Both tables must have distinct field names and an equality join predicate must be defined using a where or filter operator.</p>
 {% highlight scala %}
@@ -435,7 +617,10 @@ val result = left.join(right).where('a === 'd).select('a, 'b, 'e);
     </tr>
 
     <tr>
-      <td><strong>LeftOuterJoin</strong></td>
+      <td>
+        <strong>Left Outer Join</strong><br>
+        <span class="label label-primary">Batch</span>
+      </td>
       <td>
         <p>Similar to a SQL LEFT OUTER JOIN clause. Joins two tables. Both tables must have distinct field names and at least one equality join predicate must be defined.</p>
 {% highlight scala %}
@@ -447,7 +632,10 @@ val result = left.leftOuterJoin(right, 'a === 'd).select('a, 'b, 'e)
     </tr>
 
     <tr>
-      <td><strong>RightOuterJoin</strong></td>
+      <td>
+        <strong>Right Outer Join</strong><br>
+        <span class="label label-primary">Batch</span>
+      </td>
       <td>
         <p>Similar to a SQL RIGHT OUTER JOIN clause. Joins two tables. Both tables must have distinct field names and at least one equality join predicate must be defined.</p>
 {% highlight scala %}
@@ -459,7 +647,10 @@ val result = left.rightOuterJoin(right, 'a === 'd).select('a, 'b, 'e)
     </tr>
 
     <tr>
-      <td><strong>FullOuterJoin</strong></td>
+      <td>
+        <strong>Full Outer Join</strong><br>
+        <span class="label label-primary">Batch</span>
+      </td>
       <td>
         <p>Similar to a SQL FULL OUTER JOIN clause. Joins two tables. Both tables must have distinct field names and at least one equality join predicate must be defined.</p>
 {% highlight scala %}
@@ -470,12 +661,41 @@ val result = left.fullOuterJoin(right, 'a === 'd).select('a, 'b, 'e)
       </td>
     </tr>
     <tr>
-    	<td><strong>TableFunction CrossJoin</strong></td>
-    	<td>TODO</td>
+    	<td>
+        <strong>TableFunction Join</strong><br>
+        <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span>
+      </td>
+    	<td>
+        <p>Joins a table with the results of a table function. Each row of the left (outer) table is joined with all rows produced by the corresponding call of the table function. A row of the left (outer) table is dropped if its table function call returns an empty result.
+        </p>
+        {% highlight scala %}
+// instantiate function
+val split = new MySplitUDTF()
+
+// join
+val orders: Table = tableEnv.scan("Orders")
+val result: Table = orders
+    .join(split('c) as ('s, 't, 'v))
+    .select('a, 'b, 's, 't, 'v)
+{% endhighlight %}
+        </td>
     </tr>
     <tr>
-    	<td><strong>TableFunction LeftOuterJoin</strong></td>
-    	<td>TODO</td>
+    	<td>
+        <strong>TableFunction Left Outer Join</strong><br>
+        <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span></td>
+    	<td>
+        <p>Joins a table with the results of a table function. Each row of the left (outer) table is joined with all rows produced by the corresponding call of the table function. If a table function call returns an empty result, the corresponding outer row is preserved and the result is padded with null values.
+        </p>
+{% highlight scala %}
+// instantiate function
+val split = new MySplitUDTF()
+
+// join
+val orders: Table = tableEnv.scan("Orders")
+val result: Table = orders
+    .leftOuterJoin(split('c) as ('s, 't, 'v))
+    .select('a, 'b, 's, 't, 'v)
+{% endhighlight %}
+      </td>
     </tr>
 
   </tbody>
@@ -499,7 +719,10 @@ val result = left.fullOuterJoin(right, 'a === 'd).select('a, 'b, 'e)
   </thead>
   <tbody>
   	<tr>
-      <td><strong>Union</strong></td>
+      <td>
+        <strong>Union</strong><br>
+        <span class="label label-primary">Batch</span>
+      </td>
       <td>
         <p>Similar to a SQL UNION clause. Unions two tables with duplicate records removed. Both tables must have identical field types.</p>
 {% highlight java %}
@@ -511,7 +734,10 @@ Table result = left.union(right);
     </tr>
 
     <tr>
-      <td><strong>UnionAll</strong></td>
+      <td>
+        <strong>UnionAll</strong><br>
+        <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span>
+      </td>
       <td>
         <p>Similar to a SQL UNION ALL clause. Unions two tables. Both tables must have identical field types.</p>
 {% highlight java %}
@@ -523,7 +749,10 @@ Table result = left.unionAll(right);
     </tr>
 
     <tr>
-      <td><strong>Intersect</strong></td>
+      <td>
+        <strong>Intersect</strong><br>
+        <span class="label label-primary">Batch</span>
+      </td>
       <td>
        <p>Similar to a SQL INTERSECT clause. Intersect returns records that exist in both tables. If a record is present in one or both tables more than once, it is returned just once, i.e., the resulting table has no duplicate records. Both tables must have identical field types.</p>
 {% highlight java %}
@@ -535,7 +764,10 @@ Table result = left.intersect(right);
     </tr>
 
     <tr>
-      <td><strong>IntersectAll</strong></td>
+      <td>
+        <strong>IntersectAll</strong><br>
+        <span class="label label-primary">Batch</span>
+      </td>
       <td>
         <p>Similar to a SQL INTERSECT ALL clause. IntersectAll returns records that exist in both tables. If a record is present in both tables more than once, it is returned as many times as it is present in both tables, i.e., the resulting table might have duplicate records. Both tables must have identical field types.</p>
 {% highlight java %}
@@ -547,7 +779,10 @@ Table result = left.intersectAll(right);
     </tr>
 
     <tr>
-      <td><strong>Minus</strong></td>
+      <td>
+        <strong>Minus</strong><br>
+        <span class="label label-primary">Batch</span>
+      </td>
       <td>
         <p>Similar to a SQL EXCEPT clause. Minus returns records from the left table that do not exist in the right table. Duplicate records in the left table are returned exactly once, i.e., duplicates are removed. Both tables must have identical field types.</p>
 {% highlight java %}
@@ -559,7 +794,10 @@ Table result = left.minus(right);
     </tr>
 
     <tr>
-      <td><strong>MinusAll</strong></td>
+      <td>
+        <strong>MinusAll</strong><br>
+        <span class="label label-primary">Batch</span>
+      </td>
       <td>
         <p>Similar to a SQL EXCEPT ALL clause. MinusAll returns the records that do not exist in the right table. A record that is present n times in the left table and m times in the right table is returned (n - m) times, i.e., as many duplicates as are present in the right table are removed. Both tables must have identical field types.</p>
 {% highlight java %}
@@ -584,7 +822,10 @@ Table result = left.minusAll(right);
   </thead>
   <tbody>
   	<tr>
-      <td><strong>Union</strong></td>
+      <td>
+        <strong>Union</strong><br>
+        <span class="label label-primary">Batch</span>
+      </td>
       <td>
        <p>Similar to a SQL UNION clause. Unions two tables with duplicate records removed. Both tables must have identical field types.</p>
 {% highlight scala %}
@@ -596,7 +837,11 @@ val result = left.union(right);
     </tr>
 
     <tr>
-      <td><strong>UnionAll</strong></td>
+      <td>
+        <strong>UnionAll</strong><br>
+        <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span>
+      </td>
       <td>
        <p>Similar to a SQL UNION ALL clause. Unions two tables. Both tables must have identical field types.</p>
 {% highlight scala %}
@@ -608,7 +853,10 @@ val result = left.unionAll(right);
     </tr>
 
     <tr>
-      <td><strong>Intersect</strong></td>
+      <td>
+        <strong>Intersect</strong><br>
+        <span class="label label-primary">Batch</span>
+      </td>
       <td>
         <p>Similar to a SQL INTERSECT clause. Intersect returns records that exist in both tables. If a record is present in one or both tables more than once, it is returned just once, i.e., the resulting table has no duplicate records. Both tables must have identical field types.</p>
 {% highlight scala %}
@@ -620,7 +868,10 @@ val result = left.intersect(right);
     </tr>
 
     <tr>
-      <td><strong>IntersectAll</strong></td>
+      <td>
+        <strong>IntersectAll</strong><br>
+        <span class="label label-primary">Batch</span>
+      </td>
       <td>
         <p>Similar to a SQL INTERSECT ALL clause. IntersectAll returns records that exist in both tables. If a record is present in both tables more than once, it is returned as many times as it is present in both tables, i.e., the resulting table might have duplicate records. Both tables must have identical field types.</p>
 {% highlight scala %}
@@ -632,7 +883,10 @@ val result = left.intersectAll(right);
     </tr>
 
     <tr>
-      <td><strong>Minus</strong></td>
+      <td>
+        <strong>Minus</strong><br>
+        <span class="label label-primary">Batch</span>
+      </td>
       <td>
         <p>Similar to a SQL EXCEPT clause. Minus returns records from the left table that do not exist in the right table. Duplicate records in the left table are returned exactly once, i.e., duplicates are removed. Both tables must have identical field types.</p>
 {% highlight scala %}
@@ -644,7 +898,10 @@ val result = left.minus(right);
     </tr>
 
     <tr>
-      <td><strong>MinusAll</strong></td>
+      <td>
+        <strong>MinusAll</strong><br>
+        <span class="label label-primary">Batch</span>
+      </td>
       <td>
         <p>Similar to a SQL EXCEPT ALL clause. MinusAll returns the records that do not exist in the right table. A record that is present n times in the left table and m times in the right table is returned (n - m) times, i.e., as many duplicates as are present in the right table are removed. Both tables must have identical field types.</p>
 {% highlight scala %}
@@ -676,7 +933,10 @@ val result = left.minusAll(right);
   </thead>
   <tbody>
   	<tr>
-      <td><strong>Order By</strong></td>
+      <td>
+        <strong>Order By</strong><br>
+        <span class="label label-primary">Batch</span>
+      </td>
       <td>
         <p>Similar to a SQL ORDER BY clause. Returns records globally sorted across all parallel partitions.</p>
 {% highlight scala %}
@@ -687,7 +947,10 @@ val result = in.orderBy('a.asc);
     </tr>
 
     <tr>
-      <td><strong>Limit</strong></td>
+      <td>
+        <strong>Limit</strong><br>
+        <span class="label label-primary">Batch</span>
+      </td>
       <td>
         <p>Similar to a SQL LIMIT clause. Limits a sorted result to a specified number of records from an offset position. Limit is technically part of the Order By operator and thus must be preceded by it.</p>
 {% highlight scala %}
@@ -717,7 +980,10 @@ val result = in.orderBy('a.asc).limit(3, 5); // returns 5 records beginning with
   </thead>
   <tbody>
   	<tr>
-      <td><strong>Order By</strong></td>
+      <td>
+        <strong>Order By</strong><br>
+        <span class="label label-primary">Batch</span>
+      </td>
       <td>
         <p>Similar to a SQL ORDER BY clause. Returns records globally sorted across all parallel partitions.</p>
 {% highlight java %}
@@ -728,7 +994,10 @@ Table result = in.orderBy("a.asc");
     </tr>
 
     <tr>
-      <td><strong>Limit</strong></td>
+      <td>
+        <strong>Limit</strong><br>
+        <span class="label label-primary">Batch</span>
+      </td>
       <td>
         <p>Similar to a SQL LIMIT clause. Limits a sorted result to a specified number of records from an offset position. Limit is technically part of the Order By operator and thus must be preceded by it.</p>
 {% highlight java %}
@@ -748,11 +1017,9 @@ Table result = in.orderBy("a.asc").limit(3, 5); // returns 5 records beginning w
 </div>
 </div>
 
-### Windows
+### Group Windows
 
-**TODO: Figure out where to put this stuff. I think it would be good to have it in the "Operations" section for a brief overview. A more detailed discussion of windows should go somewhere else, maybe into the "Common Concepts"?**
-
-The Table API is a declarative API to define queries on batch and streaming tables. Projection, selection, and union operations can be applied both on streaming and batch tables without additional semantics. Aggregations on (possibly) infinite streaming tables, however, can only be computed on finite groups of records. Window aggregates group rows into finite groups based on time or row-count intervals and evaluate aggregation functions once per group. For batch tables, windows are a convenient shortcut to group records by time intervals.
+Group window aggregates group rows into finite groups based on time or row-count intervals and evaluate aggregation functions once per group. For batch tables, windows are a convenient shortcut to group records by time intervals.
 
 Windows are defined using the `window(w: Window)` clause and require an alias, which is specified using the `as` clause. In order to group a table by a window, the window alias must be referenced in the `groupBy(...)` clause like a regular grouping attribute. 
 The following example shows how to define a window aggregation on a table.
@@ -800,7 +1067,7 @@ val table = input
 </div>
 </div>
 
-The `Window` parameter defines how rows are mapped to windows. `Window` is not an interface that users can implement. Instead, the Table API provides a set of predefined `Window` classes with specific semantics, which are translated into underlying `DataStream` or `DataSet` operations. The supported window definitions are listed below. Window properties such as the start and end timestamp of a time window can be added in the select statement as a property of the window alias as `w.start` and `w.end`, respectively.
+Window properties such as the start and end timestamp of a time window can be selected in the select statement as properties of the window alias, i.e., as `w.start` and `w.end`, respectively.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
@@ -822,6 +1089,8 @@ val table = input
 </div>
 </div>
 
+The `Window` parameter defines how rows are mapped to windows. `Window` is not an interface that users can implement. Instead, the Table API provides a set of predefined `Window` classes with specific semantics, which are translated into underlying `DataStream` or `DataSet` operations. The supported window definitions are listed below.
+
 #### Tumble (Tumbling Windows)
 
 A tumbling window assigns rows to non-overlapping, continuous windows of fixed length. For example, a tumbling window of 5 minutes groups rows in 5 minutes intervals. Tumbling windows can be defined on event-time, processing-time, or on a row-count.
@@ -832,7 +1101,6 @@ Tumbling windows are defined by using the `Tumble` class as follows:
   <thead>
     <tr>
       <th class="text-left" style="width: 20%">Method</th>
-      <th class="text-left" style="width: 20%">Required?</th>
       <th class="text-left">Description</th>
     </tr>
   </thead>
@@ -840,17 +1108,14 @@ Tumbling windows are defined by using the `Tumble` class as follows:
   <tbody>
     <tr>
       <td><code>over</code></td>
-      <td>Required.</td>
      <td>Defines the length of the window, either as time or row-count interval.</td>
     </tr>
     <tr>
       <td><code>on</code></td>
-      <td>Required for streaming event-time windows and windows on batch tables.</td>
-      <td>Defines the time mode for streaming tables (<code>rowtime</code> is a logical system attribute); for batch tables, the time attribute on which records are grouped.</td>
+      <td>The time attribute to group (time interval) or sort (row count) on. For batch queries this might be any Long or Timestamp attribute. For streaming queries this must be a <a href="streaming.html#time-attributes">declared event-time or processing-time time attribute</a>.</td>
     </tr>
     <tr>
       <td><code>as</code></td>
-      <td>Required.</td>
       <td>Assigns an alias to the window. The alias is used to reference the window in the following <code>groupBy()</code> clause and optionally to select window properties such as window start or end time in the <code>select()</code> clause.</td>
     </tr>
   </tbody>
@@ -862,11 +1127,11 @@ Tumbling windows are defined by using the `Tumble` class as follows:
 // Tumbling Event-time Window
 .window(Tumble.over("10.minutes").on("rowtime").as("w"));
 
-// Tumbling Processing-time Window
-.window(Tumble.over("10.minutes").as("w"));
+// Tumbling Processing-time Window (assuming a processing-time attribute "proctime")
+.window(Tumble.over("10.minutes").on("proctime").as("w"));
 
-// Tumbling Row-count Window
-.window(Tumble.over("10.rows").as("w"));
+// Tumbling Row-count Window (assuming a processing-time attribute "proctime")
+.window(Tumble.over("10.rows").on("proctime").as("w"));
 {% endhighlight %}
 </div>
 
@@ -875,11 +1140,11 @@ Tumbling windows are defined by using the `Tumble` class as follows:
 // Tumbling Event-time Window
 .window(Tumble over 10.minutes on 'rowtime as 'w)
 
-// Tumbling Processing-time Window
-.window(Tumble over 10.minutes as 'w)
+// Tumbling Processing-time Window (assuming a processing-time attribute "proctime")
+.window(Tumble over 10.minutes on 'proctime as 'w)
 
-// Tumbling Row-count Window
-.window(Tumble over 10.rows as 'w)
+// Tumbling Row-count Window (assuming a processing-time attribute "proctime")
+.window(Tumble over 10.rows on 'proctime as 'w)
 {% endhighlight %}
 </div>
 </div>
@@ -894,7 +1159,6 @@ Sliding windows are defined by using the `Slide` class as follows:
   <thead>
     <tr>
       <th class="text-left" style="width: 20%">Method</th>
-      <th class="text-left" style="width: 20%">Required?</th>
       <th class="text-left">Description</th>
     </tr>
   </thead>
@@ -902,22 +1166,18 @@ Sliding windows are defined by using the `Slide` class as follows:
   <tbody>
     <tr>
       <td><code>over</code></td>
-      <td>Required.</td>
       <td>Defines the length of the window, either as time or row-count interval.</td>
     </tr>
     <tr>
       <td><code>every</code></td>
-      <td>Required.</td>
       <td>Defines the slide interval, either as time or row-count interval. The slide interval must be of the same type as the size interval.</td>
     </tr>
     <tr>
       <td><code>on</code></td>
-      <td>Required for event-time windows and windows on batch tables.</td>
-      <td>Defines the time mode for streaming tables (<code>rowtime</code> is a logical system attribute); for batch tables, the time attribute on which records are grouped</td>
+      <td>The time attribute to group (time interval) or sort (row count) on. For batch queries this might be any Long or Timestamp attribute. For streaming queries this must be a <a href="streaming.html#time-attributes">declared event-time or processing-time time attribute</a>.</td>
     </tr>
     <tr>
       <td><code>as</code></td>
-      <td>Required.</td>
       <td>Assigns an alias to the window. The alias is used to reference the window in the following <code>groupBy()</code> clause and optionally to select window properties such as window start or end time in the <code>select()</code> clause.</td>
     </tr>
   </tbody>
@@ -929,11 +1189,11 @@ Sliding windows are defined by using the `Slide` class as follows:
 // Sliding Event-time Window
 .window(Slide.over("10.minutes").every("5.minutes").on("rowtime").as("w"));
 
-// Sliding Processing-time window
-.window(Slide.over("10.minutes").every("5.minutes").as("w"));
+// Sliding Processing-time window (assuming a processing-time attribute "proctime")
+.window(Slide.over("10.minutes").every("5.minutes").on("proctime").as("w"));
 
-// Sliding Row-count window
-.window(Slide.over("10.rows").every("5.rows").as("w"));
+// Sliding Row-count window (assuming a processing-time attribute "proctime")
+.window(Slide.over("10.rows").every("5.rows").on("proctime").as("w"));
 {% endhighlight %}
 </div>
 
@@ -942,11 +1202,11 @@ Sliding windows are defined by using the `Slide` class as follows:
 // Sliding Event-time Window
 .window(Slide over 10.minutes every 5.minutes on 'rowtime as 'w)
 
-// Sliding Processing-time window
-.window(Slide over 10.minutes every 5.minutes as 'w)
+// Sliding Processing-time window (assuming a processing-time attribute "proctime")
+.window(Slide over 10.minutes every 5.minutes on 'proctime as 'w)
 
-// Sliding Row-count window
-.window(Slide over 10.rows every 5.rows as 'w)
+// Sliding Row-count window (assuming a processing-time attribute "proctime")
+.window(Slide over 10.rows every 5.rows on 'proctime as 'w)
 {% endhighlight %}
 </div>
 </div>
@@ -961,7 +1221,6 @@ A session window is defined by using the `Session` class as follows:
   <thead>
     <tr>
       <th class="text-left" style="width: 20%">Method</th>
-      <th class="text-left" style="width: 20%">Required?</th>
       <th class="text-left">Description</th>
     </tr>
   </thead>
@@ -969,17 +1228,14 @@ A session window is defined by using the `Session` class as follows:
   <tbody>
     <tr>
       <td><code>withGap</code></td>
-      <td>Required.</td>
      <td>Defines the gap between two windows as a time interval.</td>
     </tr>
     <tr>
       <td><code>on</code></td>
-      <td>Required for event-time windows and windows on batch tables.</td>
-      <td>Defines the time mode for streaming tables (<code>rowtime</code> is a logical system attribute); for batch tables, the time attribute on which records are grouped</td>
+      <td>The time attribute to group (time interval) or sort (row count) on. For batch queries this might be any Long or Timestamp attribute. For streaming queries this must be a <a href="streaming.html#time-attributes">declared event-time or processing-time time attribute</a>.</td>
     </tr>
     <tr>
       <td><code>as</code></td>
-      <td>Required.</td>
       <td>Assigns an alias to the window. The alias is used to reference the window in the following <code>groupBy()</code> clause and optionally to select window properties such as window start or end time in the <code>select()</code> clause.</td>
     </tr>
   </tbody>
@@ -991,8 +1247,8 @@ A session window is defined by using the `Session` class as follows:
 // Session Event-time Window
 .window(Session.withGap("10.minutes").on("rowtime").as("w"));
 
-// Session Processing-time Window
-.window(Session.withGap("10.minutes").as("w"));
+// Session Processing-time Window (assuming a processing-time attribute "proctime")
+.window(Session.withGap("10.minutes").on("proctime").as("w"));
 {% endhighlight %}
 </div>
 
@@ -1001,14 +1257,18 @@ A session window is defined by using the `Session` class as follows:
 // Session Event-time Window
 .window(Session withGap 10.minutes on 'rowtime as 'w)
 
-// Session Processing-time Window
-.window(Session withGap 10.minutes as 'w)
+// Session Processing-time Window (assuming a processing-time attribute "proctime")
+.window(Session withGap 10.minutes on 'proctime as 'w)
 {% endhighlight %}
 </div>
 </div>
 
 {% top %}
 
+### Over Windows
+
+**TO BE DONE**
+
 Data Types
 ----------
 
@@ -3546,14 +3806,14 @@ ANY.as(name [, name ]* )
 
 </div>
 
-### Limitations
+### Unsupported Functions
 
 The following operations are not supported yet:
 
 - Binary string operators and functions
 - System functions
 - Collection functions
-- Aggregate functions like STDDEV_xxx, VAR_xxx, and REGR_xxx
+- Aggregate functions like REGR_xxx
 - Distinct aggregate functions like COUNT DISTINCT
 
 {% top %}

http://git-wip-us.apache.org/repos/asf/flink/blob/23248157/docs/dev/tableApi.md
----------------------------------------------------------------------
diff --git a/docs/dev/tableApi.md b/docs/dev/tableApi.md
deleted file mode 100644
index f7b13f0..0000000
--- a/docs/dev/tableApi.md
+++ /dev/null
@@ -1,82 +0,0 @@
----
-title: "Table API & SQL"
-nav-id: tableapi
-nav-parent_id: dev
-is_beta: true
-nav-show_overview: true
-nav-pos: 35
----
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-Apache Flink features two relational APIs - the Table API and SQL - for unified stream and batch processing. The Table API is a language-integrated query API for Scala and Java that allows the composition of queries from relational operators such as selection, filter, and join in a very intuitive way. Flink's SQL support is based on [Apache Calcite](https://calcite.apache.org) which implements the SQL standard. Queries specified in either interface have the same semantics and specify the same result regardless whether the input is a batch input (DataSet) or a stream input (DataStream).
-
-The Table API and the SQL interfaces are tightly integrated with each other as well as Flink's DataStream and DataSet APIs. You can easily switch between all APIs and libraries which build upon the APIs. For instance, you can extract patterns from a DataStream using the [CEP library]({{ site.baseurl }}/dev/libs/cep.html) and later use the Table API to analyze the patterns, or you might scan, filter, and aggregate a batch table using a SQL query before running a [Gelly graph algorithm]({{ site.baseurl }}/dev/libs/gelly) on the preprocessed data.
-
-**Please note that the Table API and SQL are not yet feature complete and are being active developed. Not all operations are supported by every combination of \[Table API, SQL\] and \[stream, batch\] input.**
-
-Setup
------
-
-The Table API and SQL are bundled in the `flink-table` Maven artifact. 
-The following dependency must be added to your project in order to use the Table API and SQL:
-
-{% highlight xml %}
-<dependency>
-  <groupId>org.apache.flink</groupId>
-  <artifactId>flink-table{{ site.scala_version_suffix }}</artifactId>
-  <version>{{site.version }}</version>
-</dependency>
-{% endhighlight %}
-
-In addition, you need to add a dependency for either Flink's Scala batch or streaming API. For a batch query you need to add:
-
-{% highlight xml %}
-<dependency>
-  <groupId>org.apache.flink</groupId>
-  <artifactId>flink-scala{{ site.scala_version_suffix }}</artifactId>
-  <version>{{site.version }}</version>
-</dependency>
-{% endhighlight %}
-
-For a streaming query you need to add:
-
-{% highlight xml %}
-<dependency>
-  <groupId>org.apache.flink</groupId>
-  <artifactId>flink-streaming-scala{{ site.scala_version_suffix }}</artifactId>
-  <version>{{site.version }}</version>
-</dependency>
-{% endhighlight %}
-
-**Note:** Due to an issue in Apache Calcite, which prevents the user classloaders from being garbage-collected, we do *not* recommend building a fat-jar that includes the `flink-table` dependency. Instead, we recommend configuring Flink to include the `flink-table` dependency in the system classloader. This can be done by copying the `flink-table.jar` file from the `./opt` folder to the `./lib` folder. See [these instructions]({{ site.baseurl }}/dev/linking.html) for further details.
-
-{% top %}
-
-Where to go next?
------------------
-
-* [Concepts & Common API]({{ site.baseurl }}/dev/table/common.html): Shared concepts and APIs of the Table API and SQL.
-* [Streaming Table API & SQL]({{ site.baseurl }}/dev/table/streaming.html): Streaming-specific documentation for the Table API or SQL such as configuration of time attributes and handling of updating results.
-* [Table API]({{ site.baseurl }}/dev/table/tableapi.html): Supported operations and API for the Table API.
-* [SQL]({{ site.baseurl }}/dev/table/sql.html): Supported operations and syntax for SQL
-* [Table Sources & Sinks]({{ site.baseurl }}/dev/table/sourceSinks.html): Reading tables from and emitting tables to external storage systems.
-* [User-Defined Functions]({{ site.baseurl }}/dev/table/udfs.html): Definition and usage of user-defined functions.
-
-{% top %}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/23248157/docs/redirects/table.md
----------------------------------------------------------------------
diff --git a/docs/redirects/table.md b/docs/redirects/table.md
index 0cc9a32..0284608 100644
--- a/docs/redirects/table.md
+++ b/docs/redirects/table.md
@@ -1,7 +1,7 @@
 ---
 title: "Table API"
 layout: redirect
-redirect: /dev/table_api.html
+redirect: /dev/table/index.html
 permalink: /apis/table.html
 ---
 <!--

http://git-wip-us.apache.org/repos/asf/flink/blob/23248157/docs/redirects/table_api.md
----------------------------------------------------------------------
diff --git a/docs/redirects/table_api.md b/docs/redirects/table_api.md
new file mode 100644
index 0000000..c1afaec
--- /dev/null
+++ b/docs/redirects/table_api.md
@@ -0,0 +1,24 @@
+---
+title: "Table API"
+layout: redirect
+redirect: /dev/table/index.html
+permalink: /dev/table_api.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->