Posted to commits@flink.apache.org by fh...@apache.org on 2016/04/15 19:39:55 UTC

[10/12] flink git commit: [FLINK-3738] [table] Refactor TableEnvironments. Remove Translators and TranslationContext.

[FLINK-3738] [table] Refactor TableEnvironments. Remove Translators and TranslationContext.

- Removes Translators (JavaBatchTranslator, ScalaBatchTranslator, JavaStreamTranslator, ScalaStreamTranslator)
- Removes TranslationContext
- Creates TableEnvironments for Java/Scala DataSet/DataStream APIs
- Functionality of translators and translation context is moved to table environments
- Tables are bound to a TableEnvironment (see the usage sketch after this list)
- Simplifies table translation from and to DataSet / DataStream
- Updates documentation and cleans up ScalaDocs
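
For orientation, here is a minimal sketch of the new usage pattern this
refactoring introduces, assembled from the updated documentation below (the
WC POJO is the one from the docs example; the sketch is illustrative and not
part of the commit):

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.table.BatchTableEnvironment;
    import org.apache.flink.api.table.Table;
    import org.apache.flink.api.table.TableEnvironment;

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    // a TableEnvironment is now obtained for a specific execution environment
    BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

    DataSet<WC> input = env.fromElements(
        new WC("Hello", 1), new WC("Ciao", 1), new WC("Hello", 1));

    // every Table is created through, and bound to, a TableEnvironment
    Table table = tEnv.fromDataSet(input);
    Table wordCounts = table.groupBy("word").select("word, count.sum as count");

    // converting back to a DataSet optimizes and translates the logical plan
    DataSet<WC> result = tEnv.toDataSet(wordCounts, WC.class);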

This closes #1887


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1acd8445
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1acd8445
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1acd8445

Branch: refs/heads/master
Commit: 1acd844539d3e0ea19d492d2f5f89e32f29827d8
Parents: 760a0d9
Author: Fabian Hueske <fh...@apache.org>
Authored: Tue Apr 12 18:48:38 2016 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Fri Apr 15 19:38:15 2016 +0200

----------------------------------------------------------------------
 docs/apis/batch/libs/table.md                   | 266 ++++++------
 .../flink/api/java/table/package-info.java      |  23 +-
 .../flink/examples/java/JavaTableExample.java   |   5 +-
 .../api/java/table/BatchTableEnvironment.scala  | 165 ++++++++
 .../api/java/table/JavaBatchTranslator.scala    | 116 ------
 .../api/java/table/JavaStreamTranslator.scala   |  99 -----
 .../api/java/table/StreamTableEnvironment.scala | 123 ++++++
 .../flink/api/java/table/TableEnvironment.scala | 160 -------
 .../api/scala/table/BatchTableEnvironment.scala | 142 +++++++
 .../api/scala/table/DataSetConversions.scala    |  78 ++--
 .../api/scala/table/DataStreamConversions.scala |  72 ++--
 .../api/scala/table/ScalaBatchTranslator.scala  |  54 ---
 .../api/scala/table/ScalaStreamTranslator.scala |  51 ---
 .../scala/table/StreamTableEnvironment.scala    | 102 +++++
 .../api/scala/table/TableConversions.scala      |  50 +--
 .../api/scala/table/TableEnvironment.scala      | 126 ------
 .../apache/flink/api/scala/table/package.scala  | 108 +++--
 .../api/table/AbstractTableEnvironment.scala    | 101 -----
 .../flink/api/table/BatchTableEnvironment.scala | 236 +++++++++++
 .../api/table/StreamTableEnvironment.scala      | 196 +++++++++
 .../flink/api/table/TableEnvironment.scala      | 358 ++++++++++++++++
 .../flink/api/table/plan/PlanTranslator.scala   |  80 ----
 .../api/table/plan/RexNodeTranslator.scala      |  18 +-
 .../api/table/plan/TranslationContext.scala     | 219 ----------
 .../org/apache/flink/api/table/table.scala      | 152 +++----
 .../flink/examples/scala/TPCHQuery3Table.scala  |   8 +-
 .../flink/examples/scala/WordCountTable.scala   |   4 +-
 .../api/java/table/test/AggregationsITCase.java |  20 +-
 .../flink/api/java/table/test/AsITCase.java     | 416 ------------------
 .../api/java/table/test/CastingITCase.java      |  16 +-
 .../api/java/table/test/DistinctITCase.java     |   7 +-
 .../api/java/table/test/ExpressionsITCase.java  |   9 +-
 .../flink/api/java/table/test/FilterITCase.java |  18 +-
 .../api/java/table/test/FromDataSetITCase.java  | 417 +++++++++++++++++++
 .../table/test/GroupedAggregationsITCase.java   |  13 +-
 .../flink/api/java/table/test/JoinITCase.java   |  35 +-
 .../api/java/table/test/PojoGroupingITCase.java |   5 +-
 .../java/table/test/RegisterDataSetITCase.java  | 142 -------
 .../flink/api/java/table/test/SelectITCase.java |  13 +-
 .../api/java/table/test/SqlExplainITCase.java   | 162 -------
 .../api/java/table/test/SqlExplainTest.java     | 154 +++++++
 .../table/test/StringExpressionsITCase.java     |  21 +-
 .../java/table/test/TableEnvironmentITCase.java | 151 +++++++
 .../flink/api/java/table/test/UnionITCase.java  |  36 +-
 .../api/scala/sql/test/AggregationsITCase.scala |  60 ++-
 .../flink/api/scala/sql/test/FilterITCase.scala |  45 +-
 .../flink/api/scala/sql/test/JoinITCase.scala   | 120 +++---
 .../flink/api/scala/sql/test/SelectITCase.scala |  34 +-
 .../api/scala/sql/test/TableWithSQLITCase.scala |  22 +-
 .../flink/api/scala/sql/test/UnionITCase.scala  |  23 +-
 .../table/streaming/test/FilterITCase.scala     |  22 +-
 .../table/streaming/test/SelectITCase.scala     |  29 +-
 .../table/streaming/test/UnionITCase.scala      |  40 +-
 .../scala/table/test/AggregationsITCase.scala   |  46 +-
 .../flink/api/scala/table/test/AsITCase.scala   | 151 -------
 .../flink/api/scala/table/test/CalcITCase.scala |  31 +-
 .../api/scala/table/test/CastingITCase.scala    |  30 +-
 .../api/scala/table/test/DistinctITCase.scala   |   8 +-
 .../scala/table/test/ExpressionsITCase.scala    |  44 +-
 .../api/scala/table/test/FilterITCase.scala     |  54 ++-
 .../table/test/GroupedAggregationsITCase.scala  |  42 +-
 .../flink/api/scala/table/test/JoinITCase.scala | 105 +++--
 .../table/test/RegisterDataSetITCase.scala      | 136 ------
 .../api/scala/table/test/SelectITCase.scala     |  45 +-
 .../api/scala/table/test/SqlExplainTest.scala   |  59 ++-
 .../table/test/StringExpressionsITCase.scala    |  20 +-
 .../table/test/TableEnvironmentITCase.scala     | 143 +++++++
 .../api/scala/table/test/ToTableITCase.scala    | 159 +++++++
 .../api/scala/table/test/UnionITCase.scala      |  74 +++-
 .../table/test/utils/ExpressionEvaluator.scala  |  31 +-
 .../test/utils/TableProgramsTestBase.scala      |  34 +-
 71 files changed, 3436 insertions(+), 2918 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1acd8445/docs/apis/batch/libs/table.md
----------------------------------------------------------------------
diff --git a/docs/apis/batch/libs/table.md b/docs/apis/batch/libs/table.md
index 56e2b6b..5b1280c 100644
--- a/docs/apis/batch/libs/table.md
+++ b/docs/apis/batch/libs/table.md
@@ -31,13 +31,11 @@ under the License.
 -->
 
 
-**The Table API: an experimental feature**
+**Table API and SQL are experimental features**
 
-Flink's Table API is a SQL-like expression language embedded in Java and Scala.
-Instead of manipulating a `DataSet` or `DataStream`, you can create and work with a relational `Table` abstraction.
-Tables have a schema and allow running relational operations on them, including selection, aggregation, and joins.
-A `Table` can be created from a `DataSet` or a `DataStream` and then queried either using the Table API operators or using SQL queries.
-Once a `Table` is converted back to a `DataSet` or `DataStream`, the defined relational plan is optimized using [Apache Calcite](https://calcite.apache.org/)
+The Table API is a SQL-like expression language that can be embedded in Flink's DataSet and DataStream APIs (Java and Scala).
+A `DataSet` or `DataStream` can be converted into a relational `Table` abstraction. You can apply relational operators such as selection, aggregation, and joins on `Table`s or query them with regular SQL queries.
+When a `Table` is converted back into a `DataSet` or `DataStream`, the logical plan, which was defined by relational operators and SQL queries, is optimized using [Apache Calcite](https://calcite.apache.org/)
 and transformed into a `DataSet` or `DataStream` execution plan.
 
 * This will be replaced by the TOC
@@ -46,7 +44,7 @@ and transformed into a `DataSet` or `DataStream` execution plan.
 Using the Table API and SQL
 ----------------------------
 
-The Table API and SQL are part of the *flink-libraries* Maven project.
+The Table API and SQL are part of the *flink-table* Maven project.
 The following dependency must be added to your project in order to use the Table API and SQL:
 
 {% highlight xml %}
@@ -61,58 +59,73 @@ Note that the Table API is currently not part of the binary distribution. See li
 
 Table API
 ----------
-The Table API provides methods for running relational operations on Tables, both in Scala and Java.
-In the following sections you can find examples that show how to create Tables, how to define and execute relational queries on them,
-and how to retrieve the result of a query as a `DataSet`.
+The Table API provides methods to apply relational operations on DataSets, both in Scala and Java.
+
+The central concept of the Table API is a `Table`, which represents a table with a relational schema (or relation). Tables can be created from a `DataSet`, converted into a `DataSet`, or registered in a table catalog using a `TableEnvironment`. A `Table` is always bound to a specific `TableEnvironment`. It is not possible to combine Tables of different TableEnvironments.
+
+The following sections show by example how to use the Table API embedded in the Scala and Java DataSet APIs.
 
 ### Scala Table API
 
-The Table API can be enabled by importing `org.apache.flink.api.scala.table._`. This enables
-implicit conversions that allow
-converting a DataSet to a Table. This example shows how a DataSet can
-be converted, how relational queries can be specified and how a Table can be
-converted back to a DataSet:
+The Table API is enabled by importing `org.apache.flink.api.scala.table._`. This enables
+implicit conversions to convert a DataSet to a Table. The following example shows:
+
+- how a `DataSet` is converted to a `Table`,
+- how relational queries are specified, and 
+- how a `Table` is converted back to a `DataSet`.
 
 {% highlight scala %}
 import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.table._
 
 case class WC(word: String, count: Int)
+
+val env = ExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+
 val input = env.fromElements(WC("hello", 1), WC("hello", 1), WC("ciao", 1))
-val expr = input.toTable
-val result = expr.groupBy('word).select('word, 'count.sum as 'count).toDataSet[WC]
+val expr = input.toTable(tEnv)
+val result = expr
+               .groupBy('word)
+               .select('word, 'count.sum as 'count)
+               .toDataSet[WC]
 {% endhighlight %}
 
 The expression DSL uses Scala symbols to refer to field names and code generation to
 transform expressions to efficient runtime code. Please note that the conversion to and from
-Tables only works when using Scala case classes or Flink POJOs. Please check out
-the [Type Extraction and Serialization]({{ site.baseurl }}/internals/types_serialization.html) section
-to learn the requirements for a class to be considered a POJO.
+Tables only works when using Scala case classes or Java POJOs. Please refer to the [Type Extraction and Serialization]({{ site.baseurl }}/internals/types_serialization.html) section
+to learn the characteristics of a valid POJO.
 
-This is another example that shows how you
-can join two Tables:
+Another example shows how to join two Tables:
 
 {% highlight scala %}
 case class MyResult(a: String, d: Int)
 
-val input1 = env.fromElements(...).toTable.as('a, 'b)
-val input2 = env.fromElements(...).toTable.as('c, 'd)
-val joined = input1.join(input2).where("a = c && d > 42").select("a, d").toDataSet[MyResult]
+val input1 = env.fromElements(...).toTable(tEnv).as('a, 'b)
+val input2 = env.fromElements(...).toTable(tEnv, 'c, 'd)
+
+val joined = input1.join(input2)
+               .where("a = c && d > 42")
+               .select("a, d")
+               .toDataSet[MyResult]
 {% endhighlight %}
 
-Notice, how a DataSet can be converted to a Table by using `as` and specifying new
-names for the fields. This can also be used to disambiguate fields before a join operation. Also,
-in this example we see that you can also use Strings to specify relational expressions.
+Notice how the field names of a Table can be changed with `as()` or specified with `toTable()` when converting a DataSet to a Table. In addition, the example shows how to use Strings to specify relational expressions.
 
-Please refer to the Scaladoc (and Javadoc) for a full list of supported operations and a
-description of the expression syntax.
+Please refer to the Scaladoc (and Javadoc) for a full list of supported operations and a description of the expression syntax.
 
 {% top %}
 
 ### Java Table API
 
-When using Java, Tables can be converted to and from DataSet using `TableEnvironment`.
-This example is equivalent to the above Scala Example:
+When using Flink's Java DataSet API, DataSets are converted to Tables and Tables to DataSets using a `TableEnvironment`.
+The following example shows:
+
+- how a `DataSet` is converted to a `Table`,
+- how relational queries are specified, and 
+- how a `Table` is converted back to a `DataSet`.
+
+It is equivalent to the Scala example in the previous section.
 
 {% highlight java %}
 
@@ -130,15 +143,15 @@ public class WC {
 
 ...
 
-ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment();
-TableEnvironment tableEnv = new TableEnvironment();
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
 
 DataSet<WC> input = env.fromElements(
         new WC("Hello", 1),
         new WC("Ciao", 1),
         new WC("Hello", 1));
 
-Table table = tableEnv.fromDataSet(input);
+Table table = tEnv.fromDataSet(input);
 
 Table wordCounts = table
         .groupBy("word")
@@ -147,14 +160,14 @@ Table wordCounts = table
 DataSet<WC> result = tEnv.toDataSet(wordCounts, WC.class);
 {% endhighlight %}
 
-When using Java, the embedded DSL for specifying expressions cannot be used. Only String expressions
-are supported. They support exactly the same feature set as the expression DSL.
+With Java, expressions must be specified as Strings. The embedded expression DSL is not supported.
 
 {% top %}
 
 ### Table API Operators
-The Table API provides a domain-spcific language to execute language-integrated queries on structured data in Scala and Java.
-This section gives a brief overview of all available operators. You can find more details of operators in the [Javadoc]({{site.baseurl}}/api/java/org/apache/flink/api/table/Table.html).
+
+The Table API features a domain-specific language to execute language-integrated queries on structured data in Scala and Java.
+This section gives a brief overview of the available operators. You can find more details of operators in the [Javadoc]({{site.baseurl}}/api/java/org/apache/flink/api/table/Table.html).
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
@@ -173,7 +186,7 @@ This section gives a brief overview of all available operators. You can find mor
     <tr>
       <td><strong>Select</strong></td>
       <td>
-        <p>Similar to a SQL SELECT statement. Perform a select operation.</p>
+        <p>Similar to a SQL SELECT statement. Performs a select operation.</p>
 {% highlight java %}
 Table in = tableEnv.fromDataSet(ds, "a, b, c");
 Table result = in.select("a, c as d");
@@ -184,7 +197,7 @@ Table result = in.select("a, c as d");
     <tr>
       <td><strong>As</strong></td>
       <td>
-        <p>Rename fields.</p>
+        <p>Renames fields.</p>
 {% highlight java %}
 Table in = tableEnv.fromDataSet(ds, "a, b, c");
 Table result = in.as("d, e, f");
@@ -193,51 +206,36 @@ Table result = in.as("d, e, f");
     </tr>
 
     <tr>
-      <td><strong>Filter</strong></td>
+      <td><strong>Where / Filter</strong></td>
       <td>
-        <p>Similar to a SQL WHERE clause. Filter out elements that do not pass the filter predicate.</p>
+        <p>Similar to a SQL WHERE clause. Filters out rows that do not pass the filter predicate.</p>
 {% highlight java %}
 Table in = tableEnv.fromDataSet(ds, "a, b, c");
-Table result = in.filter("a % 2 = 0");
+Table result = in.where("b = 'red'");
 {% endhighlight %}
-      </td>
-    </tr>
-
-    <tr>
-      <td><strong>Where</strong></td>
-      <td>
-        <p>Similar to a SQL WHERE clause. Filter out elements that do not pass the filter predicate.</p>
+or
 {% highlight java %}
 Table in = tableEnv.fromDataSet(ds, "a, b, c");
-Table result = in.where("b = 'red'");
+Table result = in.filter("a % 2 = 0");
 {% endhighlight %}
       </td>
     </tr>
-
     <tr>
       <td><strong>GroupBy</strong></td>
       <td>
-        <p>Similar to a SQL GROUPBY clause. Group the elements on the grouping keys, with a following aggregation
-        operator to aggregate on per-group basis.</p>
+        <p>Similar to a SQL GROUP BY clause. Groups the rows on the grouping keys, with a following aggregation
+        operator to aggregate rows group-wise.</p>
 {% highlight java %}
 Table in = tableEnv.fromDataSet(ds, "a, b, c");
 Table result = in.groupBy("a").select("a, b.sum as d");
 {% endhighlight %}
-        <p><i>Note:</i> Flink can refer to nonaggregated columns in the select list that are not named in
-        the groupBy clause, it could be used to get better performance by avoiding unnecessary column sorting and
-        grouping while nonaggregated column is cogrouped with columns in groupBy clause. For example:</p>
-{% highlight java %}
-Table in = tableEnv.fromDataSet(ds, "a, b, c");
-Table result = in.groupBy("a").select("a, b, c.sum as d");
-{% endhighlight %}
       </td>
     </tr>
 
     <tr>
       <td><strong>Join</strong></td>
       <td>
-        <p>Similar to a SQL JOIN clause. Join two tables, both tables must have distinct field name, and the where
-        clause is mandatory for join condition.</p>
+        <p>Similar to a SQL JOIN clause. Joins two tables. Both tables must have distinct field names and an equality join predicate must be defined using a where or filter operator.</p>
 {% highlight java %}
 Table left = tableEnv.fromDataSet(ds1, "a, b, c");
 Table right = tableEnv.fromDataSet(ds2, "d, e, f");
@@ -249,7 +247,7 @@ Table result = left.join(right).where("a = d").select("a, b, e");
     <tr>
       <td><strong>Union</strong></td>
       <td>
-        <p>Similar to a SQL UNION ALL clause. Union two tables, both tables must have identical schema(field names and types).</p>
+        <p>Similar to a SQL UNION ALL clause. Unions two tables. Both tables must have identical schema, i.e., field names and types.</p>
 {% highlight java %}
 Table left = tableEnv.fromDataSet(ds1, "a, b, c");
 Table right = tableEnv.fromDataSet(ds2, "a, b, c");
@@ -288,9 +286,9 @@ Table result = in.distinct();
     <tr>
       <td><strong>Select</strong></td>
       <td>
-        <p>Similar to a SQL SELECT statement. Perform a select operation.</p>
+        <p>Similar to a SQL SELECT statement. Performs a select operation.</p>
 {% highlight scala %}
-val in = ds.as('a, 'b, 'c);
+val in = ds.toTable(tableEnv, 'a, 'b, 'c);
 val result = in.select('a, 'c as 'd);
 {% endhighlight %}
       </td>
@@ -299,30 +297,24 @@ val result = in.select('a, 'c as 'd);
     <tr>
       <td><strong>As</strong></td>
       <td>
-        <p>Rename fields.</p>
+        <p>Renames fields.</p>
 {% highlight scala %}
-val in = ds.as('a, 'b, 'c);
+val in = ds.toTable(tableEnv).as('a, 'b, 'c);
 {% endhighlight %}
       </td>
     </tr>
 
     <tr>
-      <td><strong>Filter</strong></td>
+      <td><strong>Where / Filter</strong></td>
       <td>
-        <p>Similar to a SQL WHERE clause. Filter out elements that do not pass the filter predicate.</p>
+        <p>Similar to a SQL WHERE clause. Filters out rows that do not pass the filter predicate.</p>
 {% highlight scala %}
-val in = ds.as('a, 'b, 'c);
+val in = ds.toTable(tableEnv, 'a, 'b, 'c);
 val result = in.filter('a % 2 === 0)
 {% endhighlight %}
-      </td>
-    </tr>
-
-    <tr>
-      <td><strong>Where</strong></td>
-      <td>
-        <p>Similar to a SQL WHERE clause. Filter out elements that do not pass the filter predicate.</p>
+or
 {% highlight scala %}
-val in = ds.as('a, 'b, 'c);
+val in = ds.toTable(tableEnv, 'a, 'b, 'c);
 val result = in.where('b === "red");
 {% endhighlight %}
       </td>
@@ -331,30 +323,22 @@ val result = in.where('b === "red");
     <tr>
       <td><strong>GroupBy</strong></td>
       <td>
-        <p>Similar to a SQL GROUPBY clause. Group the elements on the grouping keys, with a following aggregation
-        operator to aggregate on per-group basis.</p>
+        <p>Similar to a SQL GROUP BY clause. Groups rows on the grouping keys, with a following aggregation
+        operator to aggregate rows group-wise.</p>
 {% highlight scala %}
-val in = ds.as('a, 'b, 'c);
+val in = ds.toTable(tableEnv, 'a, 'b, 'c);
 val result = in.groupBy('a).select('a, 'b.sum as 'd);
 {% endhighlight %}
-        <p><i>Note:</i> Flink can refer to nonaggregated columns in the select list that are not named in
-        the groupBy clause, it could be used to get better performance by avoiding unnecessary column sorting and
-        grouping while nonaggregated column is cogrouped with columns in groupBy clause. For example:</p>
-{% highlight scala %}
-val in = ds.as('a, 'b, 'c);
-val result = in.groupBy('a).select('a, 'b, 'c.sum as 'd);
-{% endhighlight %}
       </td>
     </tr>
 
     <tr>
       <td><strong>Join</strong></td>
       <td>
-        <p>Similar to a SQL JOIN clause. Join two tables, both tables must have distinct field name, and the where
-        clause is mandatory for join condition.</p>
+        <p>Similar to a SQL JOIN clause. Joins two tables. Both tables must have distinct field names and an equality join predicate must be defined using a where or filter operator.</p>
 {% highlight scala %}
-val left = ds1.as('a, 'b, 'c);
-val right = ds2.as('d, 'e, 'f);
+val left = ds1.toTable(tableEnv, 'a, 'b, 'c);
+val right = ds2.toTable(tableEnv, 'd, 'e, 'f);
 val result = left.join(right).where('a === 'd).select('a, 'b, 'e);
 {% endhighlight %}
       </td>
@@ -363,10 +347,10 @@ val result = left.join(right).where('a === 'd).select('a, 'b, 'e);
     <tr>
       <td><strong>Union</strong></td>
       <td>
-        <p>Similar to a SQL UNION ALL clause. Union two tables, both tables must have identical schema(field names and types).</p>
+        <p>Similar to a SQL UNION ALL clause. Unions two tables. Both tables must have identical schema, i.e., field names and types.</p>
 {% highlight scala %}
-val left = ds1.as('a, 'b, 'c);
-val right = ds2.as('a, 'b, 'c);
+val left = ds1.toTable(tableEnv, 'a, 'b, 'c);
+val right = ds2.toTable(tableEnv, 'a, 'b, 'c);
 val result = left.unionAll(right);
 {% endhighlight %}
       </td>
@@ -377,7 +361,7 @@ val result = left.unionAll(right);
       <td>
         <p>Similar to a SQL DISTINCT clause. Returns rows with distinct value combinations.</p>
 {% highlight scala %}
-val in = ds.as('a, 'b, 'c);
+val in = ds.toTable(tableEnv, 'a, 'b, 'c);
 val result = in.distinct();
 {% endhighlight %}
       </td>
@@ -391,9 +375,7 @@ val result = in.distinct();
 {% top %}
 
 ### Expression Syntax
-Some of operators in previous section expect an expression. These can either be specified using an embedded Scala DSL or
-a String expression. Please refer to the examples above to learn how expressions can be
-formulated.
+Some of the operators in the previous sections expect one or more expressions. Expressions can be specified using an embedded Scala DSL or as Strings. Please refer to the examples above to learn how expressions can be specified.
 
 This is the complete EBNF grammar for expressions:
 
@@ -438,19 +420,17 @@ SQL
 ----
 The Table API also supports embedded SQL queries.
 In order to use a `Table` or `DataSet` in a SQL query, it has to be registered in the `TableEnvironment`, using a unique name.
-A registered `Table` can be retrieved back from the `TableEnvironment` using the `scan` method:
+A registered `Table` can be retrieved back from the `TableEnvironment` using the `scan()` method:
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 // create a Table environment
-TableEnvironment tableEnv = new TableEnvironment();
-// reset the translation context: this will erase existing registered Tables
-TranslationContext.reset();
-// read a DataSet from an external source
+BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+
 DataSet<Tuple2<Integer, Long>> ds = env.readCsvFile(...);
-// register the DataSet under the name "MyTable"
+// register the DataSet as table "MyTable"
 tableEnv.registerDataSet("MyTable", ds);
 // retrieve "MyTable" into a new Table
 Table t = tableEnv.scan("MyTable");
@@ -461,12 +441,10 @@ Table t = tableEnv.scan("MyTable");
 {% highlight scala %}
 val env = ExecutionEnvironment.getExecutionEnvironment
 // create a Table environment
-val tableEnv = new TableEnvironment
-// reset the translation context: this will erase existing registered Tables
-TranslationContext.reset()
-// read a DataSet from an external source
+val tableEnv = TableEnvironment.getTableEnvironment(env)
+
 val ds = env.readCsvFile(...)
-// register the DataSet under the name "MyTable"
+// register the DataSet as table "MyTable"
 tableEnv.registerDataSet("MyTable", ds)
 // retrieve "MyTable" into a new Table
 val t = tableEnv.scan("MyTable")
@@ -476,20 +454,20 @@ val t = tableEnv.scan("MyTable")
 
 *Note: Table names are not allowed to follow the `^_DataSetTable_[0-9]+` pattern, as this is reserved for internal use only.*
 
-When registering a `DataSet`, one can also give names to the `Table` columns. For example, if "MyTable" has three columns, `user`, `product`, and `order`, we can give them names upon registering the `DataSet` as shown below:
+When registering a `DataSet`, one can also specify the field names of the table:
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-// register the DataSet under the name "MyTable" with columns user, product, and order
-tableEnv.registerDataSet("MyTable", ds, "user, product, order");
+// register the DataSet as table "Orders" with fields user, product, and amount
+tableEnv.registerDataSet("Orders", ds, "user, product, amount");
 {% endhighlight %}
 </div>
 
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-// register the DataSet under the name "MyTable" with columns user, product, and order
-tableEnv.registerDataSet("MyTable", ds, 'user, 'product, 'order)
+// register the DataSet as table "Orders" with fields user, product, and amount
+tableEnv.registerDataSet("Orders", ds, 'user, 'product, 'amount)
 {% endhighlight %}
 </div>
 </div>
@@ -499,61 +477,49 @@ A `Table` can be registered in a similar way:
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-// read a DataSet from an external source
-DataSet<Tuple2<Integer, Long>> ds = env.readCsvFile(...);
-// create a Table from the DataSet with columns user, product, and order
-Table t = tableEnv.fromDataSet(ds).as("user, product, order");
-// register the Table under the name "MyTable"
-tableEnv.registerTable("MyTable", t);
+Table t = tableEnv.fromDataSet(ds).as("user, product, amount");
+// register the Table as table "Orders"
+tableEnv.registerTable("Orders", t);
 {% endhighlight %}
 </div>
 
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-// read a DataSet from an external source and
-// create a Table from the DataSet with columns user, product, and order
-val t = env.readCsvFile(...).as('user, 'product, 'order)
-// register the Table under the name "MyTable"
-tableEnv.registerTable("MyTable", t)
+val t = ds.toTable(tableEnv, 'user, 'product, 'amount)
+// register the Table as table "Orders"
+tableEnv.registerTable("Orders", t)
 {% endhighlight %}
 </div>
 </div>
 
-After registering a `Table` or `DataSet`, one can use them in SQL queries. A SQL query is defined using the `sql` method of the `TableEnvironment`.
-The result of the method is a new `Table` which can either be converted back to a `DataSet` or used in subsequent Table API queries.
+Registered tables can be used in SQL queries. A SQL query is defined using the `sql()` method of the `TableEnvironment`. It returns a new `Table` which can be converted back to a `DataSet` or used in subsequent Table API queries.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-// create a Table environment
-TableEnvironment tableEnv = new TableEnvironment();
-// reset the translation context: this will erase existing registered Tables
-TranslationContext.reset();
+BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+
 // read a DataSet from an external source
 DataSet<Tuple2<Integer, Long>> ds = env.readCsvFile(...);
-// create a Table from the DataSet
-Table t = tableEnv.fromDataSet(ds);
-// register the Table under the name "MyTable"
-tableEnv.registerTable("MyTable", t);
-// run a sql query and retrieve the result in a new Table
-Table result = tableEnv.sql("SELECT * FROM MyTable");
+// register the DataSet as table "Orders"
+tableEnv.registerDataSet("Orders", ds, "user, product, amount");
+// run a SQL query and retrieve the result in a new Table
+Table result = tableEnv.sql("SELECT SUM(amount) FROM Orders WHERE product = 10");
 {% endhighlight %}
 </div>
 
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
 val env = ExecutionEnvironment.getExecutionEnvironment
-// create a Table environment
-val tableEnv = new TableEnvironment
-// reset the translation context: this will erase existing registered Tables
-TranslationContext.reset()
-// create a Table
-val t = env.readCsvFile(...).as('a, 'b, 'c)
-// register the Table under the name "MyTable"
-tableEnv.registerTable("MyTable", t)
-// run a sql query and retrieve the result in a new Table
-val result = tableEnv.sql("SELECT * FROM MyTable")
+val tableEnv = TableEnvironment.getTableEnvironment(env)
+
+// read a DataSet from an external source
+val ds = env.readCsvFile(...)
+// register the DataSet as table "Orders"
+tableEnv.registerDataSet("Orders", ds, 'user, 'product, 'amount)
+// run a SQL query and retrieve the result in a new Table
+val result = tableEnv.sql("SELECT SUM(amount) FROM Orders WHERE product = 10")
 {% endhighlight %}
 </div>
 </div>
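
Since `sql()` returns an ordinary `Table`, its result can also be refined
with Table API operators before it is converted back. A minimal sketch
continuing the Java example above (the aggregate query and the threshold are
illustrative):

    // run a SQL query on the registered "Orders" table
    Table result = tableEnv.sql(
        "SELECT product, SUM(amount) AS total FROM Orders GROUP BY product");
    // refine the result with a Table API operator ...
    Table filtered = result.filter("total > 50");
    // ... and convert it back to a DataSet as shown above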

http://git-wip-us.apache.org/repos/asf/flink/blob/1acd8445/flink-libraries/flink-table/src/main/java/org/apache/flink/api/java/table/package-info.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/java/org/apache/flink/api/java/table/package-info.java b/flink-libraries/flink-table/src/main/java/org/apache/flink/api/java/table/package-info.java
index 97113bb..1db4cb8 100644
--- a/flink-libraries/flink-table/src/main/java/org/apache/flink/api/java/table/package-info.java
+++ b/flink-libraries/flink-table/src/main/java/org/apache/flink/api/java/table/package-info.java
@@ -19,12 +19,14 @@
 /**
  * <strong>Table API (Java)</strong><br>
  *
- * {@link org.apache.flink.api.java.table.TableEnvironment} can be used to create a
- * {@link org.apache.flink.api.table.Table} from a {@link org.apache.flink.api.java.DataSet}
- * or {@link org.apache.flink.streaming.api.datastream.DataStream}.
+ * A {@link org.apache.flink.api.java.table.BatchTableEnvironment} can be used to create a
+ * {@link org.apache.flink.api.table.Table} from a {@link org.apache.flink.api.java.DataSet}.
+ * Equivalently, a {@link org.apache.flink.api.java.table.StreamTableEnvironment} can be used to
+ * create a {@link org.apache.flink.api.table.Table} from a
+ * {@link org.apache.flink.streaming.api.datastream.DataStream}.
  *
  * <p>
- * This can be used to perform SQL-like queries on data. Please have
+ * Tables can be used to perform SQL-like queries on data. Please have
  * a look at {@link org.apache.flink.api.table.Table} to see which operations are supported and
  * how query strings are written.
  *
@@ -32,29 +34,30 @@
  * Example:
  *
  * <pre>{@code
- * ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment();
+ * ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ * BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
  *
  * DataSet<WC> input = env.fromElements(
  *   new WC("Hello", 1),
  *   new WC("Ciao", 1),
  *   new WC("Hello", 1));
  *
- * Table table = TableUtil.from(input);
+ * Table table = tEnv.fromDataSet(input);
  *
  * Table filtered = table
  *     .groupBy("word")
  *     .select("word.count as count, word")
  *     .filter("count = 2");
  *
- * DataSet<WC> result = TableUtil.toSet(filtered, WC.class);
+ * DataSet<WC> result = tEnv.toDataSet(filtered, WC.class);
  *
  * result.print();
- * env.execute();
  * }</pre>
  *
  * <p>
  * As seen above, a {@link org.apache.flink.api.table.Table} can be converted back to the
- * underlying API representation using {@link org.apache.flink.api.java.table.TableEnvironment#toDataSet(org.apache.flink.api.table.Table, java.lang.Class)}
- * or {@link org.apache.flink.api.java.table.TableEnvironment#toDataStream(org.apache.flink.api.table.Table, java.lang.Class)}}.
+ * underlying API representation using
+ * {@link org.apache.flink.api.java.table.BatchTableEnvironment#toDataSet(org.apache.flink.api.table.Table, java.lang.Class)}
+ * or {@link org.apache.flink.api.java.table.StreamTableEnvironment#toDataStream(org.apache.flink.api.table.Table, java.lang.Class)}}.
  */
 package org.apache.flink.api.java.table;

http://git-wip-us.apache.org/repos/asf/flink/blob/1acd8445/flink-libraries/flink-table/src/main/java/org/apache/flink/examples/java/JavaTableExample.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/java/org/apache/flink/examples/java/JavaTableExample.java b/flink-libraries/flink-table/src/main/java/org/apache/flink/examples/java/JavaTableExample.java
index c7e69c9..0a776a4 100644
--- a/flink-libraries/flink-table/src/main/java/org/apache/flink/examples/java/JavaTableExample.java
+++ b/flink-libraries/flink-table/src/main/java/org/apache/flink/examples/java/JavaTableExample.java
@@ -20,7 +20,8 @@ package org.apache.flink.examples.java;
 import org.apache.flink.api.table.Table;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.table.TableEnvironment;
+import org.apache.flink.api.java.table.BatchTableEnvironment;
+import org.apache.flink.api.table.TableEnvironment;
 
 /**
  * Very simple example that shows how the Java Table API can be used.
@@ -49,7 +50,7 @@ public class JavaTableExample {
 
 	public static void main(String[] args) throws Exception {
 		ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment();
-		TableEnvironment tableEnv = new TableEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
 
 		DataSet<WC> input = env.fromElements(
 				new WC("Hello", 1),

http://git-wip-us.apache.org/repos/asf/flink/blob/1acd8445/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/BatchTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/BatchTableEnvironment.scala
new file mode 100644
index 0000000..69bff95
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/BatchTableEnvironment.scala
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.table
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.{ExecutionEnvironment, DataSet}
+import org.apache.flink.api.java.typeutils.TypeExtractor
+import org.apache.flink.api.table.expressions.ExpressionParser
+import org.apache.flink.api.table.{TableConfig, Table}
+
+/**
+  * The [[org.apache.flink.api.table.TableEnvironment]] for a Java batch [[DataSet]]
+  * [[ExecutionEnvironment]].
+  *
+  * A TableEnvironment can be used to:
+  * - convert a [[DataSet]] to a [[Table]]
+  * - register a [[DataSet]] in the [[org.apache.flink.api.table.TableEnvironment]]'s catalog
+  * - register a [[Table]] in the [[org.apache.flink.api.table.TableEnvironment]]'s catalog
+  * - scan a registered table to obtain a [[Table]]
+  * - specify a SQL query on registered tables to obtain a [[Table]]
+  * - convert a [[Table]] into a [[DataSet]]
+  * - explain the AST and execution plan of a [[Table]]
+  *
+  * @param execEnv The Java batch [[ExecutionEnvironment]] of the TableEnvironment.
+  * @param config The configuration of the TableEnvironment.
+  */
+class BatchTableEnvironment(
+    protected val execEnv: ExecutionEnvironment,
+    config: TableConfig)
+  extends org.apache.flink.api.table.BatchTableEnvironment(config) {
+
+  /**
+    * Converts the given [[DataSet]] into a [[Table]].
+    *
+    * The field names of the [[Table]] are automatically derived from the type of the [[DataSet]].
+    *
+    * @param dataSet The [[DataSet]] to be converted.
+    * @tparam T The type of the [[DataSet]].
+    * @return The converted [[Table]].
+    */
+  def fromDataSet[T](dataSet: DataSet[T]): Table = {
+
+    val name = createUniqueTableName()
+    registerDataSetInternal(name, dataSet)
+    scan(name)
+  }
+
+  /**
+    * Converts the given [[DataSet]] into a [[Table]] with specified field names.
+    *
+    * Example:
+    *
+    * {{{
+    *   DataSet<Tuple2<String, Long>> set = ...
+    *   Table tab = tableEnv.fromDataSet(set, "a, b")
+    * }}}
+    *
+    * @param dataSet The [[DataSet]] to be converted.
+    * @param fields The field names of the resulting [[Table]].
+    * @tparam T The type of the [[DataSet]].
+    * @return The converted [[Table]].
+    */
+  def fromDataSet[T](dataSet: DataSet[T], fields: String): Table = {
+    val exprs = ExpressionParser
+      .parseExpressionList(fields)
+      .toArray
+
+    val name = createUniqueTableName()
+    registerDataSetInternal(name, dataSet, exprs)
+    scan(name)
+  }
+
+  /**
+    * Registers the given [[DataSet]] as table in the
+    * [[org.apache.flink.api.table.TableEnvironment]]'s catalog.
+    * Registered tables can be referenced in SQL queries.
+    *
+    * The field names of the [[Table]] are automatically derived from the type of the [[DataSet]].
+    *
+    * @param name The name under which the [[DataSet]] is registered in the catalog.
+    * @param dataSet The [[DataSet]] to register.
+    * @tparam T The type of the [[DataSet]] to register.
+    */
+  def registerDataSet[T](name: String, dataSet: DataSet[T]): Unit = {
+
+    checkValidTableName(name)
+    registerDataSetInternal(name, dataSet)
+  }
+
+  /**
+    * Registers the given [[DataSet]] as table with specified field names in the
+    * [[org.apache.flink.api.table.TableEnvironment]]'s catalog.
+    * Registered tables can be referenced in SQL queries.
+    *
+    * Example:
+    *
+    * {{{
+    *   DataSet<Tuple2<String, Long>> set = ...
+    *   tableEnv.registerDataSet("myTable", set, "a, b")
+    * }}}
+    *
+    * @param name The name under which the [[DataSet]] is registered in the catalog.
+    * @param dataSet The [[DataSet]] to register.
+    * @param fields The field names of the registered table.
+    * @tparam T The type of the [[DataSet]] to register.
+    */
+  def registerDataSet[T](name: String, dataSet: DataSet[T], fields: String): Unit = {
+    val exprs = ExpressionParser
+      .parseExpressionList(fields)
+      .toArray
+
+    checkValidTableName(name)
+    registerDataSetInternal(name, dataSet, exprs)
+  }
+
+  /**
+    * Converts the given [[Table]] into a [[DataSet]] of a specified type.
+    *
+    * The fields of the [[Table]] are mapped to [[DataSet]] fields as follows:
+    * - [[org.apache.flink.api.table.Row]] and [[org.apache.flink.api.java.tuple.Tuple]]
+    * types: Fields are mapped by position, field types must match.
+    * - POJO [[DataSet]] types: Fields are mapped by field name, field types must match.
+    *
+    * @param table The [[Table]] to convert.
+    * @param clazz The class of the type of the resulting [[DataSet]].
+    * @tparam T The type of the resulting [[DataSet]].
+    * @return The converted [[DataSet]].
+    */
+  def toDataSet[T](table: Table, clazz: Class[T]): DataSet[T] = {
+    translate[T](table)(TypeExtractor.createTypeInfo(clazz))
+  }
+
+  /**
+    * Converts the given [[Table]] into a [[DataSet]] of a specified type.
+    *
+    * The fields of the [[Table]] are mapped to [[DataSet]] fields as follows:
+    * - [[org.apache.flink.api.table.Row]] and [[org.apache.flink.api.java.tuple.Tuple]]
+    * types: Fields are mapped by position, field types must match.
+    * - POJO [[DataSet]] types: Fields are mapped by field name, field types must match.
+    *
+    * @param table The [[Table]] to convert.
+    * @param typeInfo The [[TypeInformation]] that specifies the type of the resulting [[DataSet]].
+    * @tparam T The type of the resulting [[DataSet]].
+    * @return The converted [[DataSet]].
+    */
+  def toDataSet[T](table: Table, typeInfo: TypeInformation[T]): DataSet[T] = {
+    translate[T](table)(typeInfo)
+  }
+
+}
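
A usage sketch of the class added above, adapted from its own Scaladoc
examples (the input data and the field names "a, b" are placeholders taken
from the Scaladoc):

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.table.BatchTableEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.table.Table;
    import org.apache.flink.api.table.TableEnvironment;

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

    DataSet<Tuple2<String, Long>> set =
        env.fromElements(new Tuple2<>("hello", 1L));

    // convert with explicit field names, as in the Scaladoc example
    Table tab = tEnv.fromDataSet(set, "a, b");

    // or register the DataSet in the catalog for use in SQL queries
    tEnv.registerDataSet("myTable", set, "a, b");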

http://git-wip-us.apache.org/repos/asf/flink/blob/1acd8445/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala
deleted file mode 100644
index 4688c82..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.table
-
-import org.apache.calcite.plan.RelOptPlanner.CannotPlanException
-import org.apache.calcite.plan.RelOptUtil
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.sql2rel.RelDecorrelator
-import org.apache.calcite.tools.Programs
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.{DataSet => JavaDataSet}
-import org.apache.flink.api.table.plan._
-import org.apache.flink.api.table.{FlinkPlannerImpl, TableConfig, Table}
-import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, DataSetRel}
-import org.apache.flink.api.table.plan.rules.FlinkRuleSets
-import org.apache.flink.api.table.plan.schema.DataSetTable
-
-/**
- * [[PlanTranslator]] for creating [[Table]]s from Java [[org.apache.flink.api.java.DataSet]]s and
- * translating them back to Java [[org.apache.flink.api.java.DataSet]]s.
- */
-class JavaBatchTranslator(config: TableConfig) extends PlanTranslator {
-
-  type Representation[A] = JavaDataSet[A]
-
-  override def createTable[A](
-      repr: Representation[A],
-      fieldIndexes: Array[Int],
-      fieldNames: Array[String]): Table = {
-
-    // create table representation from DataSet
-    val dataSetTable = new DataSetTable[A](
-      repr.asInstanceOf[JavaDataSet[A]],
-      fieldIndexes,
-      fieldNames
-    )
-
-    val tabName = TranslationContext.registerDataSetTable(dataSetTable)
-    val relBuilder = TranslationContext.getRelBuilder
-
-    // create table scan operator
-    relBuilder.scan(tabName)
-    val relNode = relBuilder.build()
-    new Table(relNode, relBuilder)
-  }
-
-  override def translate[A](lPlan: RelNode)(implicit tpe: TypeInformation[A]): JavaDataSet[A] = {
-
-    // get the planner for the plan
-    val planner = lPlan.getCluster.getPlanner
-
-    // decorrelate
-    val decorPlan = RelDecorrelator.decorrelateQuery(lPlan)
-
-    // optimize the logical Flink plan
-    val optProgram = Programs.ofRules(FlinkRuleSets.DATASET_OPT_RULES)
-    val flinkOutputProps = lPlan.getTraitSet.replace(DataSetConvention.INSTANCE).simplify()
-
-    val dataSetPlan = try {
-      optProgram.run(planner, decorPlan, flinkOutputProps)
-    }
-    catch {
-      case e: CannotPlanException =>
-        throw new PlanGenException(
-          s"Cannot generate a valid execution plan for the given query: \n\n" +
-          s"${RelOptUtil.toString(lPlan)}\n" +
-          "Please consider filing a bug report.", e)
-      case a: AssertionError =>
-        throw a.getCause
-    }
-
-    dataSetPlan match {
-      case node: DataSetRel =>
-        node.translateToPlan(
-          config,
-          Some(tpe.asInstanceOf[TypeInformation[Any]])
-        ).asInstanceOf[JavaDataSet[A]]
-      case _ => ???
-    }
-
-  }
-
-  /**
-   * Parse, validate, and translate a SQL query into a relNode Table
-   */
-  def translateSQL(query: String): Table = {
-
-    val frameworkConfig = TranslationContext.getFrameworkConfig
-    val planner = new FlinkPlannerImpl(frameworkConfig, TranslationContext.getPlanner)
-    // parse the sql query
-    val parsed = planner.parse(query)
-    // validate the sql query
-    val validated = planner.validate(parsed)
-    // transform to a relational tree
-    val relational = planner.rel(validated)
-
-    new Table(relational.rel, TranslationContext.getRelBuilder)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/1acd8445/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaStreamTranslator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaStreamTranslator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaStreamTranslator.scala
deleted file mode 100644
index e4b8ca0..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaStreamTranslator.scala
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.table
-
-import org.apache.calcite.plan.RelOptPlanner.CannotPlanException
-import org.apache.calcite.plan.{RelTraitSet, RelOptUtil}
-import org.apache.calcite.rel.{RelCollations, RelNode}
-import org.apache.calcite.sql2rel.RelDecorrelator
-import org.apache.calcite.tools.Programs
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.streaming.api.datastream.{DataStream => JavaDataStream}
-import org.apache.flink.api.table.plan._
-import org.apache.flink.api.table.{TableConfig, Table}
-import org.apache.flink.api.table.plan.nodes.datastream.{DataStreamRel, DataStreamConvention}
-import org.apache.flink.api.table.plan.rules.FlinkRuleSets
-import org.apache.flink.api.table.plan.schema.DataStreamTable
-
-/**
- * [[PlanTranslator]] for creating [[Table]]s
- * from Java [[org.apache.flink.streaming.api.datastream.DataStream]]s and
- * translating them back to Java [[org.apache.flink.streaming.api.datastream.DataStream]]s.
- */
-class JavaStreamTranslator(config: TableConfig) extends PlanTranslator {
-
-  type Representation[A] = JavaDataStream[A]
-
-  override def createTable[A](
-      repr: Representation[A],
-      fieldIndexes: Array[Int],
-      fieldNames: Array[String]): Table = {
-
-    // create table representation from DataSet
-    val dataStreamTable = new DataStreamTable[A](
-      repr.asInstanceOf[JavaDataStream[A]],
-      fieldIndexes,
-      fieldNames
-    )
-
-    val tabName = TranslationContext.addDataStream(dataStreamTable)
-    val relBuilder = TranslationContext.getRelBuilder
-
-    // create table scan operator
-    relBuilder.scan(tabName)
-    new Table(relBuilder.build(), relBuilder)
-  }
-
-  override def translate[A](lPlan: RelNode)(implicit tpe: TypeInformation[A]): JavaDataStream[A] = {
-
-    // get the planner for the plan
-    val planner = lPlan.getCluster.getPlanner
-
-    // decorrelate
-    val decorPlan = RelDecorrelator.decorrelateQuery(lPlan)
-
-    // optimize the logical Flink plan
-    val optProgram = Programs.ofRules(FlinkRuleSets.DATASTREAM_OPT_RULES)
-    val flinkOutputProps = RelTraitSet.createEmpty()
-      .plus(DataStreamConvention.INSTANCE)
-      .plus(RelCollations.of()).simplify()
-
-    val dataStreamPlan = try {
-      optProgram.run(planner, decorPlan, flinkOutputProps)
-    }
-    catch {
-      case e: CannotPlanException =>
-        throw new PlanGenException(
-          s"Cannot generate a valid execution plan for the given query: \n\n" +
-            s"${RelOptUtil.toString(lPlan)}\n" +
-            "Please consider filing a bug report.", e)
-    }
-
-    dataStreamPlan match {
-      case node: DataStreamRel =>
-        node.translateToPlan(
-          config,
-          Some(tpe.asInstanceOf[TypeInformation[Any]])
-        ).asInstanceOf[JavaDataStream[A]]
-      case _ => ???
-    }
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/1acd8445/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/StreamTableEnvironment.scala
new file mode 100644
index 0000000..7479426
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/StreamTableEnvironment.scala
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.table
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.TypeExtractor
+import org.apache.flink.api.table.{TableConfig, Table}
+import org.apache.flink.api.table.expressions.ExpressionParser
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+
+/**
+  * The [[org.apache.flink.api.table.TableEnvironment]] for a Java [[StreamExecutionEnvironment]].
+  *
+  * A TableEnvironment can be used to:
+  * - convert a [[DataStream]] to a [[Table]]
+  * - register a [[DataStream]] in the [[org.apache.flink.api.table.TableEnvironment]]'s catalog
+  * - register a [[Table]] in the [[org.apache.flink.api.table.TableEnvironment]]'s catalog
+  * - scan a registered table to obtain a [[Table]]
+  * - specify a SQL query on registered tables to obtain a [[Table]]
+  * - convert a [[Table]] into a [[DataStream]]
+  * - explain the AST and execution plan of a [[Table]]
+  *
+  * @param execEnv The Java [[StreamExecutionEnvironment]] of the TableEnvironment.
+  * @param config The configuration of the TableEnvironment.
+  */
+class StreamTableEnvironment(
+    protected val execEnv: StreamExecutionEnvironment,
+    config: TableConfig)
+  extends org.apache.flink.api.table.StreamTableEnvironment(config) {
+
+  /**
+    * Converts the given [[DataStream]] into a [[Table]].
+    *
+    * The field names of the [[Table]] are automatically derived from the type of the
+    * [[DataStream]].
+    *
+    * @param dataStream The [[DataStream]] to be converted.
+    * @tparam T The type of the [[DataStream]].
+    * @return The converted [[Table]].
+    */
+  def fromDataStream[T](dataStream: DataStream[T]): Table = {
+
+    val name = createUniqueTableName()
+    registerDataStreamInternal(name, dataStream)
+    ingest(name)
+  }
+
+  /**
+    * Converts the given [[DataStream]] into a [[Table]] with specified field names.
+    *
+    * Example:
+    *
+    * {{{
+    *   DataStream<Tuple2<String, Long>> stream = ...
+    *   Table tab = tableEnv.fromDataStream(stream, "a, b")
+    * }}}
+    *
+    * @param dataStream The [[DataStream]] to be converted.
+    * @param fields The field names of the resulting [[Table]].
+    * @tparam T The type of the [[DataStream]].
+    * @return The converted [[Table]].
+    */
+  def fromDataStream[T](dataStream: DataStream[T], fields: String): Table = {
+    val exprs = ExpressionParser
+      .parseExpressionList(fields)
+      .toArray
+
+    val name = createUniqueTableName()
+    registerDataStreamInternal(name, dataStream, exprs)
+    ingest(name)
+  }
+
+  /**
+    * Converts the given [[Table]] into a [[DataStream]] of a specified type.
+    *
+    * The fields of the [[Table]] are mapped to [[DataStream]] fields as follows:
+    * - [[org.apache.flink.api.table.Row]] and [[org.apache.flink.api.java.tuple.Tuple]]
+    * types: Fields are mapped by position, field types must match.
+    * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match.
+    *
+    * @param table The [[Table]] to convert.
+    * @param clazz The class of the type of the resulting [[DataStream]].
+    * @tparam T The type of the resulting [[DataStream]].
+    * @return The converted [[DataStream]].
+    */
+  def toDataStream[T](table: Table, clazz: Class[T]): DataStream[T] = {
+    translate[T](table)(TypeExtractor.createTypeInfo(clazz))
+  }
+
+  /**
+    * Converts the given [[Table]] into a [[DataStream]] of a specified type.
+    *
+    * The fields of the [[Table]] are mapped to [[DataStream]] fields as follows:
+    * - [[org.apache.flink.api.table.Row]] and [[org.apache.flink.api.java.tuple.Tuple]]
+    * types: Fields are mapped by position, field types must match.
+    * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match.
+    *
+    * @param table The [[Table]] to convert.
+    * @param typeInfo The [[TypeInformation]] that specifies the type of the [[DataStream]].
+    * @tparam T The type of the resulting [[DataStream]].
+    * @return The converted [[DataStream]].
+    */
+  def toDataStream[T](table: Table, typeInfo: TypeInformation[T]): DataStream[T] = {
+    translate[T](table)(typeInfo)
+  }
+
+}

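For illustration, a minimal round trip through the new Java StreamTableEnvironment could look as follows. This is a sketch, not part of the patch: it assumes the getTableEnvironment factory that the conversion docs further below show, and MyPojo is a hypothetical POJO with "name" and "amount" fields.

    import org.apache.flink.api.table.TableEnvironment
    import org.apache.flink.streaming.api.datastream.DataStream
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment

    val env  = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    val stream: DataStream[MyPojo] = ...                     // hypothetical source
    val tab = tEnv.fromDataStream(stream, "name, amount")    // parsed by ExpressionParser
    val out: DataStream[MyPojo] = tEnv.toDataStream(tab, classOf[MyPojo])
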
http://git-wip-us.apache.org/repos/asf/flink/blob/1acd8445/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/TableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/TableEnvironment.scala
deleted file mode 100644
index 90954c6..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/TableEnvironment.scala
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.java.table
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.DataSet
-import org.apache.flink.api.java.typeutils.TypeExtractor
-import org.apache.flink.api.table.expressions.ExpressionParser
-import org.apache.flink.api.table.{AbstractTableEnvironment, Table}
-import org.apache.flink.streaming.api.datastream.DataStream
-
-/**
- * Environment for working with the Table API.
- *
- * This can be used to convert a [[DataSet]] or a [[DataStream]] to a [[Table]] and back again. You
- * can also use the provided methods to create a [[Table]] directly from a data source.
- */
-class TableEnvironment extends AbstractTableEnvironment {
-
-  /**
-   * Transforms the given DataSet to a [[org.apache.flink.api.table.Table]].
-   * The fields of the DataSet type are renamed to the given set of fields:
-   *
-   * Example:
-   *
-   * {{{
-   *   tableEnv.fromDataSet(set, "a, b")
-   * }}}
-   *
-   * This will transform the set containing elements of two fields to a table where the fields
-   * are named a and b.
-   */
-  def fromDataSet[T](set: DataSet[T], fields: String): Table = {
-    new JavaBatchTranslator(config).createTable(set, fields)
-  }
-
-  /**
-   * Transforms the given DataSet to a [[org.apache.flink.api.table.Table]].
-   * The fields of the DataSet type are used to name the
-   * [[org.apache.flink.api.table.Table]] fields.
-   */
-  def fromDataSet[T](set: DataSet[T]): Table = {
-    new JavaBatchTranslator(config).createTable(set)
-  }
-
-  /**
-   * Converts the given [[org.apache.flink.api.table.Table]] to
-   * a DataSet. The given type must have exactly the same field types and field order as the
-   * [[org.apache.flink.api.table.Table]]. Row and tuple types can be mapped by position.
-   * POJO types require name equivalence to be mapped correctly as their fields do not have
-   * an order.
-   */
-  @SuppressWarnings(Array("unchecked"))
-  def toDataSet[T](table: Table, clazz: Class[T]): DataSet[T] = {
-    new JavaBatchTranslator(config).translate[T](table.relNode)(
-      TypeExtractor.createTypeInfo(clazz).asInstanceOf[TypeInformation[T]])
-  }
-
-  /**
-   * Converts the given [[org.apache.flink.api.table.Table]] to
-   * a DataSet. The given type must have exactly the same field types and field order as the
-   * [[org.apache.flink.api.table.Table]]. Row and tuple types can be mapped by position.
-   * POJO types require name equivalence to be mapped correctly as their fields do not have
-   * an order.
-   */
-  def toDataSet[T](table: Table, typeInfo: TypeInformation[T]): DataSet[T] = {
-    new JavaBatchTranslator(config).translate[T](table.relNode)(typeInfo)
-  }
-
-  /**
-   * Registers a DataSet under a unique name, so that it can be used in SQL queries.
-   * The fields of the DataSet type are used to name the Table fields.
-   * @param name the Table name
-   * @param dataset the DataSet to register
-   */
-  def registerDataSet[T](name: String, dataset: DataSet[T]): Unit = {
-    registerDataSetInternal(name, dataset)
-  }
-
-  /**
-   * Registers a DataSet under a unique name, so that it can be used in SQL queries.
-   * The fields of the DataSet type are renamed to the given set of fields.
-   *
-   * @param name the Table name
-   * @param dataset the DataSet to register
-   * @param fields the Table field names
-   */
-  def registerDataSet[T](name: String, dataset: DataSet[T], fields: String): Unit = {
-    val exprs = ExpressionParser
-      .parseExpressionList(fields)
-      .toArray
-    registerDataSetInternal(name, dataset, exprs)
-  }
-
-    /**
-   * Transforms the given DataStream to a [[org.apache.flink.api.table.Table]].
-   * The fields of the DataStream type are renamed to the given set of fields:
-   *
-   * Example:
-   *
-   * {{{
-   *   tableEnv.fromDataStream(stream, "a, b")
-   * }}}
-   *
-   * This will transform the stream containing elements of two fields to a table where the fields
-   * are named a and b.
-   */
-  def fromDataStream[T](set: DataStream[T], fields: String): Table = {
-    new JavaStreamTranslator(config).createTable(set, fields)
-  }
-
-  /**
-   * Transforms the given DataStream to a [[org.apache.flink.api.table.Table]].
-   * The fields of the DataStream type are used to name the
-   * [[org.apache.flink.api.table.Table]] fields.
-   */
-  def fromDataStream[T](set: DataStream[T]): Table = {
-    new JavaStreamTranslator(config).createTable(set)
-  }
-
-    /**
-   * Converts the given [[org.apache.flink.api.table.Table]] to
-   * a DataStream. The given type must have exactly the same field types and field order as the
-   * [[org.apache.flink.api.table.Table]]. Row and tuple types can be mapped by position.
-   * POJO types require name equivalence to be mapped correctly as their fields do not have
-   * an order.
-   */
-  @SuppressWarnings(Array("unchecked"))
-  def toDataStream[T](table: Table, clazz: Class[T]): DataStream[T] = {
-    new JavaStreamTranslator(config).translate[T](table.relNode)(
-      TypeExtractor.createTypeInfo(clazz).asInstanceOf[TypeInformation[T]])
-  }
-
-  /**
-   * Converts the given [[org.apache.flink.api.table.Table]] to
-   * a DataStream. The given type must have exactly the same field types and field order as the
-   * [[org.apache.flink.api.table.Table]]. Row and tuple types can be mapped by position.
-   * POJO types require name equivalence to be mapped correctly as their fields do not have
-   * an order.
-   */
-  def toDataStream[T](table: Table, typeInfo: TypeInformation[T]): DataStream[T] = {
-    new JavaStreamTranslator(config).translate[T](table.relNode)(typeInfo)
-  }
-
-}

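To put the removal in context, the conversions above now live on the dedicated environments. A before/after sketch (the new factory call is an assumption based on the conversion docs below; env and set are elided):

    // before: a free-standing environment, created directly
    val oldTEnv = new org.apache.flink.api.java.table.TableEnvironment
    val oldTab  = oldTEnv.fromDataSet(set, "a, b")

    // after: the environment is obtained for, and bound to, an ExecutionEnvironment
    val newTEnv = org.apache.flink.api.table.TableEnvironment.getTableEnvironment(env)
    val newTab  = newTEnv.fromDataSet(set, "a, b")
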
http://git-wip-us.apache.org/repos/asf/flink/blob/1acd8445/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/BatchTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/BatchTableEnvironment.scala
new file mode 100644
index 0000000..a18f338
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/BatchTableEnvironment.scala
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.table
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala._
+import org.apache.flink.api.table.expressions.Expression
+import org.apache.flink.api.table.{TableConfig, Table}
+
+import scala.reflect.ClassTag
+
+/**
+  * The [[org.apache.flink.api.table.TableEnvironment]] for a Scala batch
+  * [[ExecutionEnvironment]], i.e., for programs using the [[DataSet]] API.
+  *
+  * A TableEnvironment can be used to:
+  * - convert a [[DataSet]] to a [[Table]]
+  * - register a [[DataSet]] in the [[org.apache.flink.api.table.TableEnvironment]]'s catalog
+  * - register a [[Table]] in the [[org.apache.flink.api.table.TableEnvironment]]'s catalog
+  * - scan a registered table to obtain a [[Table]]
+  * - specify a SQL query on registered tables to obtain a [[Table]]
+  * - convert a [[Table]] into a [[DataSet]]
+  * - explain the AST and execution plan of a [[Table]]
+  *
+  * @param execEnv The Scala batch [[ExecutionEnvironment]] of the TableEnvironment.
+  * @param config The configuration of the TableEnvironment.
+  */
+class BatchTableEnvironment(
+    protected val execEnv: ExecutionEnvironment,
+    config: TableConfig)
+  extends org.apache.flink.api.table.BatchTableEnvironment(config) {
+
+  /**
+    * Converts the given [[DataSet]] into a [[Table]].
+    *
+    * The field names of the [[Table]] are automatically derived from the type of the [[DataSet]].
+    *
+    * @param dataSet The [[DataSet]] to be converted.
+    * @tparam T The type of the [[DataSet]].
+    * @return The converted [[Table]].
+    */
+  def fromDataSet[T](dataSet: DataSet[T]): Table = {
+
+    val name = createUniqueTableName()
+    registerDataSetInternal(name, dataSet.javaSet)
+    scan(name)
+  }
+
+  /**
+    * Converts the given [[DataSet]] into a [[Table]] with specified field names.
+    *
+    * Example:
+    *
+    * {{{
+    *   val set: DataSet[(String, Long)] = ...
+    *   val tab: Table = tableEnv.fromDataSet(set, 'a, 'b)
+    * }}}
+    *
+    * @param dataSet The [[DataSet]] to be converted.
+    * @param fields The field names of the resulting [[Table]].
+    * @tparam T The type of the [[DataSet]].
+    * @return The converted [[Table]].
+    */
+  def fromDataSet[T](dataSet: DataSet[T], fields: Expression*): Table = {
+
+    val name = createUniqueTableName()
+    registerDataSetInternal(name, dataSet.javaSet, fields.toArray)
+    scan(name)
+  }
+
+  /**
+    * Registers the given [[DataSet]] as table in the
+    * [[org.apache.flink.api.table.TableEnvironment]]'s catalog.
+    * Registered tables can be referenced in SQL queries.
+    *
+    * The field names of the [[Table]] are automatically derived from the type of the [[DataSet]].
+    *
+    * @param name The name under which the [[DataSet]] is registered in the catalog.
+    * @param dataSet The [[DataSet]] to register.
+    * @tparam T The type of the [[DataSet]] to register.
+    */
+  def registerDataSet[T](name: String, dataSet: DataSet[T]): Unit = {
+
+    checkValidTableName(name)
+    registerDataSetInternal(name, dataSet.javaSet)
+  }
+
+  /**
+    * Registers the given [[DataSet]] as table with specified field names in the
+    * [[org.apache.flink.api.table.TableEnvironment]]'s catalog.
+    * Registered tables can be referenced in SQL queries.
+    *
+    * Example:
+    *
+    * {{{
+    *   val set: DataSet[(String, Long)] = ...
+    *   tableEnv.registerDataSet("myTable", set, 'a, 'b)
+    * }}}
+    *
+    * @param name The name under which the [[DataSet]] is registered in the catalog.
+    * @param dataSet The [[DataSet]] to register.
+    * @param fields The field names of the registered table.
+    * @tparam T The type of the [[DataSet]] to register.
+    */
+  def registerDataSet[T](name: String, dataSet: DataSet[T], fields: Expression*): Unit = {
+
+    checkValidTableName(name)
+    registerDataSetInternal(name, dataSet.javaSet, fields.toArray)
+  }
+
+  /**
+    * Converts the given [[Table]] into a [[DataSet]] of a specified type.
+    *
+    * The fields of the [[Table]] are mapped to [[DataSet]] fields as follows:
+    * - [[org.apache.flink.api.table.Row]] and [[org.apache.flink.api.java.tuple.Tuple]]
+    * types: Fields are mapped by position, field types must match.
+    * - POJO [[DataSet]] types: Fields are mapped by field name, field types must match.
+    *
+    * @param table The [[Table]] to convert.
+    * @tparam T The type of the resulting [[DataSet]].
+    * @return The converted [[DataSet]].
+    */
+  def toDataSet[T: TypeInformation](table: Table): DataSet[T] = {
+    // wrap the translated Java DataSet into a Scala DataSet; the ClassTag is
+    // not used at runtime, so a generic AnyRef ClassTag is sufficient here
+    wrap[T](translate(table))(ClassTag.AnyRef.asInstanceOf[ClassTag[T]])
+  }
+
+}

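An end-to-end sketch of the new Scala batch environment, assuming the getTableEnvironment factory shown in the conversion docs below; everything else uses only methods from this file:

    import org.apache.flink.api.scala._
    import org.apache.flink.api.scala.table._
    import org.apache.flink.api.table.{Row, TableEnvironment}

    val env  = ExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    val input = env.fromElements(("Bob", 1), ("Alice", 2))
    val tab   = tEnv.fromDataSet(input, 'name, 'amount)
    val result: DataSet[Row] = tEnv.toDataSet[Row](tab.filter('amount > 1))
    result.print()

    // alternatively, register the DataSet under a name for later SQL use
    tEnv.registerDataSet("orders", input, 'name, 'amount)
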
http://git-wip-us.apache.org/repos/asf/flink/blob/1acd8445/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/DataSetConversions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/DataSetConversions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/DataSetConversions.scala
index 6e630ea..ce437c3 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/DataSetConversions.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/DataSetConversions.scala
@@ -18,56 +18,44 @@
 package org.apache.flink.api.scala.table
 
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.table._
-import org.apache.flink.api.table.expressions.{UnresolvedFieldReference, Expression}
-import org.apache.flink.api.common.typeutils.CompositeType
-
 import org.apache.flink.api.scala._
+import org.apache.flink.api.table._
+import org.apache.flink.api.table.expressions.Expression
 
 /**
- * Methods for converting a [[DataSet]] to a [[Table]]. A [[DataSet]] is
- * wrapped in this by the implicit conversions in [[org.apache.flink.api.scala.table]].
- */
-class DataSetConversions[T](set: DataSet[T], inputType: TypeInformation[T]) {
+  * Holds methods to convert a [[DataSet]] into a [[Table]].
+  *
+  * @param dataSet The [[DataSet]] to convert.
+  * @param inputType The [[TypeInformation]] for the type of the [[DataSet]].
+  * @tparam T The type of the [[DataSet]].
+  */
+class DataSetConversions[T](dataSet: DataSet[T], inputType: TypeInformation[T]) {
 
   /**
-   * Converts the [[DataSet]] to a [[Table]]. The field names can be specified like this:
-   *
-   * {{{
-   *   val in: DataSet[(String, Int)] = ...
-   *   val table = in.as('a, 'b)
-   * }}}
-   *
-   * This results in a [[Table]] that has field `a` of type `String` and field `b`
-   * of type `Int`.
-   */
-  def as(fields: Expression*): Table = {
-     new ScalaBatchTranslator().createTable(set, fields.toArray)
-  }
-
-  /**
-   * Converts the [[DataSet]] to a [[Table]]. The field names will be taken from the field names
-   * of the input type.
-   *
-   * Example:
-   *
-   * {{{
-   *   val in: DataSet[(String, Int)] = ...
-   *   val table = in.toTable
-   * }}}
-   *
-   * Here, the result is a [[Table]] that has field `_1` of type `String` and field `_2`
-   * of type `Int`.
-   */
-  def toTable: Table = {
-
-    inputType match {
-      case c: CompositeType[T] =>
-        val resultFields = c.getFieldNames.map(UnresolvedFieldReference)
-        as(resultFields: _*)
-      case _ =>
-        throw new IllegalArgumentException("" +
-          "Please specify a field name with 'as' to convert an atomic type dataset to a table ")
+    * Converts the [[DataSet]] into a [[Table]].
+    *
+    * The field names of the new [[Table]] can be specified like this:
+    *
+    * {{{
+    *   val env = ExecutionEnvironment.getExecutionEnvironment
+    *   val tEnv = TableEnvironment.getTableEnvironment(env)
+    *
+    *   val set: DataSet[(String, Int)] = ...
+    *   val table = set.toTable(tEnv, 'name, 'amount)
+    * }}}
+    *
+    * If not explicitly specified, field names are automatically extracted from the type of
+    * the [[DataSet]].
+    *
+    * @param tableEnv The [[BatchTableEnvironment]] in which the new [[Table]] is created.
+    * @param fields The field names of the new [[Table]] (optional).
+    * @return The resulting [[Table]].
+    */
+  def toTable(tableEnv: BatchTableEnvironment, fields: Expression*): Table = {
+    if (fields.isEmpty) {
+      tableEnv.fromDataSet(dataSet)
+    } else {
+      tableEnv.fromDataSet(dataSet, fields: _*)
     }
   }
 

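In user code this conversion class is rarely instantiated directly; the implicit conversions in the org.apache.flink.api.scala.table package object wrap a DataSet automatically, so both variants read as follows (sketch; env and tEnv as in the batch example above):

    import org.apache.flink.api.scala._
    import org.apache.flink.api.scala.table._

    val set = env.fromElements(("Bob", 1), ("Alice", 2))
    val t1  = set.toTable(tEnv)                  // derived field names: _1, _2
    val t2  = set.toTable(tEnv, 'name, 'amount)  // explicit field names
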
http://git-wip-us.apache.org/repos/asf/flink/blob/1acd8445/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/DataStreamConversions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/DataStreamConversions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/DataStreamConversions.scala
index cd663b2..3b724cf 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/DataStreamConversions.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/DataStreamConversions.scala
@@ -17,50 +17,46 @@
  */
 package org.apache.flink.api.scala.table
 
+import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.table._
-import org.apache.flink.api.table.expressions.{UnresolvedFieldReference, Expression}
-import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.api.scala._
+import org.apache.flink.api.table.expressions.Expression
 import org.apache.flink.streaming.api.scala.DataStream
 
 /**
- * Methods for converting a [[DataStream]] to a [[Table]]. A [[DataStream]] is
- * wrapped in this by the implicit conversions in [[org.apache.flink.api.scala.table]].
- */
-class DataStreamConversions[T](set: DataStream[T], inputType: CompositeType[T]) {
-
-  /**
-   * Converts the [[DataStream]] to a [[Table]]. The field names can be specified like this:
-   *
-   * {{{
-   *   val in: DataStream[(String, Int)] = ...
-   *   val table = in.asStream('a, 'b)
-   * }}}
-   *
-   * This results in a [[Table]] that has field `a` of type `String` and field `b`
-   * of type `Int`.
-   */
-  def as(fields: Expression*): Table = {
-     new ScalaStreamTranslator().createTable(set, fields.toArray)
-  }
+  * Holds methods to convert a [[DataStream]] into a [[Table]].
+  *
+  * @param dataStream The [[DataStream]] to convert.
+  * @param inputType The [[TypeInformation]] for the type of the [[DataStream]].
+  * @tparam T The type of the [[DataStream]].
+  */
+class DataStreamConversions[T](dataStream: DataStream[T], inputType: TypeInformation[T]) {
 
   /**
-   * Converts the [[DataStream]] to a [[Table]]. The field names will be taken from the field names
-   * of the input type.
-   *
-   * Example:
-   *
-   * {{{
-   *   val in: DataStream[(String, Int)] = ...
-   *   val table = in.toStreamTable
-   * }}}
-   *
-   * Here, the result is a [[Table]] that has field `_1` of type `String` and field `_2`
-   * of type `Int`.
-   */
-  def toStreamTable: Table = {
-    val resultFields = inputType.getFieldNames.map(UnresolvedFieldReference)
-    as(resultFields: _*)
+    * Converts the [[DataStream]] into a [[Table]].
+    *
+    * The field names of the new [[Table]] can be specified like this:
+    *
+    * {{{
+    *   val env = StreamExecutionEnvironment.getExecutionEnvironment
+    *   val tEnv = TableEnvironment.getTableEnvironment(env)
+    *
+    *   val stream: DataStream[(String, Int)] = ...
+    *   val table = stream.toTable(tEnv, 'name, 'amount)
+    * }}}
+    *
+    * If not explicitly specified, field names are automatically extracted from the type of
+    * the [[DataStream]].
+    *
+    * @param tableEnv The [[StreamTableEnvironment]] in which the new [[Table]] is created.
+    * @param fields The field names of the new [[Table]] (optional).
+    * @return The resulting [[Table]].
+    */
+  def toTable(tableEnv: StreamTableEnvironment, fields: Expression*): Table = {
+    if (fields.isEmpty) {
+      tableEnv.fromDataStream(dataStream)
+    } else {
+      tableEnv.fromDataStream(dataStream, fields: _*)
+    }
   }
 
 }

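The streaming side mirrors this one-to-one (sketch; env is a Scala StreamExecutionEnvironment and tEnv its StreamTableEnvironment):

    import org.apache.flink.api.scala.table._
    import org.apache.flink.streaming.api.scala._

    val stream = env.fromElements(("Bob", 1L), ("Alice", 2L))
    val s1 = stream.toTable(tEnv)                  // derived field names: _1, _2
    val s2 = stream.toTable(tEnv, 'name, 'amount)  // explicit field names
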
http://git-wip-us.apache.org/repos/asf/flink/blob/1acd8445/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaBatchTranslator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaBatchTranslator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaBatchTranslator.scala
deleted file mode 100644
index 642654a..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaBatchTranslator.scala
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.table
-
-import org.apache.calcite.rel.RelNode
-import org.apache.flink.api.java.table.JavaBatchTranslator
-import org.apache.flink.api.scala.wrap
-import org.apache.flink.api.table.plan._
-import org.apache.flink.api.table.{TableConfig, Table}
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.scala.DataSet
-
-import scala.reflect.ClassTag
-
-/**
- * [[PlanTranslator]] for creating [[Table]]s from Scala [[DataSet]]s and
- * translating them back to Scala [[DataSet]]s.
- */
-class ScalaBatchTranslator(config: TableConfig = TableConfig.DEFAULT) extends PlanTranslator {
-
-  private val javaTranslator = new JavaBatchTranslator(config)
-
-  type Representation[A] = DataSet[A]
-
-  override def createTable[A](
-    repr: Representation[A],
-    fieldIndexes: Array[Int],
-    fieldNames: Array[String]): Table =
-  {
-    javaTranslator.createTable(repr.javaSet, fieldIndexes, fieldNames)
-  }
-
-  override def translate[O](op: RelNode)(implicit tpe: TypeInformation[O]): DataSet[O] = {
-    // fake it till you make it ...
-    wrap(javaTranslator.translate(op))(ClassTag.AnyRef.asInstanceOf[ClassTag[O]])
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/1acd8445/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaStreamTranslator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaStreamTranslator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaStreamTranslator.scala
deleted file mode 100644
index be3664f..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaStreamTranslator.scala
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.table
-
-import org.apache.calcite.rel.RelNode
-import org.apache.flink.api.java.table.JavaStreamTranslator
-import org.apache.flink.api.table.plan._
-import org.apache.flink.api.table.{TableConfig, Table}
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.streaming.api.scala.DataStream
-import org.apache.flink.streaming.api.scala.asScalaStream
-
-/**
- * [[PlanTranslator]] for creating [[Table]]s from Scala [[DataStream]]s and
- * translating them back to Scala [[DataStream]]s.
- */
-class ScalaStreamTranslator(config: TableConfig = TableConfig.DEFAULT) extends PlanTranslator {
-
-  private val javaTranslator = new JavaStreamTranslator(config)
-
-  type Representation[A] = DataStream[A]
-
-  override def createTable[A](
-    repr: Representation[A],
-    fieldIndexes: Array[Int],
-    fieldNames: Array[String]): Table =
-  {
-    javaTranslator.createTable(repr.javaStream, fieldIndexes, fieldNames)
-  }
-
-  override def translate[O](op: RelNode)(implicit tpe: TypeInformation[O]): DataStream[O] = {
-    asScalaStream(javaTranslator.translate(op))
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/1acd8445/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/StreamTableEnvironment.scala
new file mode 100644
index 0000000..15ef55e
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/StreamTableEnvironment.scala
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.table
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.table.{TableConfig, Table}
+import org.apache.flink.api.table.expressions.Expression
+import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, DataStream}
+import org.apache.flink.streaming.api.scala.asScalaStream
+
+/**
+  * The [[org.apache.flink.api.table.TableEnvironment]] for a Scala [[StreamExecutionEnvironment]].
+  *
+  * A TableEnvironment can be used to:
+  * - convert a [[DataStream]] to a [[Table]]
+  * - register a [[DataStream]] in the [[org.apache.flink.api.table.TableEnvironment]]'s catalog
+  * - register a [[Table]] in the [[org.apache.flink.api.table.TableEnvironment]]'s catalog
+  * - scan a registered table to obtain a [[Table]]
+  * - specify a SQL query on registered tables to obtain a [[Table]]
+  * - convert a [[Table]] into a [[DataStream]]
+  * - explain the AST and execution plan of a [[Table]]
+  *
+  * @param execEnv The Scala [[StreamExecutionEnvironment]] of the TableEnvironment.
+  * @param config The configuration of the TableEnvironment.
+  */
+class StreamTableEnvironment(
+    protected val execEnv: StreamExecutionEnvironment,
+    config: TableConfig)
+  extends org.apache.flink.api.table.StreamTableEnvironment(config) {
+
+  /**
+    * Converts the given [[DataStream]] into a [[Table]].
+    *
+    * The field names of the [[Table]] are automatically derived from the type of the
+    * [[DataStream]].
+    *
+    * @param dataStream The [[DataStream]] to be converted.
+    * @tparam T The type of the [[DataStream]].
+    * @return The converted [[Table]].
+    */
+  def fromDataStream[T](dataStream: DataStream[T]): Table = {
+
+    val name = createUniqueTableName()
+    registerDataStreamInternal(name, dataStream.javaStream)
+    ingest(name)
+  }
+
+  /**
+    * Converts the given [[DataStream]] into a [[Table]] with specified field names.
+    *
+    * Example:
+    *
+    * {{{
+    *   val stream: DataStream[(String, Long)] = ...
+    *   val tab: Table = tableEnv.fromDataStream(stream, 'a, 'b)
+    * }}}
+    *
+    * @param dataStream The [[DataStream]] to be converted.
+    * @param fields The field names of the resulting [[Table]].
+    * @tparam T The type of the [[DataStream]].
+    * @return The converted [[Table]].
+    */
+  def fromDataStream[T](dataStream: DataStream[T], fields: Expression*): Table = {
+
+    val name = createUniqueTableName()
+    registerDataStreamInternal(name, dataStream.javaStream, fields.toArray)
+    ingest(name)
+  }
+
+  /**
+    * Converts the given [[Table]] into a [[DataStream]] of a specified type.
+    *
+    * The fields of the [[Table]] are mapped to [[DataStream]] fields as follows:
+    * - [[org.apache.flink.api.table.Row]] and [[org.apache.flink.api.java.tuple.Tuple]]
+    * types: Fields are mapped by position, field types must match.
+    * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match.
+    *
+    * @param table The [[Table]] to convert.
+    * @tparam T The type of the resulting [[DataStream]].
+    * @return The converted [[DataStream]].
+    */
+  def toDataStream[T: TypeInformation](table: Table): DataStream[T] = {
+    asScalaStream(translate(table))
+  }
+
+}
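
A round trip through this Scala StreamTableEnvironment, again as a sketch that assumes the getTableEnvironment factory from the conversion docs above:

    import org.apache.flink.api.scala.table._
    import org.apache.flink.api.table.TableEnvironment
    import org.apache.flink.streaming.api.scala._

    val env  = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    val stream = env.fromElements(("Bob", 42L), ("Alice", 17L))
    val tab    = tEnv.fromDataStream(stream, 'name, 'amount)
    val result = tEnv.toDataStream[(String, Long)](tab.select('name, 'amount))

    result.print()
    env.execute()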