You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/04/15 19:39:52 UTC

[07/12] flink git commit: [FLINK-3738] [table] Refactor TableEnvironments. Remove Translators and TranslationContext.

http://git-wip-us.apache.org/repos/asf/flink/blob/1acd8445/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/TableEnvironmentITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/TableEnvironmentITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/TableEnvironmentITCase.java
new file mode 100644
index 0000000..c596014
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/TableEnvironmentITCase.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.table.test;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.table.BatchTableEnvironment;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.Table;
+import org.apache.flink.api.table.TableEnvironment;
+import org.apache.flink.api.table.TableException;
+import org.apache.flink.api.table.test.utils.TableProgramsTestBase;
+import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.List;
+
+@RunWith(Parameterized.class)
+public class TableEnvironmentITCase extends TableProgramsTestBase {
+
+	public TableEnvironmentITCase(TestExecutionMode mode, TableConfigMode configMode) {
+		super(mode, configMode);
+	}
+
+	@Test
+	public void testSimpleRegister() throws Exception {
+		final String tableName = "MyTable";
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+		tableEnv.registerDataSet(tableName, ds);
+		Table t = tableEnv.scan(tableName);
+
+		Table result = t.select("f0, f1");
+
+		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = resultSet.collect();
+		String expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" + "7,4\n" +
+				"8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" + "15,5\n" +
+				"16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n";
+		compareResultAsText(results, expected);
+	}
+
+	@Test
+	public void testRegisterWithFields() throws Exception {
+		final String tableName = "MyTable";
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+		tableEnv.registerDataSet(tableName, ds, "a, b, c");
+		Table t = tableEnv.scan(tableName);
+
+		Table result = t.select("a, b, c");
+
+		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = resultSet.collect();
+		String expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
+				"4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
+				"7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
+				"11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" +
+				"14,5,Comment#8\n" + "15,5,Comment#9\n" + "16,6,Comment#10\n" +
+				"17,6,Comment#11\n" + "18,6,Comment#12\n" + "19,6,Comment#13\n" +
+				"20,6,Comment#14\n" + "21,6,Comment#15\n";
+		compareResultAsText(results, expected);
+	}
+
+	@Test(expected = TableException.class)
+	public void testRegisterExistingDatasetTable() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+		tableEnv.registerDataSet("MyTable", ds);
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 =
+				CollectionDataSets.getSmall5TupleDataSet(env);
+		// Must fail. Name is already used for a different table.
+		tableEnv.registerDataSet("MyTable", ds2);
+	}
+
+	@Test(expected = TableException.class)
+	public void testScanUnregisteredTable() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		// Must fail. No table registered under that name.
+		tableEnv.scan("nonRegisteredTable");
+	}
+
+	@Test
+	public void testTableRegister() throws Exception {
+		final String tableName = "MyTable";
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+		Table t = tableEnv.fromDataSet(ds);
+		tableEnv.registerTable(tableName, t);
+		Table result = tableEnv.scan(tableName).select("f0, f1").filter("f0 > 7");
+
+		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = resultSet.collect();
+		String expected = "8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" +
+				"13,5\n" + "14,5\n" + "15,5\n" +
+				"16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n";
+		compareResultAsText(results, expected);
+	}
+
+	@Test(expected = TableException.class)
+	public void testIllegalName() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+		Table t = tableEnv.fromDataSet(ds);
+		// Must fail. Table name matches internal name pattern.
+		tableEnv.registerTable("_DataSetTable_42", t);
+	}
+
+	@Test(expected = TableException.class)
+	public void testRegisterTableFromOtherEnv() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv1 = TableEnvironment.getTableEnvironment(env, config());
+		BatchTableEnvironment tableEnv2 = TableEnvironment.getTableEnvironment(env, config());
+
+		Table t = tableEnv1.fromDataSet(CollectionDataSets.get3TupleDataSet(env));
+		// Must fail. Table is bound to a different TableEnvironment.
+		tableEnv2.registerTable("MyTable", t);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1acd8445/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/UnionITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/UnionITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/UnionITCase.java
index 167e45c..b69fec4 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/UnionITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/UnionITCase.java
@@ -20,11 +20,13 @@ package org.apache.flink.api.java.table.test;
 
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.table.TableEnvironment;
+import org.apache.flink.api.java.table.BatchTableEnvironment;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.tuple.Tuple5;
 import org.apache.flink.api.table.Row;
 import org.apache.flink.api.table.Table;
+import org.apache.flink.api.table.TableEnvironment;
+import org.apache.flink.api.table.TableException;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
 import org.apache.flink.test.util.MultipleProgramsTestBase;
 import org.junit.Test;
@@ -43,7 +45,7 @@ public class UnionITCase extends MultipleProgramsTestBase {
 	@Test
 	public void testUnion() throws Exception {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		TableEnvironment tableEnv = new TableEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
 
 		DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
 		DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.getSmall3TupleDataSet(env);
@@ -62,7 +64,7 @@ public class UnionITCase extends MultipleProgramsTestBase {
 	@Test
 	public void testUnionWithFilter() throws Exception {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		TableEnvironment tableEnv = new TableEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
 
 		DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
 		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
@@ -81,7 +83,7 @@ public class UnionITCase extends MultipleProgramsTestBase {
 	@Test(expected = IllegalArgumentException.class)
 	public void testUnionIncompatibleNumberOfFields() throws Exception {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		TableEnvironment tableEnv = new TableEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
 
 		DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
 		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
@@ -96,7 +98,7 @@ public class UnionITCase extends MultipleProgramsTestBase {
 	@Test(expected = IllegalArgumentException.class)
 	public void testUnionIncompatibleFieldsName() throws Exception {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		TableEnvironment tableEnv = new TableEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
 
 		DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
 		DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.getSmall3TupleDataSet(env);
@@ -111,7 +113,7 @@ public class UnionITCase extends MultipleProgramsTestBase {
 	@Test(expected = IllegalArgumentException.class)
 	public void testUnionIncompatibleFieldTypes() throws Exception {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		TableEnvironment tableEnv = new TableEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
 
 		DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
 		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
@@ -126,7 +128,7 @@ public class UnionITCase extends MultipleProgramsTestBase {
 	@Test
 	public void testUnionWithAggregation() throws Exception {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		TableEnvironment tableEnv = new TableEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
 
 		DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
 		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
@@ -145,7 +147,7 @@ public class UnionITCase extends MultipleProgramsTestBase {
 	@Test
 	public void testUnionWithJoin() throws Exception {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		TableEnvironment tableEnv = new TableEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
 
 		DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
 		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
@@ -164,5 +166,21 @@ public class UnionITCase extends MultipleProgramsTestBase {
 	      "Hallo Welt,Hallo Welt\n" + "Hallo Welt wie,Hallo Welt\n" +
 	      "Hallo Welt,Hallo Welt wie\n" + "Hallo Welt wie,Hallo Welt wie\n";
 	    compareResultAsText(results, expected);
-	  }
+	}
+
+	@Test(expected = TableException.class)
+	public void testUnionTablesFromDifferentEnvs() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tEnv1 = TableEnvironment.getTableEnvironment(env);
+		BatchTableEnvironment tEnv2 = TableEnvironment.getTableEnvironment(env);
+
+		DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
+		DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.getSmall3TupleDataSet(env);
+
+		Table in1 = tEnv1.fromDataSet(ds1, "a, b, c");
+		Table in2 = tEnv2.fromDataSet(ds2, "a, b, c");
+
+		// Must fail. Tables are bound to different TableEnvironments.
+		in1.unionAll(in2);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/1acd8445/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/AggregationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/AggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/AggregationsITCase.scala
index d577564..08f44d3 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/AggregationsITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/AggregationsITCase.scala
@@ -21,8 +21,7 @@ package org.apache.flink.api.scala.sql.test
 import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.table._
 import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.api.table.Row
-import org.apache.flink.api.table.plan.TranslationContext
+import org.apache.flink.api.table.{TableEnvironment, Row}
 import org.apache.flink.api.table.test.utils.TableProgramsTestBase
 import org.apache.flink.api.table.test.utils.TableProgramsTestBase.TableConfigMode
 import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
@@ -43,8 +42,7 @@ class AggregationsITCase(
   def testAggregationTypes(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = getScalaTableEnvironment
-    TranslationContext.reset()
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
     val sqlQuery = "SELECT sum(_1), min(_1), max(_1), count(_1), avg(_1) FROM MyTable"
 
@@ -54,7 +52,7 @@ class AggregationsITCase(
     val result = tEnv.sql(sqlQuery)
 
     val expected = "231,1,21,21,11"
-    val results = result.toDataSet[Row](getConfig).collect()
+    val results = result.toDataSet[Row].collect()
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
@@ -62,18 +60,17 @@ class AggregationsITCase(
   def testTableAggregation(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = getScalaTableEnvironment
-    TranslationContext.reset()
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
     val sqlQuery = "SELECT sum(_1) FROM MyTable"
 
-    val ds = CollectionDataSets.get3TupleDataSet(env).toTable
+    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
     tEnv.registerTable("MyTable", ds)
 
     val result = tEnv.sql(sqlQuery)
 
     val expected = "231"
-    val results = result.toDataSet[Row](getConfig).collect()
+    val results = result.toDataSet[Row].collect()
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
@@ -81,8 +78,7 @@ class AggregationsITCase(
   def testDataSetAggregation(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = getScalaTableEnvironment
-    TranslationContext.reset()
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
     val sqlQuery = "SELECT sum(_1) FROM MyTable"
 
@@ -92,7 +88,7 @@ class AggregationsITCase(
     val result = tEnv.sql(sqlQuery)
 
     val expected = "231"
-    val results = result.toDataSet[Row](getConfig).collect()
+    val results = result.toDataSet[Row].collect()
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
@@ -100,8 +96,7 @@ class AggregationsITCase(
   def testWorkingAggregationDataTypes(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = getScalaTableEnvironment
-    TranslationContext.reset()
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
     val sqlQuery = "SELECT avg(_1), avg(_2), avg(_3), avg(_4), avg(_5), avg(_6), count(_7)" +
       "FROM MyTable"
@@ -114,7 +109,7 @@ class AggregationsITCase(
     val result = tEnv.sql(sqlQuery)
 
     val expected = "1,1,1,1,1.5,1.5,2"
-    val results = result.toDataSet[Row](getConfig).collect()
+    val results = result.toDataSet[Row].collect()
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
@@ -122,21 +117,20 @@ class AggregationsITCase(
   def testTableWorkingAggregationDataTypes(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = getScalaTableEnvironment
-    TranslationContext.reset()
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
     val sqlQuery = "SELECT avg(a), avg(b), avg(c), avg(d), avg(e), avg(f), count(g)" +
       "FROM MyTable"
 
     val ds = env.fromElements(
       (1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d, "Hello"),
-      (2: Byte, 2: Short, 2, 2L, 2.0f, 2.0d, "Ciao")).as('a, 'b, 'c, 'd, 'e, 'f, 'g)
+      (2: Byte, 2: Short, 2, 2L, 2.0f, 2.0d, "Ciao")).toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'f, 'g)
     tEnv.registerTable("MyTable", ds)
 
     val result = tEnv.sql(sqlQuery)
 
     val expected = "1,1,1,1,1.5,1.5,2"
-    val results = result.toDataSet[Row](getConfig).collect()
+    val results = result.toDataSet[Row].collect()
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
@@ -144,19 +138,18 @@ class AggregationsITCase(
   def testTableProjection(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = getScalaTableEnvironment
-    TranslationContext.reset()
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
     val sqlQuery = "SELECT avg(a), sum(a), count(a), avg(b), sum(b) " +
       "FROM MyTable"
 
-    val ds = env.fromElements((1: Byte, 1: Short), (2: Byte, 2: Short)).as('a, 'b)
+    val ds = env.fromElements((1: Byte, 1: Short), (2: Byte, 2: Short)).toTable(tEnv, 'a, 'b)
     tEnv.registerTable("MyTable", ds)
 
     val result = tEnv.sql(sqlQuery)
 
     val expected = "1,3,2,1,3"
-    val results = result.toDataSet[Row](getConfig).collect()
+    val results = result.toDataSet[Row].collect()
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
@@ -164,19 +157,18 @@ class AggregationsITCase(
   def testTableAggregationWithArithmetic(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = getScalaTableEnvironment
-    TranslationContext.reset()
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
     val sqlQuery = "SELECT avg(a + 2) + 2, count(b) + 5 " +
       "FROM MyTable"
 
-    val ds = env.fromElements((1f, "Hello"), (2f, "Ciao")).as('a, 'b)
+    val ds = env.fromElements((1f, "Hello"), (2f, "Ciao")).toTable(tEnv, 'a, 'b)
     tEnv.registerTable("MyTable", ds)
 
     val result = tEnv.sql(sqlQuery)
 
     val expected = "5.5,7"
-    val results = result.toDataSet[Row](getConfig).collect()
+    val results = result.toDataSet[Row].collect()
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
@@ -184,18 +176,17 @@ class AggregationsITCase(
   def testAggregationWithTwoCount(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = getScalaTableEnvironment
-    TranslationContext.reset()
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
     val sqlQuery = "SELECT count(_1), count(_2) FROM MyTable"
 
-    val ds = env.fromElements((1f, "Hello"), (2f, "Ciao")).toTable
+    val ds = env.fromElements((1f, "Hello"), (2f, "Ciao")).toTable(tEnv)
     tEnv.registerTable("MyTable", ds)
 
     val result = tEnv.sql(sqlQuery)
 
     val expected = "2,2"
-    val results = result.toDataSet[Row](getConfig).collect()
+    val results = result.toDataSet[Row].collect()
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
@@ -204,21 +195,20 @@ class AggregationsITCase(
   def testAggregationAfterProjection(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = getScalaTableEnvironment
-    TranslationContext.reset()
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
     val sqlQuery = "SELECT avg(a), sum(b), count(c) FROM " +
       "(SELECT _1 as a, _2 as b, _3 as c FROM MyTable)"
 
     val ds = env.fromElements(
       (1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d, "Hello"),
-      (2: Byte, 2: Short, 2, 2L, 2.0f, 2.0d, "Ciao")).toTable
+      (2: Byte, 2: Short, 2, 2L, 2.0f, 2.0d, "Ciao")).toTable(tEnv)
     tEnv.registerTable("MyTable", ds)
 
     val result = tEnv.sql(sqlQuery)
 
     val expected = "1,3,2"
-    val results = result.toDataSet[Row](getConfig).collect()
+    val results = result.toDataSet[Row].collect()
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/1acd8445/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/FilterITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/FilterITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/FilterITCase.scala
index 171e200..e1edd87 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/FilterITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/FilterITCase.scala
@@ -21,8 +21,7 @@ package org.apache.flink.api.scala.sql.test
 import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.table._
 import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.api.table.Row
-import org.apache.flink.api.table.plan.TranslationContext
+import org.apache.flink.api.table.{TableEnvironment, Row}
 import org.apache.flink.api.table.test.utils.TableProgramsTestBase
 import org.apache.flink.api.table.test.utils.TableProgramsTestBase.TableConfigMode
 import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
@@ -43,18 +42,17 @@ class FilterITCase(
   def testAllRejectingFilter(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = getScalaTableEnvironment
-    TranslationContext.reset()
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
     val sqlQuery = "SELECT * FROM MyTable WHERE false"
 
-    val ds = CollectionDataSets.get3TupleDataSet(env).toTable
+    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
     tEnv.registerTable("MyTable", ds)
 
     val result = tEnv.sql(sqlQuery)
 
     val expected = "\n"
-    val results = result.toDataSet[Row](getConfig).collect()
+    val results = result.toDataSet[Row].collect()
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
@@ -62,12 +60,11 @@ class FilterITCase(
   def testAllPassingFilter(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = getScalaTableEnvironment
-    TranslationContext.reset()
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
     val sqlQuery = "SELECT * FROM MyTable WHERE true"
 
-    val ds = CollectionDataSets.get3TupleDataSet(env).toTable
+    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
     tEnv.registerTable("MyTable", ds)
 
     val result = tEnv.sql(sqlQuery)
@@ -78,7 +75,7 @@ class FilterITCase(
       "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," +
       "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," +
       "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
-    val results = result.toDataSet[Row](getConfig).collect()
+    val results = result.toDataSet[Row].collect()
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
@@ -86,18 +83,17 @@ class FilterITCase(
   def testFilterOnString(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = getScalaTableEnvironment
-    TranslationContext.reset()
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
     val sqlQuery = "SELECT * FROM MyTable WHERE c LIKE '%world%'"
 
-    val ds = CollectionDataSets.get3TupleDataSet(env).toTable.as('a, 'b, 'c)
+    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
     tEnv.registerTable("MyTable", ds)
 
     val result = tEnv.sql(sqlQuery)
 
     val expected = "3,2,Hello world\n" + "4,3,Hello world, how are you?\n"
-    val results = result.toDataSet[Row](getConfig).collect()
+    val results = result.toDataSet[Row].collect()
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
@@ -105,12 +101,11 @@ class FilterITCase(
   def testFilterOnInteger(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = getScalaTableEnvironment
-    TranslationContext.reset()
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
     val sqlQuery = "SELECT * FROM MyTable WHERE MOD(a,2)=0"
 
-    val ds = CollectionDataSets.get3TupleDataSet(env).toTable.as('a, 'b, 'c)
+    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
     tEnv.registerTable("MyTable", ds)
 
     val result = tEnv.sql(sqlQuery)
@@ -119,7 +114,7 @@ class FilterITCase(
       "6,3,Luke Skywalker\n" + "8,4," + "Comment#2\n" + "10,4,Comment#4\n" +
       "12,5,Comment#6\n" + "14,5,Comment#8\n" + "16,6," +
       "Comment#10\n" + "18,6,Comment#12\n" + "20,6,Comment#14\n"
-    val results = result.toDataSet[Row](getConfig).collect()
+    val results = result.toDataSet[Row].collect()
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
@@ -127,18 +122,17 @@ class FilterITCase(
   def testDisjunctivePredicate(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = getScalaTableEnvironment
-    TranslationContext.reset()
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
     val sqlQuery = "SELECT * FROM MyTable WHERE a < 2 OR a > 20"
 
-    val ds = CollectionDataSets.get3TupleDataSet(env).toTable.as('a, 'b, 'c)
+    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
     tEnv.registerTable("MyTable", ds)
 
     val result = tEnv.sql(sqlQuery)
 
     val expected = "1,1,Hi\n" + "21,6,Comment#15\n"
-    val results = result.toDataSet[Row](getConfig).collect()
+    val results = result.toDataSet[Row].collect()
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
@@ -146,12 +140,11 @@ class FilterITCase(
   def testFilterWithAnd(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = getScalaTableEnvironment
-    TranslationContext.reset()
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
     val sqlQuery = "SELECT * FROM MyTable WHERE MOD(a,2)<>0 AND MOD(b,2)=0"
 
-    val ds = CollectionDataSets.get3TupleDataSet(env).toTable.as('a, 'b, 'c)
+    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
     tEnv.registerTable("MyTable", ds)
 
     val result = tEnv.sql(sqlQuery)
@@ -159,7 +152,7 @@ class FilterITCase(
     val expected = "3,2,Hello world\n" + "7,4,Comment#1\n" +
       "9,4,Comment#3\n" + "17,6,Comment#11\n" +
       "19,6,Comment#13\n" + "21,6,Comment#15\n"
-    val results = result.toDataSet[Row](getConfig).collect()
+    val results = result.toDataSet[Row].collect()
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/1acd8445/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/JoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/JoinITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/JoinITCase.scala
index b09aa75..561f2a7 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/JoinITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/JoinITCase.scala
@@ -22,8 +22,8 @@ import org.apache.calcite.tools.ValidationException
 import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.table._
 import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.api.table.{TableException, Row}
-import org.apache.flink.api.table.plan.{PlanGenException, TranslationContext}
+import org.apache.flink.api.table.{TableEnvironment, TableException, Row}
+import org.apache.flink.api.table.plan.PlanGenException
 import org.apache.flink.api.table.test.utils.TableProgramsTestBase
 import org.apache.flink.api.table.test.utils.TableProgramsTestBase.TableConfigMode
 import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
@@ -44,20 +44,19 @@ class JoinITCase(
   def testJoin(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = getScalaTableEnvironment
-    TranslationContext.reset()
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
     val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE b = e"
 
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable.as('a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable.as('d, 'e, 'f, 'g, 'h)
+    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('d, 'e, 'f, 'g, 'h)
     tEnv.registerTable("Table3", ds1)
     tEnv.registerTable("Table5", ds2)
 
     val result = tEnv.sql(sqlQuery)
 
     val expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt\n"
-    val results = result.toDataSet[Row](getConfig).collect()
+    val results = result.toDataSet[Row].collect()
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
@@ -65,20 +64,19 @@ class JoinITCase(
   def testJoinWithFilter(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = getScalaTableEnvironment
-    TranslationContext.reset()
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
     val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE b = e AND b < 2"
 
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable.as('a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable.as('d, 'e, 'f, 'g, 'h)
+    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('d, 'e, 'f, 'g, 'h)
     tEnv.registerTable("Table3", ds1)
     tEnv.registerTable("Table5", ds2)
 
     val result = tEnv.sql(sqlQuery)
 
     val expected = "Hi,Hallo\n"
-    val results = result.toDataSet[Row](getConfig).collect()
+    val results = result.toDataSet[Row].collect()
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
@@ -86,13 +84,12 @@ class JoinITCase(
   def testJoinWithJoinFilter(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = getScalaTableEnvironment
-    TranslationContext.reset()
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
     val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE b = e AND a < 6 AND h < b"
 
-    val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable.as('a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable.as('d, 'e, 'f, 'g, 'h)
+    val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('d, 'e, 'f, 'g, 'h)
     tEnv.registerTable("Table3", ds1)
     tEnv.registerTable("Table5", ds2)
 
@@ -100,7 +97,7 @@ class JoinITCase(
 
     val expected = "Hello world, how are you?,Hallo Welt wie\n" +
       "I am fine.,Hallo Welt wie\n"
-    val results = result.toDataSet[Row](getConfig).collect()
+    val results = result.toDataSet[Row].collect()
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
@@ -108,13 +105,12 @@ class JoinITCase(
   def testJoinWithMultipleKeys(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = getScalaTableEnvironment
-    TranslationContext.reset()
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
     val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE a = d AND b = h"
 
-    val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable.as('a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable.as('d, 'e, 'f, 'g, 'h)
+    val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('d, 'e, 'f, 'g, 'h)
     tEnv.registerTable("Table3", ds1)
     tEnv.registerTable("Table5", ds2)
 
@@ -122,7 +118,7 @@ class JoinITCase(
 
     val expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt wie gehts?\n" +
       "Hello world,ABC\n" + "I am fine.,HIJ\n" + "I am fine.,IJK\n"
-    val results = result.toDataSet[Row](getConfig).collect()
+    val results = result.toDataSet[Row].collect()
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
@@ -130,13 +126,12 @@ class JoinITCase(
   def testJoinNonExistingKey(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = getScalaTableEnvironment
-    TranslationContext.reset()
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
     val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE foo = e"
 
-    val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable.as('a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable.as('d, 'e, 'f, 'g, 'h)
+    val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('d, 'e, 'f, 'g, 'h)
     tEnv.registerTable("Table3", ds1)
     tEnv.registerTable("Table5", ds2)
 
@@ -147,13 +142,12 @@ class JoinITCase(
   def testJoinNonMatchingKeyTypes(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = getScalaTableEnvironment
-    TranslationContext.reset()
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
     val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE a = g"
 
-    val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable.as('a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable.as('d, 'e, 'f, 'g, 'h)
+    val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('d, 'e, 'f, 'g, 'h)
     tEnv.registerTable("Table3", ds1)
     tEnv.registerTable("Table5", ds2)
 
@@ -164,13 +158,12 @@ class JoinITCase(
   def testJoinWithAmbiguousFields(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = getScalaTableEnvironment
-    TranslationContext.reset()
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
     val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE a = d"
 
-    val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable.as('a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable.as('d, 'e, 'f, 'g, 'c)
+    val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('d, 'e, 'f, 'g, 'c)
     tEnv.registerTable("Table3", ds1)
     tEnv.registerTable("Table5", ds2)
 
@@ -181,20 +174,19 @@ class JoinITCase(
   def testJoinWithAlias(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = getScalaTableEnvironment
-    TranslationContext.reset()
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
     val sqlQuery = "SELECT Table5.c, Table3.c FROM Table3, Table5 WHERE a = d AND a < 4"
 
-    val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable.as('a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable.as('d, 'e, 'f, 'g, 'c)
+    val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('d, 'e, 'f, 'g, 'c)
     tEnv.registerTable("Table3", ds1)
     tEnv.registerTable("Table5", ds2)
 
     val result = tEnv.sql(sqlQuery)
     val expected = "1,Hi\n" + "2,Hello\n" + "1,Hello\n" +
       "2,Hello world\n" + "2,Hello world\n" + "3,Hello world\n"
-    val results = result.toDataSet[Row](getConfig).collect()
+    val results = result.toDataSet[Row].collect()
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
@@ -202,25 +194,23 @@ class JoinITCase(
   def testJoinNoEqualityPredicate(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = getScalaTableEnvironment
-    TranslationContext.reset()
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
     val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE d = f"
 
-    val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable.as('a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable.as('d, 'e, 'f, 'g, 'h)
+    val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('d, 'e, 'f, 'g, 'h)
     tEnv.registerTable("Table3", ds1)
     tEnv.registerTable("Table5", ds2)
 
-    tEnv.sql(sqlQuery).toDataSet[Row](getConfig).collect()
+    tEnv.sql(sqlQuery).toDataSet[Row].collect()
   }
 
   @Test
   def testDataSetJoinWithAggregation(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = getScalaTableEnvironment
-    TranslationContext.reset()
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
     val sqlQuery = "SELECT COUNT(g), COUNT(b) FROM Table3, Table5 WHERE a = d"
 
@@ -232,7 +222,7 @@ class JoinITCase(
     val result = tEnv.sql(sqlQuery)
 
     val expected = "6,6"
-    val results = result.toDataSet[Row](getConfig).collect()
+    val results = result.toDataSet[Row].collect()
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
@@ -240,20 +230,19 @@ class JoinITCase(
   def testTableJoinWithAggregation(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = getScalaTableEnvironment
-    TranslationContext.reset()
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
     val sqlQuery = "SELECT COUNT(b), COUNT(g) FROM Table3, Table5 WHERE a = d"
 
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
+    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
     tEnv.registerTable("Table3", ds1)
     tEnv.registerTable("Table5", ds2)
 
     val result = tEnv.sql(sqlQuery)
 
     val expected = "6,6"
-    val results = result.toDataSet[Row](getConfig).collect()
+    val results = result.toDataSet[Row].collect()
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
@@ -261,50 +250,47 @@ class JoinITCase(
   def testFullOuterJoin(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = getScalaTableEnvironment
-    TranslationContext.reset()
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
     val sqlQuery = "SELECT c, g FROM Table3 FULL OUTER JOIN Table5 ON b = e"
 
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable.as('a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable.as('d, 'e, 'f, 'g, 'h)
+    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('d, 'e, 'f, 'g, 'h)
     tEnv.registerTable("Table3", ds1)
     tEnv.registerTable("Table5", ds2)
 
-    tEnv.sql(sqlQuery).toDataSet[Row](getConfig).collect()
+    tEnv.sql(sqlQuery).toDataSet[Row].collect()
   }
 
   @Test(expected = classOf[PlanGenException])
   def testLeftOuterJoin(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = getScalaTableEnvironment
-    TranslationContext.reset()
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
     val sqlQuery = "SELECT c, g FROM Table3 LEFT OUTER JOIN Table5 ON b = e"
 
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable.as('a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable.as('d, 'e, 'f, 'g, 'h)
+    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('d, 'e, 'f, 'g, 'h)
     tEnv.registerTable("Table3", ds1)
     tEnv.registerTable("Table5", ds2)
 
-    tEnv.sql(sqlQuery).toDataSet[Row](getConfig).collect()
+    tEnv.sql(sqlQuery).toDataSet[Row].collect()
   }
 
   @Test(expected = classOf[PlanGenException])
   def testRightOuterJoin(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = getScalaTableEnvironment
-    TranslationContext.reset()
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
     val sqlQuery = "SELECT c, g FROM Table3 RIGHT OUTER JOIN Table5 ON b = e"
 
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable.as('a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable.as('d, 'e, 'f, 'g, 'h)
+    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('d, 'e, 'f, 'g, 'h)
     tEnv.registerTable("Table3", ds1)
     tEnv.registerTable("Table5", ds2)
 
-    tEnv.sql(sqlQuery).toDataSet[Row](getConfig).collect()
+    tEnv.sql(sqlQuery).toDataSet[Row].collect()
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/1acd8445/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/SelectITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/SelectITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/SelectITCase.scala
index f08c95c..b1d423b 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/SelectITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/SelectITCase.scala
@@ -22,8 +22,7 @@ import org.apache.calcite.tools.ValidationException
 import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.table._
 import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.api.table.Row
-import org.apache.flink.api.table.plan.TranslationContext
+import org.apache.flink.api.table.{TableEnvironment, Row}
 import org.apache.flink.api.table.test.utils.TableProgramsTestBase
 import org.apache.flink.api.table.test.utils.TableProgramsTestBase.TableConfigMode
 import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
@@ -44,12 +43,11 @@ class SelectITCase(
   def testSelectStarFromTable(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = getScalaTableEnvironment
-    TranslationContext.reset()
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
     val sqlQuery = "SELECT * FROM MyTable"
 
-    val ds = CollectionDataSets.get3TupleDataSet(env).toTable.as('a, 'b, 'c)
+    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
     tEnv.registerTable("MyTable", ds)
 
     val result = tEnv.sql(sqlQuery)
@@ -61,7 +59,7 @@ class SelectITCase(
       "15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" +
       "19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
 
-    val results = result.toDataSet[Row](getConfig).collect()
+    val results = result.toDataSet[Row].collect()
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
@@ -69,8 +67,7 @@ class SelectITCase(
   def testSelectStarFromDataSet(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = getScalaTableEnvironment
-    TranslationContext.reset()
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
     val sqlQuery = "SELECT * FROM MyTable"
 
@@ -86,7 +83,7 @@ class SelectITCase(
       "15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" +
       "19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
 
-    val results = result.toDataSet[Row](getConfig).collect()
+    val results = result.toDataSet[Row].collect()
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
@@ -94,12 +91,11 @@ class SelectITCase(
   def testSimpleSelectAll(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = getScalaTableEnvironment
-    TranslationContext.reset()
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
     val sqlQuery = "SELECT a, b, c FROM MyTable"
 
-    val ds = CollectionDataSets.get3TupleDataSet(env).toTable.as('a, 'b, 'c)
+    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
     tEnv.registerTable("MyTable", ds)
 
     val result = tEnv.sql(sqlQuery)
@@ -111,7 +107,7 @@ class SelectITCase(
       "15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" +
       "19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
 
-    val results = result.toDataSet[Row](getConfig).collect()
+    val results = result.toDataSet[Row].collect()
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
@@ -119,12 +115,11 @@ class SelectITCase(
   def testSelectWithNaming(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = getScalaTableEnvironment
-    TranslationContext.reset()
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
     val sqlQuery = "SELECT _1 as a, _2 as b FROM MyTable"
 
-    val ds = CollectionDataSets.get3TupleDataSet(env).toTable
+    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
     tEnv.registerTable("MyTable", ds)
 
     val result = tEnv.sql(sqlQuery)
@@ -133,7 +128,7 @@ class SelectITCase(
       "8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" + "15,5\n" +
       "16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n"
 
-    val results = result.toDataSet[Row](getConfig).collect()
+    val results = result.toDataSet[Row].collect()
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
@@ -141,12 +136,11 @@ class SelectITCase(
   def testInvalidFields(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = getScalaTableEnvironment
-    TranslationContext.reset()
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
     val sqlQuery = "SELECT a, foo FROM MyTable"
 
-    val ds = CollectionDataSets.get3TupleDataSet(env).toTable.as('a, 'b, 'c)
+    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
     tEnv.registerTable("MyTable", ds)
 
     tEnv.sql(sqlQuery)

http://git-wip-us.apache.org/repos/asf/flink/blob/1acd8445/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/TableWithSQLITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/TableWithSQLITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/TableWithSQLITCase.scala
index 153a9d0..71334f4 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/TableWithSQLITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/TableWithSQLITCase.scala
@@ -21,8 +21,7 @@ package org.apache.flink.api.scala.sql.test
 import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.table._
 import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.api.table.Row
-import org.apache.flink.api.table.plan.TranslationContext
+import org.apache.flink.api.table.{TableEnvironment, Row}
 import org.apache.flink.api.table.test.utils.TableProgramsTestBase
 import org.apache.flink.api.table.test.utils.TableProgramsTestBase.TableConfigMode
 import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
@@ -43,8 +42,7 @@ class TableWithSQLITCase(
   def testSQLTable(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = getScalaTableEnvironment
-    TranslationContext.reset()
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
     val ds = CollectionDataSets.get3TupleDataSet(env)
     tEnv.registerDataSet("MyTable", ds, 'a, 'b, 'c)
@@ -54,7 +52,7 @@ class TableWithSQLITCase(
     val result = tEnv.sql(sqlQuery).select('a.avg, 'b.sum, 'c.count)
 
     val expected = "15,65,12"
-    val results = result.toDataSet[Row](getConfig).collect()
+    val results = result.toDataSet[Row].collect()
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
@@ -62,10 +60,9 @@ class TableWithSQLITCase(
   def testTableSQLTable(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = getScalaTableEnvironment
-    TranslationContext.reset()
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
-    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
+    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
     val t1 = ds.filter('a > 9)
 
     tEnv.registerTable("MyTable", t1)
@@ -75,7 +72,7 @@ class TableWithSQLITCase(
     val result = tEnv.sql(sqlQuery).select('a1 + 1, 'b1 - 5, 'c1)
 
     val expected = "16,60,12"
-    val results = result.toDataSet[Row](getConfig).collect()
+    val results = result.toDataSet[Row].collect()
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
@@ -83,10 +80,9 @@ class TableWithSQLITCase(
   def testMultipleSQLQueries(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = getScalaTableEnvironment
-    TranslationContext.reset()
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
-    val t = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
+    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
     tEnv.registerTable("MyTable", t)
 
     val sqlQuery = "SELECT a as aa FROM MyTable WHERE b = 6"
@@ -97,7 +93,7 @@ class TableWithSQLITCase(
     val result2 = tEnv.sql(sqlQuery2)
 
     val expected = "6"
-    val results = result2.toDataSet[Row](getConfig).collect()
+    val results = result2.toDataSet[Row].collect()
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/1acd8445/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/UnionITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/UnionITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/UnionITCase.scala
index 4a031a3..fadc48f 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/UnionITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/UnionITCase.scala
@@ -21,8 +21,7 @@ package org.apache.flink.api.scala.sql.test
 import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.table._
 import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.api.table.Row
-import org.apache.flink.api.table.plan.TranslationContext
+import org.apache.flink.api.table.{TableException, TableEnvironment, Row}
 import org.apache.flink.api.table.test.utils.TableProgramsTestBase
 import org.apache.flink.api.table.test.utils.TableProgramsTestBase.TableConfigMode
 import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
@@ -43,8 +42,7 @@ class UnionITCase(
   def testUnion(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = getScalaTableEnvironment
-    TranslationContext.reset()
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
     val sqlQuery = "SELECT c FROM t1 UNION ALL (SELECT c FROM t2)"
 
@@ -56,7 +54,7 @@ class UnionITCase(
     val result = tEnv.sql(sqlQuery)
 
     val expected = "Hi\n" + "Hello\n" + "Hello world\n" + "Hi\n" + "Hello\n" + "Hello world\n"
-    val results = result.toDataSet[Row](getConfig).collect()
+    val results = result.toDataSet[Row].collect()
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
@@ -65,8 +63,11 @@ class UnionITCase(
   def testUnionWithFilter(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = getScalaTableEnvironment
-    TranslationContext.reset()
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    if (tEnv.getConfig.getEfficientTypeUsage) {
+      return
+    }
 
     val sqlQuery = "SELECT c FROM (" +
       "SELECT * FROM t1 UNION ALL (SELECT a, b, c FROM t2))" +
@@ -89,8 +90,11 @@ class UnionITCase(
   def testUnionWithAggregation(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = getScalaTableEnvironment
-    TranslationContext.reset()
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    if (tEnv.getConfig.getEfficientTypeUsage) {
+      return
+    }
 
     val sqlQuery = "SELECT count(c) FROM (" +
       "SELECT * FROM t1 UNION ALL (SELECT a, b, c FROM t2))"
@@ -106,4 +110,5 @@ class UnionITCase(
     val results = result.toDataSet[Row].collect()
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/1acd8445/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/FilterITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/FilterITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/FilterITCase.scala
index fbfb39e..4760d37 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/FilterITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/FilterITCase.scala
@@ -20,7 +20,7 @@ package org.apache.flink.api.scala.table.streaming.test
 
 import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.table._
-import org.apache.flink.api.table.Row
+import org.apache.flink.api.table.{TableEnvironment, Row}
 import org.apache.flink.api.table.expressions.Literal
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
@@ -40,8 +40,10 @@ class FilterITCase extends StreamingMultipleProgramsTestBase {
      * Test simple filter
      */
     val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
     StreamITCase.testResults = mutable.MutableList()
-    val ds = StreamTestData.getSmall3TupleDataStream(env).as('a, 'b, 'c)
+    val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
 
     val filterDs = ds.filter('a === 3)
     val results = filterDs.toDataStream[Row]
@@ -58,8 +60,10 @@ class FilterITCase extends StreamingMultipleProgramsTestBase {
      * Test all-rejecting filter
      */
     val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
     StreamITCase.testResults = mutable.MutableList()
-    val ds = StreamTestData.getSmall3TupleDataStream(env).as('a, 'b, 'c)
+    val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
 
     val filterDs = ds.filter( Literal(false) )
     val results = filterDs.toDataStream[Row]
@@ -75,8 +79,10 @@ class FilterITCase extends StreamingMultipleProgramsTestBase {
      * Test all-passing filter
      */
     val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
     StreamITCase.testResults = mutable.MutableList()
-    val ds = StreamTestData.getSmall3TupleDataStream(env).as('a, 'b, 'c)
+    val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
 
     val filterDs = ds.filter( Literal(true) )
     val results = filterDs.toDataStream[Row]
@@ -96,8 +102,10 @@ class FilterITCase extends StreamingMultipleProgramsTestBase {
      * Test filter on Integer tuple field.
      */
     val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
     StreamITCase.testResults = mutable.MutableList()
-    val ds = StreamTestData.get3TupleDataStream(env).as('a, 'b, 'c)
+    val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
 
     val filterDs = ds.filter( 'a % 2 === 0 )
     val results = filterDs.toDataStream[Row]
@@ -118,8 +126,10 @@ class FilterITCase extends StreamingMultipleProgramsTestBase {
      * Test filter on Integer tuple field.
      */
     val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
     StreamITCase.testResults = mutable.MutableList()
-    val ds = StreamTestData.get3TupleDataStream(env).as('a, 'b, 'c)
+    val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
 
     val filterDs = ds.filter( 'a % 2 !== 0)
     val results = filterDs.toDataStream[Row]

http://git-wip-us.apache.org/repos/asf/flink/blob/1acd8445/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/SelectITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/SelectITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/SelectITCase.scala
index 83cb7fd..75a9c97 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/SelectITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/SelectITCase.scala
@@ -20,12 +20,10 @@ package org.apache.flink.api.scala.table.streaming.test
 
 import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.table._
-import org.apache.flink.api.table.Row
+import org.apache.flink.api.table.{TableEnvironment, Row}
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import scala.collection.JavaConverters._
 import scala.collection.mutable
-import scala.collection.JavaConversions._
 import org.junit.Test
 import org.junit.Assert._
 import org.apache.flink.api.scala.table.streaming.test.utils.StreamITCase
@@ -37,8 +35,9 @@ class StreamSelectITCase extends StreamingMultipleProgramsTestBase {
   def testSimpleSelectAll(): Unit = {
 
     val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
     StreamITCase.testResults = mutable.MutableList()
-    val ds = StreamTestData.getSmall3TupleDataStream(env).toStreamTable.select('_1, '_2, '_3)
+    val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).select('_1, '_2, '_3)
 
     val results = ds.toDataStream[Row]
     results.addSink(new StreamITCase.StringSink)
@@ -55,8 +54,9 @@ class StreamSelectITCase extends StreamingMultipleProgramsTestBase {
   def testSelectFirst(): Unit = {
 
     val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
     StreamITCase.testResults = mutable.MutableList()
-    val ds = StreamTestData.getSmall3TupleDataStream(env).toStreamTable.select('_1)
+    val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).select('_1)
 
     val results = ds.toDataStream[Row]
     results.addSink(new StreamITCase.StringSink)
@@ -71,8 +71,9 @@ class StreamSelectITCase extends StreamingMultipleProgramsTestBase {
 
     // verify ProjectMergeRule.
     val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
     StreamITCase.testResults = mutable.MutableList()
-    val ds = StreamTestData.get3TupleDataStream(env).toStreamTable
+    val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv)
       .select('_1 as 'a, '_2 as 'b, '_1 as 'c)
       .select('a, 'b)
 
@@ -91,8 +92,10 @@ class StreamSelectITCase extends StreamingMultipleProgramsTestBase {
   def testSimpleSelectAllWithAs(): Unit = {
 
     val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
     StreamITCase.testResults = mutable.MutableList()
-    val ds = StreamTestData.getSmall3TupleDataStream(env).as('a, 'b, 'c).select('a, 'b, 'c)
+    val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
+      .select('a, 'b, 'c)
 
     val results = ds.toDataStream[Row]
     results.addSink(new StreamITCase.StringSink)
@@ -109,8 +112,9 @@ class StreamSelectITCase extends StreamingMultipleProgramsTestBase {
   def testAsWithToFewFields(): Unit = {
 
     val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
     StreamITCase.testResults = mutable.MutableList()
-    val ds = StreamTestData.get3TupleDataStream(env).as('a, 'b)
+    val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b)
 
     val results = ds.toDataStream[Row]
     results.addSink(new StreamITCase.StringSink)
@@ -124,8 +128,9 @@ class StreamSelectITCase extends StreamingMultipleProgramsTestBase {
   def testAsWithToManyFields(): Unit = {
 
     val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
     StreamITCase.testResults = mutable.MutableList()
-    val ds = StreamTestData.get3TupleDataStream(env).as('a, 'b, 'c, 'd)
+    val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c, 'd)
 
     val results = ds.toDataStream[Row]
     results.addSink(new StreamITCase.StringSink)
@@ -139,8 +144,9 @@ class StreamSelectITCase extends StreamingMultipleProgramsTestBase {
   def testAsWithAmbiguousFields(): Unit = {
 
     val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
     StreamITCase.testResults = mutable.MutableList()
-    val ds = StreamTestData.get3TupleDataStream(env).as('a, 'b, 'b)
+    val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'b)
 
     val results = ds.toDataStream[Row]
     results.addSink(new StreamITCase.StringSink)
@@ -155,8 +161,9 @@ class StreamSelectITCase extends StreamingMultipleProgramsTestBase {
   def testOnlyFieldRefInAs(): Unit = {
 
     val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
     StreamITCase.testResults = mutable.MutableList()
-    val ds = StreamTestData.get3TupleDataStream(env).as('a, 'b as 'c, 'd)
+    val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b as 'c, 'd)
 
     val results = ds.toDataStream[Row]
     results.addSink(new StreamITCase.StringSink)

http://git-wip-us.apache.org/repos/asf/flink/blob/1acd8445/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/UnionITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/UnionITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/UnionITCase.scala
index ed5e9b5..ae81f3b 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/UnionITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/UnionITCase.scala
@@ -20,7 +20,7 @@ package org.apache.flink.api.scala.table.streaming.test
 
 import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.table._
-import org.apache.flink.api.table.Row
+import org.apache.flink.api.table.{TableException, TableEnvironment, Row}
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import scala.collection.JavaConverters._
@@ -36,9 +36,11 @@ class UnionITCase extends StreamingMultipleProgramsTestBase {
   @Test
   def testUnion(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
     StreamITCase.testResults = mutable.MutableList()
-    val ds1 = StreamTestData.getSmall3TupleDataStream(env).as('a, 'b, 'c)
-    val ds2 = StreamTestData.getSmall3TupleDataStream(env).as('a, 'b, 'c)
+    val ds1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
+    val ds2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
 
     val unionDs = ds1.unionAll(ds2).select('c)
 
@@ -54,9 +56,11 @@ class UnionITCase extends StreamingMultipleProgramsTestBase {
   @Test
   def testUnionWithFilter(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
     StreamITCase.testResults = mutable.MutableList()
-    val ds1 = StreamTestData.getSmall3TupleDataStream(env).as('a, 'b, 'c)
-    val ds2 = StreamTestData.get5TupleDataStream(env).as('a, 'b, 'd, 'c, 'e)
+    val ds1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
+    val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'a, 'b, 'd, 'c, 'e)
 
     val unionDs = ds1.unionAll(ds2.select('a, 'b, 'c)).filter('b < 2).select('c)
 
@@ -71,9 +75,11 @@ class UnionITCase extends StreamingMultipleProgramsTestBase {
   @Test(expected = classOf[IllegalArgumentException])
   def testUnionFieldsNameNotOverlap1(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
     StreamITCase.testResults = mutable.MutableList()
-    val ds1 = StreamTestData.getSmall3TupleDataStream(env).as('a, 'b, 'c)
-    val ds2 = StreamTestData.get5TupleDataStream(env).as('a, 'b, 'd, 'c, 'e)
+    val ds1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
+    val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'a, 'b, 'd, 'c, 'e)
 
     val unionDs = ds1.unionAll(ds2)
 
@@ -87,9 +93,12 @@ class UnionITCase extends StreamingMultipleProgramsTestBase {
   @Test(expected = classOf[IllegalArgumentException])
   def testUnionFieldsNameNotOverlap2(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
     StreamITCase.testResults = mutable.MutableList()
-    val ds1 = StreamTestData.getSmall3TupleDataStream(env).as('a, 'b, 'c)
-    val ds2 = StreamTestData.get5TupleDataStream(env).as('a, 'b, 'c, 'd, 'e).select('a, 'b, 'c)
+    val ds1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
+    val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e)
+      .select('a, 'b, 'c)
 
     val unionDs = ds1.unionAll(ds2)
 
@@ -101,4 +110,17 @@ class UnionITCase extends StreamingMultipleProgramsTestBase {
     assertEquals(true, StreamITCase.testResults.isEmpty)
   }
 
+  @Test(expected = classOf[TableException])
+  def testUnionTablesFromDifferentEnvs(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv1 = TableEnvironment.getTableEnvironment(env)
+    val tEnv2 = TableEnvironment.getTableEnvironment(env)
+
+    val ds1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv1, 'a, 'b, 'c)
+    val ds2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv2, 'a, 'b, 'c)
+
+    // Must fail. Tables are bound to different TableEnvironments.
+    ds1.unionAll(ds2)
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/1acd8445/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala
index abf2735..26cdc76 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala
@@ -19,7 +19,7 @@
 package org.apache.flink.api.scala.table.test
 
 import org.apache.flink.api.table.plan.PlanGenException
-import org.apache.flink.api.table.Row
+import org.apache.flink.api.table.{TableEnvironment, Row}
 import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.table._
 import org.apache.flink.api.scala.util.CollectionDataSets
@@ -38,7 +38,9 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
   def testAggregationTypes(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val t = CollectionDataSets.get3TupleDataSet(env).toTable
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
       .select('_1.sum, '_1.min, '_1.max, '_1.count, '_1.avg)
 
     val results = t.toDataSet[Row].collect()
@@ -50,7 +52,9 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
   def testAggregationOnNonExistingField(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val t = CollectionDataSets.get3TupleDataSet(env).toTable
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
       // Must fail. Field 'foo does not exist.
       .select('foo.avg)
   }
@@ -59,9 +63,11 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
   def testWorkingAggregationDataTypes(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
     val t = env.fromElements(
       (1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d, "Hello"),
-      (2: Byte, 2: Short, 2, 2L, 2.0f, 2.0d, "Ciao")).toTable
+      (2: Byte, 2: Short, 2, 2L, 2.0f, 2.0d, "Ciao")).toTable(tEnv)
       .select('_1.avg, '_2.avg, '_3.avg, '_4.avg, '_5.avg, '_6.avg, '_7.count)
 
     val expected = "1,1,1,1,1.5,1.5,2"
@@ -73,9 +79,11 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
   def testProjection(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
     val t = env.fromElements(
       (1: Byte, 1: Short),
-      (2: Byte, 2: Short)).toTable
+      (2: Byte, 2: Short)).toTable(tEnv)
       .select('_1.avg, '_1.sum, '_1.count, '_2.avg, '_2.sum)
 
     val expected = "1,3,2,1,3"
@@ -87,7 +95,9 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
   def testAggregationWithArithmetic(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val t = env.fromElements((1f, "Hello"), (2f, "Ciao")).toTable
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val t = env.fromElements((1f, "Hello"), (2f, "Ciao")).toTable(tEnv)
       .select(('_1 + 2).avg + 2, '_2.count + 5)
 
     val expected = "5.5,7"
@@ -99,7 +109,9 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
   def testAggregationWithTwoCount(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val t = env.fromElements((1f, "Hello"), (2f, "Ciao")).toTable
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val t = env.fromElements((1f, "Hello"), (2f, "Ciao")).toTable(tEnv)
       .select('_1.count, '_2.count)
 
     val expected = "2,2"
@@ -111,9 +123,11 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
   def testAggregationAfterProjection(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
     val t = env.fromElements(
       (1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d, "Hello"),
-      (2: Byte, 2: Short, 2, 2L, 2.0f, 2.0d, "Ciao")).toTable
+      (2: Byte, 2: Short, 2, 2L, 2.0f, 2.0d, "Ciao")).toTable(tEnv)
       .select('_1, '_2, '_3)
       .select('_1.avg, '_2.sum, '_3.count)
 
@@ -126,7 +140,9 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
   def testNonWorkingAggregationDataTypes(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val t = env.fromElements(("Hello", 1)).toTable
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val t = env.fromElements(("Hello", 1)).toTable(tEnv)
       // Must fail. Field '_1 is not a numeric type.
       .select('_1.sum)
 
@@ -137,7 +153,9 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
   def testNoNestedAggregations(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val t = env.fromElements(("Hello", 1)).toTable
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val t = env.fromElements(("Hello", 1)).toTable(tEnv)
       // Must fail. Sum aggregation can not be chained.
       .select('_2.sum.sum)
   }
@@ -146,7 +164,9 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
   def testSQLStyleAggregations(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val t = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
       .select(
         """Sum( a) as a1, a.sum as a2,
           |Min (a) as b1, a.min as b2,
@@ -164,13 +184,15 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
   def testPojoAggregation(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
     val input = env.fromElements(
       MyWC("hello", 1),
       MyWC("hello", 1),
       MyWC("ciao", 1),
       MyWC("hola", 1),
       MyWC("hola", 1))
-    val expr = input.toTable
+    val expr = input.toTable(tEnv)
     val result = expr
       .groupBy('word)
       .select('word, 'count.sum as 'count)

http://git-wip-us.apache.org/repos/asf/flink/blob/1acd8445/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala
deleted file mode 100644
index c2e4e96..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.table.test
-
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.api.table.Row
-import org.apache.flink.api.table.test.utils.TableProgramsTestBase
-import TableProgramsTestBase.TableConfigMode
-import org.apache.flink.api.table.test.utils.TableProgramsTestBase
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.apache.flink.test.util.TestBaseUtils
-import org.junit._
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-import scala.collection.JavaConverters._
-
-@RunWith(classOf[Parameterized])
-class AsITCase(
-    mode: TestExecutionMode,
-    configMode: TableConfigMode)
-  extends TableProgramsTestBase(mode, configMode) {
-
-  @Test
-  def testAs(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val t = CollectionDataSets.get3TupleDataSet(env)
-      .as('a, 'b, 'c)
-      .select('a, 'b, 'c)
-
-    val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
-      "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
-      "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
-      "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" +
-      "15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" +
-      "19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
-    val results = t.toDataSet[Row](getConfig).collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testAsFromCaseClass(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val data = List(
-      SomeCaseClass("Peter", 28, 4000.00, "Sales"),
-      SomeCaseClass("Anna", 56, 10000.00, "Engineering"),
-      SomeCaseClass("Lucy", 42, 6000.00, "HR"))
-
-    val t =  env.fromCollection(data)
-      .as('a, 'b, 'c, 'd)
-      .select('a, 'b, 'c, 'd)
-
-    val expected: String =
-      "Peter,28,4000.0,Sales\n" +
-      "Anna,56,10000.0,Engineering\n" +
-      "Lucy,42,6000.0,HR\n"
-    val results = t.toDataSet[Row](getConfig).collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testAsFromAndToCaseClass(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val data = List(
-      SomeCaseClass("Peter", 28, 4000.00, "Sales"),
-      SomeCaseClass("Anna", 56, 10000.00, "Engineering"),
-      SomeCaseClass("Lucy", 42, 6000.00, "HR"))
-
-    val t =  env.fromCollection(data)
-      .as('a, 'b, 'c, 'd)
-      .select('a, 'b, 'c, 'd)
-
-    val expected: String =
-      "SomeCaseClass(Peter,28,4000.0,Sales)\n" +
-      "SomeCaseClass(Anna,56,10000.0,Engineering)\n" +
-      "SomeCaseClass(Lucy,42,6000.0,HR)\n"
-    val results = t.toDataSet[SomeCaseClass](getConfig).collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test(expected = classOf[IllegalArgumentException])
-  def testAsWithToFewFields(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    CollectionDataSets.get3TupleDataSet(env)
-      // Must fail. Number of fields does not match.
-      .as('a, 'b)
-  }
-
-  @Test(expected = classOf[IllegalArgumentException])
-  def testAsWithToManyFields(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    CollectionDataSets.get3TupleDataSet(env)
-      // Must fail. Number of fields does not match.
-      .as('a, 'b, 'c, 'd)
-  }
-
-  @Test(expected = classOf[IllegalArgumentException])
-  def testAsWithAmbiguousFields(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    CollectionDataSets.get3TupleDataSet(env)
-      // Must fail. Field names not unique.
-      .as('a, 'b, 'b)
-  }
-
-  @Test(expected = classOf[IllegalArgumentException])
-  def testAsWithNonFieldReference1(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    // Must fail. as() can only have field references
-    CollectionDataSets.get3TupleDataSet(env)
-      .as('a + 1, 'b, 'c)
-  }
-
-  @Test(expected = classOf[IllegalArgumentException])
-  def testAsWithNonFieldReference2(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    // Must fail. as() can only have field references
-    CollectionDataSets.get3TupleDataSet(env)
-      .as('a as 'foo, 'b, 'c)
-  }
-
-}
-
-case class SomeCaseClass(name: String, age: Int, salary: Double, department: String) {
-  def this() { this("", 0, 0.0, "") }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/1acd8445/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CalcITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CalcITCase.scala
index c5d31da..af5e6d7 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CalcITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CalcITCase.scala
@@ -21,7 +21,7 @@ package org.apache.flink.api.scala.table.test
 import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.table._
 import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.api.table.Row
+import org.apache.flink.api.table.{TableEnvironment, Row}
 import org.apache.flink.api.table.test.utils.TableProgramsTestBase
 import TableProgramsTestBase.TableConfigMode
 import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
@@ -39,24 +39,26 @@ class CalcITCase(
 
   @Test
   def testSimpleCalc(): Unit = {
-
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val t = CollectionDataSets.get3TupleDataSet(env).toTable
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
         .select('_1, '_2, '_3)
         .where('_1 < 7)
         .select('_1, '_3)
 
     val expected = "1,Hi\n" + "2,Hello\n" + "3,Hello world\n" +
       "4,Hello world, how are you?\n" + "5,I am fine.\n" + "6,Luke Skywalker\n"
-      val results = t.toDataSet[Row](getConfig).collect()
+      val results = t.toDataSet[Row].collect()
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
   @Test
   def testCalcWithTwoFilters(): Unit = {
-
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val t = CollectionDataSets.get3TupleDataSet(env).toTable
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
         .select('_1, '_2, '_3)
         .where('_1 < 7 && '_2 === 3)
         .select('_1, '_3)
@@ -64,15 +66,16 @@ class CalcITCase(
         .select('_1)
 
     val expected = "4\n"
-    val results = t.toDataSet[Row](getConfig).collect()
+    val results = t.toDataSet[Row].collect()
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
   @Test
   def testCalcWithAggregation(): Unit = {
-    
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val t = CollectionDataSets.get3TupleDataSet(env).toTable
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
         .select('_1, '_2, '_3)
         .where('_1 < 15)
         .groupBy('_2)
@@ -80,21 +83,23 @@ class CalcITCase(
         .where('cnt > 3)
 
     val expected = "7,4\n" + "11,4\n"
-    val results = t.toDataSet[Row](getConfig).collect()
+    val results = t.toDataSet[Row].collect()
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
   @Test
   def testCalcJoin(): Unit = {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
 
     val joinT = ds1.select('a, 'b).join(ds2).where('b === 'e).select('a, 'b, 'd, 'e, 'f)
       .where('b > 1).select('a, 'd).where('d === 2)
 
     val expected = "2,2\n" + "3,2\n"
-    val results = joinT.toDataSet[Row](getConfig).collect()
+    val results = joinT.toDataSet[Row].collect()
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 }