You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/04/15 19:39:51 UTC

[06/12] flink git commit: [FLINK-3738] [table] Refactor TableEnvironments. Remove Translators and TranslationContext.

http://git-wip-us.apache.org/repos/asf/flink/blob/1acd8445/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala
index 5ca8c7f..c0499e4 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala
@@ -23,7 +23,7 @@ import java.util.Date
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo
 import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.table._
-import org.apache.flink.api.table.Row
+import org.apache.flink.api.table.{TableEnvironment, Row}
 import org.apache.flink.api.table.codegen.CodeGenException
 import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
@@ -42,7 +42,9 @@ class CastingITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mo
     // don't test everything, just some common cast directions
 
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val t = env.fromElements((1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d, 1L, 1001.1)).toTable
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val t = env.fromElements((1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d, 1L, 1001.1)).toTable(tEnv)
       .select('_1 + 1, '_2 + 1, '_3 + 1L, '_4 + 1.0f, '_5 + 1.0d, '_6 + 1, '_7 + 1.0d, '_8 + '_1)
 
     val expected = "2,2,2,2.0,2.0,2.0,2.0,1002.1"
@@ -56,9 +58,11 @@ class CastingITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mo
     // don't test everything, just some common cast directions
 
     val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
     val t = env.fromElements(
       (1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d),
-      (2: Byte, 2: Short, 2, 2L, 2.0f, 2.0d)).as('a, 'b, 'c, 'd, 'e, 'f)
+      (2: Byte, 2: Short, 2, 2L, 2.0f, 2.0d)).toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'f)
       .filter('a > 1 && 'b > 1 && 'c > 1L && 'd > 1.0f && 'e > 1.0d  && 'f > 1)
 
     val expected = "2,2,2,2,2.0,2.0"
@@ -72,7 +76,9 @@ class CastingITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mo
   def testAutoCastToString(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val t = env.fromElements((1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d, new Date(0))).toTable
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val t = env.fromElements((1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d, new Date(0))).toTable(tEnv)
       .select('_1 + "b", '_2 + "s", '_3 + "i", '_4 + "L", '_5 + "f", '_6 + "d", '_7 + "Date")
 
     val expected = "1b,1s,1i,1L,1.0f,1.0d,1970-01-01 00:00:00.000Date"
@@ -84,8 +90,10 @@ class CastingITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mo
   def testCasting(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
     val t = env.fromElements((1, 0.0, 1L, true))
-      .toTable
+      .toTable(tEnv)
       .select(
         // * -> String
         '_1.cast(BasicTypeInfo.STRING_TYPE_INFO),
@@ -121,8 +129,10 @@ class CastingITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mo
   def testCastFromString(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
     val t = env.fromElements(("1", "true", "2.0"))
-      .toTable
+      .toTable(tEnv)
       .select(
         // String -> BASIC TYPE (not String, Date, Void, Character)
         '_1.cast(BasicTypeInfo.BYTE_TYPE_INFO),
@@ -143,8 +153,10 @@ class CastingITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mo
   def testCastDateFromString(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
     val t = env.fromElements(("2011-05-03", "15:51:36", "2011-05-03 15:51:36.000", "1446473775"))
-      .toTable
+      .toTable(tEnv)
       .select(
         '_1.cast(BasicTypeInfo.DATE_TYPE_INFO).cast(BasicTypeInfo.STRING_TYPE_INFO),
         '_2.cast(BasicTypeInfo.DATE_TYPE_INFO).cast(BasicTypeInfo.STRING_TYPE_INFO),
@@ -161,8 +173,10 @@ class CastingITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mo
   @Test
   def testCastDateToStringAndLong(): Unit = {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
     val ds = env.fromElements(("2011-05-03 15:51:36.000", "1304437896000"))
-    val t = ds.toTable
+    val t = ds.toTable(tEnv)
       .select('_1.cast(BasicTypeInfo.DATE_TYPE_INFO).as('f0),
         '_2.cast(BasicTypeInfo.DATE_TYPE_INFO).as('f1))
       .select('f0.cast(BasicTypeInfo.STRING_TYPE_INFO),

http://git-wip-us.apache.org/repos/asf/flink/blob/1acd8445/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/DistinctITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/DistinctITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/DistinctITCase.scala
index 770819a..fc1284e 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/DistinctITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/DistinctITCase.scala
@@ -21,7 +21,7 @@ package org.apache.flink.api.scala.table.test
 import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.table._
 import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.api.table.Row
+import org.apache.flink.api.table.{TableEnvironment, Row}
 import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
 import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.junit.Test
@@ -36,8 +36,9 @@ class DistinctITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(m
   @Test
   def testDistinct(): Unit = {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
 
+    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
     val distinct = ds.select('b).distinct()
 
     val expected = "1\n" + "2\n" + "3\n" + "4\n" + "5\n" + "6\n"
@@ -48,8 +49,9 @@ class DistinctITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(m
   @Test
   def testDistinctAfterAggregate(): Unit = {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val ds = CollectionDataSets.get5TupleDataSet(env).as('a, 'b, 'c, 'd, 'e)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
 
+    val ds = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e)
     val distinct = ds.groupBy('a, 'e).select('e).distinct()
 
     val expected = "1\n" + "2\n" + "3\n"

http://git-wip-us.apache.org/repos/asf/flink/blob/1acd8445/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala
index 29b3be4..9e908dc 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala
@@ -23,8 +23,9 @@ import java.util.Date
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo
 import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.table._
-import org.apache.flink.api.table.Row
-import org.apache.flink.api.table.expressions.{Literal, Null}
+import org.apache.flink.api.table.expressions.Null
+import org.apache.flink.api.table.{TableEnvironment, Row}
+import org.apache.flink.api.table.expressions.Literal
 import org.apache.flink.api.table.test.utils.TableProgramsTestBase
 import org.apache.flink.api.table.test.utils.TableProgramsTestBase.TableConfigMode
 import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
@@ -38,18 +39,20 @@ import scala.collection.JavaConverters._
 @RunWith(classOf[Parameterized])
 class ExpressionsITCase(
     mode: TestExecutionMode,
-    config: TableConfigMode)
-  extends TableProgramsTestBase(mode, config) {
+    configMode: TableConfigMode)
+  extends TableProgramsTestBase(mode, configMode) {
 
   @Test
   def testArithmetic(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val t = env.fromElements((5, 10)).as('a, 'b)
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val t = env.fromElements((5, 10)).toTable(tEnv, 'a, 'b)
       .select('a - 5, 'a + 5, 'a / 2, 'a * 2, 'a % 2, -'a)
 
     val expected = "0,10,2,10,1,-5"
-    val results = t.toDataSet[Row](getConfig).collect()
+    val results = t.toDataSet[Row].collect()
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
@@ -57,11 +60,13 @@ class ExpressionsITCase(
   def testLogic(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val t = env.fromElements((5, true)).as('a, 'b)
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val t = env.fromElements((5, true)).toTable(tEnv, 'a, 'b)
       .select('b && true, 'b && false, 'b || false, !'b)
 
     val expected = "true,false,true,false"
-    val results = t.toDataSet[Row](getConfig).collect()
+    val results = t.toDataSet[Row].collect()
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
@@ -69,11 +74,13 @@ class ExpressionsITCase(
   def testComparisons(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val t = env.fromElements((5, 5, 4)).as('a, 'b, 'c)
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val t = env.fromElements((5, 5, 4)).toTable(tEnv, 'a, 'b, 'c)
       .select('a > 'c, 'a >= 'b, 'a < 'c, 'a.isNull, 'a.isNotNull)
 
     val expected = "true,true,false,false,true"
-    val results = t.toDataSet[Row](getConfig).collect()
+    val results = t.toDataSet[Row].collect()
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
@@ -81,32 +88,34 @@ class ExpressionsITCase(
   def testCaseInsensitiveForAs(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
-    val t = env.fromElements((3, 5.toByte)).as('a, 'b)
+    val t = env.fromElements((3, 5.toByte)).toTable(tEnv, 'a, 'b)
       .groupBy("a").select("a, a.count As cnt")
 
     val expected = "3,1"
-    val results = t.toDataSet[Row](getConfig).collect()
+    val results = t.toDataSet[Row].collect()
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
   @Test
   def testNullLiteral(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
-    val t = env.fromElements((1, 0)).as('a, 'b)
+    val t = env.fromElements((1, 0)).toTable(tEnv, 'a, 'b)
       .select(
         'a,
         'b,
         Null(BasicTypeInfo.INT_TYPE_INFO),
         Null(BasicTypeInfo.STRING_TYPE_INFO) === "")
 
-    val expected = if (getConfig.getNullCheck) {
+    val expected = if (config.getNullCheck) {
       "1,0,null,null"
     } else {
       "1,0,-1,true"
     }
-    val results = t.toDataSet[Row](getConfig).collect()
+    val results = t.toDataSet[Row].collect()
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
@@ -115,14 +124,15 @@ class ExpressionsITCase(
   @Test
   def testDateLiteral(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
-    val t = env.fromElements((0L, "test")).as('a, 'b)
+    val t = env.fromElements((0L, "test")).toTable(tEnv, 'a, 'b)
       .select('a,
         Literal(new Date(0)).cast(BasicTypeInfo.STRING_TYPE_INFO),
         'a.cast(BasicTypeInfo.DATE_TYPE_INFO).cast(BasicTypeInfo.STRING_TYPE_INFO))
 
     val expected = "0,1970-01-01 00:00:00.000,1970-01-01 00:00:00.000"
-    val results = t.toDataSet[Row](getConfig).collect()
+    val results = t.toDataSet[Row].collect()
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1acd8445/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala
index 946f584..51dfe74 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala
@@ -21,7 +21,7 @@ package org.apache.flink.api.scala.table.test
 import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.table._
 import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.api.table.Row
+import org.apache.flink.api.table.{TableEnvironment, Row}
 import org.apache.flink.api.table.expressions.Literal
 import org.apache.flink.api.table.test.utils.TableProgramsTestBase
 import TableProgramsTestBase.TableConfigMode
@@ -43,19 +43,23 @@ class FilterITCase(
   @Test
   def testAllRejectingFilter(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
 
     val filterDs = ds.filter( Literal(false) )
 
     val expected = "\n"
-    val results = filterDs.toDataSet[Row](getConfig).collect()
+    val results = filterDs.toDataSet[Row].collect()
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
   @Test
   def testAllPassingFilter(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
 
     val filterDs = ds.filter( Literal(true) )
     val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " +
@@ -64,25 +68,29 @@ class FilterITCase(
       "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," +
       "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," +
       "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
-    val results = filterDs.toDataSet[Row](getConfig).collect()
+    val results = filterDs.toDataSet[Row].collect()
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
   @Test
   def testFilterOnStringTupleField(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
     val filterDs = ds.filter( 'c.like("%world%") )
 
     val expected = "3,2,Hello world\n" + "4,3,Hello world, how are you?\n"
-    val results = filterDs.toDataSet[Row](getConfig).collect()
+    val results = filterDs.toDataSet[Row].collect()
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
   @Test
   def testFilterOnIntegerTupleField(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
 
     val filterDs = ds.filter( 'a % 2 === 0 )
 
@@ -90,14 +98,16 @@ class FilterITCase(
       "6,3,Luke Skywalker\n" + "8,4," + "Comment#2\n" + "10,4,Comment#4\n" +
       "12,5,Comment#6\n" + "14,5,Comment#8\n" + "16,6," +
       "Comment#10\n" + "18,6,Comment#12\n" + "20,6,Comment#14\n"
-    val results = filterDs.toDataSet[Row](getConfig).collect()
+    val results = filterDs.toDataSet[Row].collect()
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
   @Test
   def testNotEquals(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
 
     val filterDs = ds.filter( 'a % 2 !== 0)
     val expected = "1,1,Hi\n" + "3,2,Hello world\n" +
@@ -111,7 +121,9 @@ class FilterITCase(
   @Test
   def testDisjunctivePredicate(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
 
     val filterDs = ds.filter( 'a < 2 || 'a > 20)
     val expected = "1,1,Hi\n" + "21,6,Comment#15\n"
@@ -122,7 +134,9 @@ class FilterITCase(
   @Test
   def testConsecutiveFilters(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
 
     val filterDs = ds.filter('a % 2 !== 0).filter('b % 2 === 0)
     val expected = "3,2,Hello world\n" + "7,4,Comment#1\n" +
@@ -135,31 +149,37 @@ class FilterITCase(
   @Test
   def testFilterBasicType(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
     val ds = CollectionDataSets.getStringDataSet(env)
 
-    val filterDs = ds.as('a).filter( 'a.like("H%") )
+    val filterDs = ds.toTable(tEnv, 'a).filter( 'a.like("H%") )
 
     val expected = "Hi\n" + "Hello\n" + "Hello world\n" + "Hello world, how are you?\n"
-    val results = filterDs.toDataSet[Row](getConfig).collect()
+    val results = filterDs.toDataSet[Row].collect()
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
   @Test
   def testFilterOnCustomType(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
     val ds = CollectionDataSets.getCustomTypeDataSet(env)
-    val filterDs = ds.as('myInt as 'i, 'myLong as 'l, 'myString as 's)
+    val filterDs = ds.toTable(tEnv, 'myInt as 'i, 'myLong as 'l, 'myString as 's)
       .filter( 's.like("%a%") )
 
     val expected = "3,3,Hello world, how are you?\n" + "3,4,I am fine.\n" + "3,5,Luke Skywalker\n"
-    val results = filterDs.toDataSet[Row](getConfig).collect()
+    val results = filterDs.toDataSet[Row].collect()
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
   @Test(expected = classOf[IllegalArgumentException])
   def testFilterInvalidFieldName(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
 
     // must fail. Field 'foo does not exist
     ds.filter( 'foo === 2 )

http://git-wip-us.apache.org/repos/asf/flink/blob/1acd8445/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggregationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggregationsITCase.scala
index 01aa00f..a9edbb0 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggregationsITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggregationsITCase.scala
@@ -18,7 +18,7 @@
 
 package org.apache.flink.api.scala.table.test
 
-import org.apache.flink.api.table.Row
+import org.apache.flink.api.table.{TableEnvironment, Row}
 import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.table._
 import org.apache.flink.api.scala.util.CollectionDataSets
@@ -38,7 +38,9 @@ class GroupedAggregationsITCase(mode: TestExecutionMode) extends MultipleProgram
   def testGroupingOnNonExistentField(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val t = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
       // must fail. '_foo not a valid field
       .groupBy('_foo)
       .select('a.avg)
@@ -48,7 +50,9 @@ class GroupedAggregationsITCase(mode: TestExecutionMode) extends MultipleProgram
   def testGroupingInvalidSelection(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val t = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
       .groupBy('a, 'b)
       // must fail. 'c is not a grouping key or aggregation
       .select('c)
@@ -58,7 +62,9 @@ class GroupedAggregationsITCase(mode: TestExecutionMode) extends MultipleProgram
   def testGroupedAggregate(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val t = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
       .groupBy('b)
       .select('b, 'a.sum)
 
@@ -71,7 +77,9 @@ class GroupedAggregationsITCase(mode: TestExecutionMode) extends MultipleProgram
   def testGroupingKeyForwardIfNotUsed(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val t = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
       .groupBy('b)
       .select('a.sum)
 
@@ -84,8 +92,10 @@ class GroupedAggregationsITCase(mode: TestExecutionMode) extends MultipleProgram
   def testGroupNoAggregation(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
     val t = CollectionDataSets.get3TupleDataSet(env)
-      .as('a, 'b, 'c)
+      .toTable(tEnv, 'a, 'b, 'c)
       .groupBy('b)
       .select('a.sum as 'd, 'b)
       .groupBy('b, 'd)
@@ -101,6 +111,8 @@ class GroupedAggregationsITCase(mode: TestExecutionMode) extends MultipleProgram
     // This uses very long keys to force serialized comparison.
     // With short keys, the normalized key is sufficient.
     val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
     val ds = env.fromElements(
       ("hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhaa", 1, 2),
       ("hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhaa", 1, 2),
@@ -111,7 +123,7 @@ class GroupedAggregationsITCase(mode: TestExecutionMode) extends MultipleProgram
       ("hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhab", 1, 2),
       ("hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhab", 1, 2),
       ("hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhab", 1, 2))
-      .rebalance().setParallelism(2).as('a, 'b, 'c)
+      .rebalance().setParallelism(2).toTable(tEnv, 'a, 'b, 'c)
       .groupBy('a, 'b)
       .select('c.sum)
 
@@ -124,7 +136,9 @@ class GroupedAggregationsITCase(mode: TestExecutionMode) extends MultipleProgram
   def testGroupedAggregateWithConstant1(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val t = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
       .select('a, 4 as 'four, 'b)
       .groupBy('four, 'a)
       .select('four, 'b.sum)
@@ -141,7 +155,9 @@ class GroupedAggregationsITCase(mode: TestExecutionMode) extends MultipleProgram
   def testGroupedAggregateWithConstant2(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val t = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
         .select('b, 4 as 'four, 'a)
         .groupBy('b, 'four)
         .select('four, 'a.sum)
@@ -155,7 +171,9 @@ class GroupedAggregationsITCase(mode: TestExecutionMode) extends MultipleProgram
   def testGroupedAggregateWithExpression(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val t = CollectionDataSets.get5TupleDataSet(env).as('a, 'b, 'c, 'd, 'e)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e)
         .groupBy('e, 'b % 3)
         .select('c.min, 'e, 'a.avg, 'd.count)
 
@@ -169,7 +187,9 @@ class GroupedAggregationsITCase(mode: TestExecutionMode) extends MultipleProgram
   def testGroupedAggregateWithFilter(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val t = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
       .groupBy('b)
       .select('b, 'a.sum)
       .where('b === 2)

http://git-wip-us.apache.org/repos/asf/flink/blob/1acd8445/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/JoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/JoinITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/JoinITCase.scala
index 41983bc..2442091 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/JoinITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/JoinITCase.scala
@@ -18,7 +18,7 @@
 
 package org.apache.flink.api.scala.table.test
 
-import org.apache.flink.api.table.{TableException, Row}
+import org.apache.flink.api.table.{TableEnvironment, TableException, Row}
 import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.table._
 import org.apache.flink.api.scala.util.CollectionDataSets
@@ -36,8 +36,10 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
   @Test
   def testJoin(): Unit = {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
 
     val joinT = ds1.join(ds2).where('b === 'e).select('c, 'g)
 
@@ -50,8 +52,10 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
   def testJoinWithFilter(): Unit = {
 
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
 
     val joinT = ds1.join(ds2).where('b === 'e && 'b < 2).select('c, 'g)
 
@@ -63,8 +67,10 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
   @Test
   def testJoinWithJoinFilter(): Unit = {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val ds1 = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
 
     val joinT = ds1.join(ds2).where('b === 'e && 'a < 6 && 'h < 'b).select('c, 'g)
 
@@ -77,8 +83,10 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
   @Test
   def testJoinWithMultipleKeys(): Unit = {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val ds1 = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
 
     val joinT = ds1.join(ds2).filter('a === 'd && 'b === 'h).select('c, 'g)
 
@@ -91,8 +99,10 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
   @Test(expected = classOf[IllegalArgumentException])
   def testJoinNonExistingKey(): Unit = {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
 
     ds1.join(ds2)
       // must fail. Field 'foo does not exist
@@ -103,8 +113,10 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
   @Test(expected = classOf[TableException])
   def testJoinWithNonMatchingKeyTypes(): Unit = {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
 
     ds1.join(ds2)
       // must fail. Field 'a is Int, and 'g is String
@@ -115,8 +127,10 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
   @Test(expected = classOf[IllegalArgumentException])
   def testJoinWithAmbiguousFields(): Unit = {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'c)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'c)
 
     ds1.join(ds2)
       // must fail. Both inputs share the same field 'c
@@ -127,8 +141,10 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
   @Test(expected = classOf[TableException])
   def testNoEqualityJoinPredicate1(): Unit = {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
 
     ds1.join(ds2)
       // must fail. No equality join predicate
@@ -139,8 +155,10 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
   @Test(expected = classOf[TableException])
   def testNoEqualityJoinPredicate2(): Unit = {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
 
     ds1.join(ds2)
       // must fail. No equality join predicate
@@ -151,8 +169,10 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
   @Test
   def testJoinWithAggregation(): Unit = {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
 
     val joinT = ds1.join(ds2).where('a === 'd).select('g.count)
 
@@ -163,10 +183,11 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
 
   @Test
   def testJoinWithGroupedAggregation(): Unit = {
-
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
 
     val joinT = ds1.join(ds2)
       .where('a === 'd)
@@ -180,11 +201,12 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
 
   @Test
   def testJoinPushThroughJoin(): Unit = {
-
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
-    val ds3 = CollectionDataSets.getSmall3TupleDataSet(env).as('j, 'k, 'l)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
+    val ds3 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'j, 'k, 'l)
 
     val joinT = ds1.join(ds2)
       .where(Literal(true))
@@ -199,10 +221,11 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
 
   @Test
   def testJoinWithDisjunctivePred(): Unit = {
-
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val ds1 = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
 
     val joinT = ds1.join(ds2).filter('a === 'd && ('b === 'e || 'b === 'e - 10 )).select('c, 'g)
 
@@ -216,10 +239,11 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
 
   @Test
   def testJoinWithExpressionPreds(): Unit = {
-
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val ds1 = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
 
     val joinT = ds1.join(ds2).filter('b === 'h + 1 && 'a - 1 === 'd + 2).select('c, 'g)
 
@@ -233,4 +257,17 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
+  @Test(expected = classOf[TableException])
+  def testJoinTablesFromDifferentEnvs(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv1 = TableEnvironment.getTableEnvironment(env)
+    val tEnv2 = TableEnvironment.getTableEnvironment(env)
+
+    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv1, 'a, 'b, 'c)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv2, 'd, 'e, 'f, 'g, 'h)
+
+    // Must fail. Tables are bound to different TableEnvironments.
+    ds1.join(ds2).where('b === 'e).select('c, 'g)
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/1acd8445/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/RegisterDataSetITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/RegisterDataSetITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/RegisterDataSetITCase.scala
deleted file mode 100644
index 535c064..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/RegisterDataSetITCase.scala
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.table.test
-
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.api.table.{TableException, Row}
-import org.apache.flink.api.table.plan.TranslationContext
-import org.apache.flink.api.table.test.utils.TableProgramsTestBase
-import org.apache.flink.api.table.test.utils.TableProgramsTestBase.TableConfigMode
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.apache.flink.test.util.TestBaseUtils
-import org.junit._
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-import scala.collection.JavaConverters._
-
-@RunWith(classOf[Parameterized])
-class RegisterDataSetITCase(
-    mode: TestExecutionMode,
-    configMode: TableConfigMode)
-  extends TableProgramsTestBase(mode, configMode) {
-
-  @Test
-  def testSimpleRegister(): Unit = {
-
-    val tableName = "MyTable"
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = getScalaTableEnvironment
-    TranslationContext.reset()
-
-    val ds = CollectionDataSets.get3TupleDataSet(env)
-    tEnv.registerDataSet(tableName, ds)
-    val t = tEnv.scan(tableName).select('_1, '_2, '_3)
-
-    val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
-      "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
-      "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
-      "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" +
-      "15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" +
-      "19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
-    val results = t.toDataSet[Row](getConfig).collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testRegisterWithFields(): Unit = {
-
-    val tableName = "MyTable"
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = getScalaTableEnvironment
-    TranslationContext.reset()
-
-    val ds = CollectionDataSets.get3TupleDataSet(env)
-    tEnv.registerDataSet(tableName, ds, 'a, 'b, 'c)
-    val t = tEnv.scan(tableName).select('a, 'b)
-
-    val expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" +
-      "7,4\n" + "8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" +
-      "15,5\n" + "16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n"
-    val results = t.toDataSet[Row](getConfig).collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test(expected = classOf[TableException])
-  def testRegisterExistingDataSet(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = getScalaTableEnvironment
-    TranslationContext.reset()
-
-    val ds1 = CollectionDataSets.get3TupleDataSet(env)
-    tEnv.registerDataSet("MyTable", ds1)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env)
-    tEnv.registerDataSet("MyTable", ds2)
-  }
-
-  @Test(expected = classOf[TableException])
-  def testScanUnregisteredTable(): Unit = {
-    val tEnv = getScalaTableEnvironment
-    TranslationContext.reset()
-
-    tEnv.scan("someTable")
-  }
-
-  @Test
-  def testTableRegister(): Unit = {
-
-    val tableName = "MyTable"
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = getScalaTableEnvironment
-    TranslationContext.reset()
-
-    val t = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
-    tEnv.registerTable(tableName, t)
-
-    val regT = tEnv.scan(tableName).select('a, 'b).filter('a > 8)
-
-    val expected = "9,4\n" + "10,4\n" +
-      "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" +
-      "15,5\n" + "16,6\n" + "17,6\n" + "18,6\n" +
-      "19,6\n" + "20,6\n" + "21,6\n"
-
-    val results = regT.toDataSet[Row](getConfig).collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test(expected = classOf[TableException])
-  def testRegisterExistingTable(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = getScalaTableEnvironment
-    TranslationContext.reset()
-
-    val t1 = CollectionDataSets.get3TupleDataSet(env).toTable
-    tEnv.registerTable("MyTable", t1)
-    val t2 = CollectionDataSets.get5TupleDataSet(env).toTable
-    tEnv.registerDataSet("MyTable", t2)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/1acd8445/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala
index 46d620a..82668a1 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala
@@ -21,7 +21,7 @@ package org.apache.flink.api.scala.table.test
 import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.table._
 import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.api.table.Row
+import org.apache.flink.api.table.{TableEnvironment, Row}
 import org.apache.flink.api.table.test.utils.TableProgramsTestBase
 import TableProgramsTestBase.TableConfigMode
 import org.apache.flink.api.table.test.utils.TableProgramsTestBase
@@ -41,9 +41,10 @@ class SelectITCase(
 
   @Test
   def testSimpleSelectAll(): Unit = {
-
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val t = CollectionDataSets.get3TupleDataSet(env).toTable.select('_1, '_2, '_3)
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).select('_1, '_2, '_3)
 
     val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
       "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
@@ -51,15 +52,16 @@ class SelectITCase(
       "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" +
       "15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" +
       "19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
-    val results = t.toDataSet[Row](getConfig).collect()
+    val results = t.toDataSet[Row].collect()
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
   @Test
   def testSimpleSelectAllWithAs(): Unit = {
-
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val t = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c).select('a, 'b, 'c)
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c).select('a, 'b, 'c)
 
     val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
       "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
@@ -67,63 +69,68 @@ class SelectITCase(
       "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" +
       "15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" +
       "19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
-    val results = t.toDataSet[Row](getConfig).collect()
+    val results = t.toDataSet[Row].collect()
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
   @Test
   def testSimpleSelectWithNaming(): Unit = {
-
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val t = CollectionDataSets.get3TupleDataSet(env).toTable
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
       .select('_1 as 'a, '_2 as 'b, '_1 as 'c)
       .select('a, 'b)
 
     val expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" + "7,4\n" +
       "8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" + "15,5\n" +
       "16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n"
-    val results = t.toDataSet[Row](getConfig).collect()
+    val results = t.toDataSet[Row].collect()
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
   @Test
   def testSimpleSelectRenameAll(): Unit = {
-
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val t = CollectionDataSets.get3TupleDataSet(env).toTable
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
       .select('_1 as 'a, '_2 as 'b, '_3 as 'c)
       .select('a, 'b)
 
     val expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" + "7,4\n" +
       "8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" + "15,5\n" +
       "16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n"
-    val results = t.toDataSet[Row](getConfig).collect()
+    val results = t.toDataSet[Row].collect()
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
   @Test(expected = classOf[IllegalArgumentException])
   def testSelectInvalidFieldFields(): Unit = {
-
     val env = ExecutionEnvironment.getExecutionEnvironment
-    CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
       // must fail. Field 'foo does not exist
       .select('a, 'foo)
   }
 
   @Test(expected = classOf[IllegalArgumentException])
   def testSelectAmbiguousRenaming(): Unit = {
-
     val env = ExecutionEnvironment.getExecutionEnvironment
-    CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
       // must fail. 'a and 'b are both renamed to 'foo
       .select('a + 1 as 'foo, 'b + 2 as 'foo).toDataSet[Row].print()
   }
 
   @Test(expected = classOf[IllegalArgumentException])
   def testSelectAmbiguousRenaming2(): Unit = {
-
     val env = ExecutionEnvironment.getExecutionEnvironment
-    CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
       // must fail. 'a and 'b are both renamed to 'a
       .select('a, 'b as 'a).toDataSet[Row].print()
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/1acd8445/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SqlExplainTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SqlExplainTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SqlExplainTest.scala
index 861a801..545721d 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SqlExplainTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SqlExplainTest.scala
@@ -20,7 +20,7 @@ package org.apache.flink.api.scala.table.test
 
 import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.table._
-import org.apache.flink.api.table.plan.TranslationContext
+import org.apache.flink.api.table.TableEnvironment
 import org.apache.flink.test.util.MultipleProgramsTestBase
 
 import org.junit._
@@ -31,17 +31,16 @@ class SqlExplainTest
 
   val testFilePath = SqlExplainTest.this.getClass.getResource("/").getFile
 
-  @Before
-  def resetContext(): Unit = {
-    TranslationContext.reset()
-  }
-
   @Test
   def testFilterWithoutExtended() : Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val table = env.fromElements((1, "hello")).as('a, 'b)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
 
-    val result = table.filter("a % 2 = 0").explain()
+    val table = env.fromElements((1, "hello"))
+      .toTable(tEnv, 'a, 'b)
+      .filter("a % 2 = 0")
+
+    val result = tEnv.explain(table)
     val source = scala.io.Source.fromFile(testFilePath +
       "../../src/test/scala/resources/testFilter0.out").mkString
     assertEquals(result, source)
@@ -50,9 +49,13 @@ class SqlExplainTest
   @Test
   def testFilterWithExtended() : Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val table = env.fromElements((1, "hello")).as('a, 'b)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val table = env.fromElements((1, "hello"))
+      .toTable(tEnv, 'a, 'b)
+      .filter("a % 2 = 0")
 
-    val result = table.filter("a % 2 = 0").explain(true)
+    val result = tEnv.explain(table, true)
     val source = scala.io.Source.fromFile(testFilePath +
       "../../src/test/scala/resources/testFilter1.out").mkString
     assertEquals(result, source)
@@ -61,10 +64,13 @@ class SqlExplainTest
   @Test
   def testJoinWithoutExtended() : Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val table1 = env.fromElements((1, "hello")).as('a, 'b)
-    val table2 = env.fromElements((1, "hello")).as('c, 'd)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
 
-    val result = table1.join(table2).where("b = d").select("a, c").explain()
+    val table1 = env.fromElements((1, "hello")).toTable(tEnv, 'a, 'b)
+    val table2 = env.fromElements((1, "hello")).toTable(tEnv, 'c, 'd)
+    val table = table1.join(table2).where("b = d").select("a, c")
+
+    val result = tEnv.explain(table)
     val source = scala.io.Source.fromFile(testFilePath +
       "../../src/test/scala/resources/testJoin0.out").mkString
     assertEquals(result, source)
@@ -73,10 +79,13 @@ class SqlExplainTest
   @Test
   def testJoinWithExtended() : Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val table1 = env.fromElements((1, "hello")).as('a, 'b)
-    val table2 = env.fromElements((1, "hello")).as('c, 'd)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val table1 = env.fromElements((1, "hello")).toTable(tEnv, 'a, 'b)
+    val table2 = env.fromElements((1, "hello")).toTable(tEnv, 'c, 'd)
+    val table = table1.join(table2).where("b = d").select("a, c")
 
-    val result = table1.join(table2).where("b = d").select("a, c").explain(true)
+    val result = tEnv.explain(table, true)
     val source = scala.io.Source.fromFile(testFilePath +
       "../../src/test/scala/resources/testJoin1.out").mkString
     assertEquals(result, source)
@@ -85,10 +94,13 @@ class SqlExplainTest
   @Test
   def testUnionWithoutExtended() : Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val table1 = env.fromElements((1, "hello")).as('count, 'word)
-    val table2 = env.fromElements((1, "hello")).as('count, 'word)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
 
-    val result = table1.unionAll(table2).explain()
+    val table1 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word)
+    val table2 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word)
+    val table = table1.unionAll(table2)
+
+    val result = tEnv.explain(table)
     val source = scala.io.Source.fromFile(testFilePath +
       "../../src/test/scala/resources/testUnion0.out").mkString
     assertEquals(result, source)
@@ -97,10 +109,13 @@ class SqlExplainTest
   @Test
   def testUnionWithExtended() : Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val table1 = env.fromElements((1, "hello")).as('count, 'word)
-    val table2 = env.fromElements((1, "hello")).as('count, 'word)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val table1 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word)
+    val table2 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word)
+    val table = table1.unionAll(table2)
 
-    val result = table1.unionAll(table2).explain(true)
+    val result = tEnv.explain(table, true)
     val source = scala.io.Source.fromFile(testFilePath +
       "../../src/test/scala/resources/testUnion1.out").mkString
     assertEquals(result, source)

http://git-wip-us.apache.org/repos/asf/flink/blob/1acd8445/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala
index 310d133..1ad57b4 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala
@@ -20,7 +20,7 @@ package org.apache.flink.api.scala.table.test
 
 import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.table._
-import org.apache.flink.api.table.Row
+import org.apache.flink.api.table.{TableEnvironment, Row}
 import org.apache.flink.api.table.codegen.CodeGenException
 import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
@@ -36,7 +36,9 @@ class StringExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsT
   @Test
   def testSubstring(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val t = env.fromElements(("AAAA", 2), ("BBBB", 1)).as('a, 'b)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val t = env.fromElements(("AAAA", 2), ("BBBB", 1)).toTable(tEnv, 'a, 'b)
       .select('a.substring(1, 'b))
 
     val expected = "AA\nB"
@@ -47,7 +49,9 @@ class StringExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsT
   @Test
   def testSubstringWithMaxEnd(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val t = env.fromElements(("ABCD", 3), ("ABCD", 2)).as('a, 'b)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val t = env.fromElements(("ABCD", 3), ("ABCD", 2)).toTable(tEnv, 'a, 'b)
       .select('a.substring('b))
 
     val expected = "CD\nBCD"
@@ -57,9 +61,10 @@ class StringExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsT
 
   @Test(expected = classOf[CodeGenException])
   def testNonWorkingSubstring1(): Unit = {
-
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val t = env.fromElements(("AAAA", 2.0), ("BBBB", 1.0)).as('a, 'b)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val t = env.fromElements(("AAAA", 2.0), ("BBBB", 1.0)).toTable(tEnv, 'a, 'b)
       // must fail, second argument of substring must be Integer not Double.
       .select('a.substring(0, 'b))
 
@@ -68,9 +73,10 @@ class StringExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsT
 
   @Test(expected = classOf[CodeGenException])
   def testNonWorkingSubstring2(): Unit = {
-
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val t = env.fromElements(("AAAA", "c"), ("BBBB", "d")).as('a, 'b)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val t = env.fromElements(("AAAA", "c"), ("BBBB", "d")).toTable(tEnv, 'a, 'b)
       // must fail, first argument of substring must be Integer not String.
       .select('a.substring('b, 15))
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1acd8445/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/TableEnvironmentITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/TableEnvironmentITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/TableEnvironmentITCase.scala
new file mode 100644
index 0000000..bd1ce46
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/TableEnvironmentITCase.scala
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.table.test
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.api.table.{TableEnvironment, TableException, Row}
+import org.apache.flink.api.table.test.utils.TableProgramsTestBase
+import org.apache.flink.api.table.test.utils.TableProgramsTestBase.TableConfigMode
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.TestBaseUtils
+import org.junit._
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+import scala.collection.JavaConverters._
+
+@RunWith(classOf[Parameterized])
+class TableEnvironmentITCase(
+    mode: TestExecutionMode,
+    configMode: TableConfigMode)
+  extends TableProgramsTestBase(mode, configMode) {
+
+  @Test
+  def testSimpleRegister(): Unit = {
+
+    val tableName = "MyTable"
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val ds = CollectionDataSets.get3TupleDataSet(env)
+    tEnv.registerDataSet(tableName, ds)
+    val t = tEnv.scan(tableName).select('_1, '_2, '_3)
+
+    val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
+      "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
+      "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
+      "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" +
+      "15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" +
+      "19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
+    val results = t.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testRegisterWithFields(): Unit = {
+
+    val tableName = "MyTable"
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val ds = CollectionDataSets.get3TupleDataSet(env)
+    tEnv.registerDataSet(tableName, ds, 'a, 'b, 'c)
+    val t = tEnv.scan(tableName).select('a, 'b)
+
+    val expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" +
+      "7,4\n" + "8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" +
+      "15,5\n" + "16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n"
+    val results = t.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test(expected = classOf[TableException])
+  def testRegisterExistingDataSet(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val ds1 = CollectionDataSets.get3TupleDataSet(env)
+    tEnv.registerDataSet("MyTable", ds1)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env)
+    // Must fail. Name is already in use.
+    tEnv.registerDataSet("MyTable", ds2)
+  }
+
+  @Test(expected = classOf[TableException])
+  def testScanUnregisteredTable(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+    // Must fail. No table registered under that name.
+    tEnv.scan("someTable")
+  }
+
+  @Test
+  def testTableRegister(): Unit = {
+
+    val tableName = "MyTable"
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+    tEnv.registerTable(tableName, t)
+
+    val regT = tEnv.scan(tableName).select('a, 'b).filter('a > 8)
+
+    val expected = "9,4\n" + "10,4\n" +
+      "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" +
+      "15,5\n" + "16,6\n" + "17,6\n" + "18,6\n" +
+      "19,6\n" + "20,6\n" + "21,6\n"
+
+    val results = regT.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test(expected = classOf[TableException])
+  def testRegisterExistingTable(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val t1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
+    tEnv.registerTable("MyTable", t1)
+    val t2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv)
+    // Must fail. Name is already in use.
+    tEnv.registerDataSet("MyTable", t2)
+  }
+
+  @Test(expected = classOf[TableException])
+  def testRegisterTableFromOtherEnv(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv1 = TableEnvironment.getTableEnvironment(env, config)
+    val tEnv2 = TableEnvironment.getTableEnvironment(env, config)
+
+    val t1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv1)
+    // Must fail. Table is bound to different TableEnvironment.
+    tEnv2.registerTable("MyTable", t1)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1acd8445/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ToTableITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ToTableITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ToTableITCase.scala
new file mode 100644
index 0000000..f162846
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ToTableITCase.scala
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.table.test
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.api.table.{TableEnvironment, Row}
+import org.apache.flink.api.table.test.utils.TableProgramsTestBase
+import TableProgramsTestBase.TableConfigMode
+import org.apache.flink.api.table.test.utils.TableProgramsTestBase
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.TestBaseUtils
+import org.junit._
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+import scala.collection.JavaConverters._
+
+@RunWith(classOf[Parameterized])
+class ToTableITCase(
+    mode: TestExecutionMode,
+    configMode: TableConfigMode)
+  extends TableProgramsTestBase(mode, configMode) {
+
+  @Test
+  def testToTable(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val t = CollectionDataSets.get3TupleDataSet(env)
+      .toTable(tEnv, 'a, 'b, 'c)
+      .select('a, 'b, 'c)
+
+    val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
+      "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
+      "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
+      "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" +
+      "15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" +
+      "19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
+    val results = t.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testToTableFromCaseClass(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val data = List(
+      SomeCaseClass("Peter", 28, 4000.00, "Sales"),
+      SomeCaseClass("Anna", 56, 10000.00, "Engineering"),
+      SomeCaseClass("Lucy", 42, 6000.00, "HR"))
+
+    val t =  env.fromCollection(data)
+      .toTable(tEnv, 'a, 'b, 'c, 'd)
+      .select('a, 'b, 'c, 'd)
+
+    val expected: String =
+      "Peter,28,4000.0,Sales\n" +
+      "Anna,56,10000.0,Engineering\n" +
+      "Lucy,42,6000.0,HR\n"
+    val results = t.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testToTableFromAndToCaseClass(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val data = List(
+      SomeCaseClass("Peter", 28, 4000.00, "Sales"),
+      SomeCaseClass("Anna", 56, 10000.00, "Engineering"),
+      SomeCaseClass("Lucy", 42, 6000.00, "HR"))
+
+    val t =  env.fromCollection(data)
+      .toTable(tEnv, 'a, 'b, 'c, 'd)
+      .select('a, 'b, 'c, 'd)
+
+    val expected: String =
+      "SomeCaseClass(Peter,28,4000.0,Sales)\n" +
+      "SomeCaseClass(Anna,56,10000.0,Engineering)\n" +
+      "SomeCaseClass(Lucy,42,6000.0,HR)\n"
+    val results = t.toDataSet[SomeCaseClass].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test(expected = classOf[IllegalArgumentException])
+  def testToTableWithToFewFields(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    CollectionDataSets.get3TupleDataSet(env)
+      // Must fail. Number of fields does not match.
+      .toTable(tEnv, 'a, 'b)
+  }
+
+  @Test(expected = classOf[IllegalArgumentException])
+  def testToTableWithToManyFields(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    CollectionDataSets.get3TupleDataSet(env)
+      // Must fail. Number of fields does not match.
+      .toTable(tEnv, 'a, 'b, 'c, 'd)
+  }
+
+  @Test(expected = classOf[IllegalArgumentException])
+  def testToTableWithAmbiguousFields(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    CollectionDataSets.get3TupleDataSet(env)
+      // Must fail. Field names not unique.
+      .toTable(tEnv, 'a, 'b, 'b)
+  }
+
+  @Test(expected = classOf[IllegalArgumentException])
+  def testToTableWithNonFieldReference1(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    // Must fail. as() can only have field references
+    CollectionDataSets.get3TupleDataSet(env)
+      .toTable(tEnv, 'a + 1, 'b, 'c)
+  }
+
+  @Test(expected = classOf[IllegalArgumentException])
+  def testToTableWithNonFieldReference2(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    // Must fail. as() can only have field references
+    CollectionDataSets.get3TupleDataSet(env)
+      .toTable(tEnv, 'a as 'foo, 'b, 'c)
+  }
+
+}
+
+case class SomeCaseClass(name: String, age: Int, salary: Double, department: String) {
+  def this() { this("", 0, 0.0, "") }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1acd8445/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/UnionITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/UnionITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/UnionITCase.scala
index a155935..0448386 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/UnionITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/UnionITCase.scala
@@ -21,7 +21,7 @@ package org.apache.flink.api.scala.table.test
 import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.table._
 import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.api.table.Row
+import org.apache.flink.api.table.{TableException, TableEnvironment, Row}
 import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.TestBaseUtils
 import org.junit._
@@ -40,8 +40,10 @@ class UnionITCase(
   @Test
   def testUnion(): Unit = {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
-    val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+    val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
 
     val unionDs = ds1.unionAll(ds2).select('c)
 
@@ -53,9 +55,11 @@ class UnionITCase(
   @Test
   def testTernaryUnion(): Unit = {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
-    val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
-    val ds3 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+    val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+    val ds3 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
 
     val unionDs = ds1.unionAll(ds2).unionAll(ds3).select('c)
 
@@ -69,8 +73,14 @@ class UnionITCase(
   @Test
   def testUnionWithFilter(): Unit = {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).as('a, 'b, 'd, 'c, 'e)
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    if (tEnv.getConfig.getEfficientTypeUsage) {
+      return
+    }
+
+    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'd, 'c, 'e)
 
     val joinDs = ds1.unionAll(ds2.select('a, 'b, 'c)).filter('b < 2).select('c)
 
@@ -82,8 +92,10 @@ class UnionITCase(
   @Test(expected = classOf[IllegalArgumentException])
   def testUnionDifferentFieldNames(): Unit = {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).as('a, 'b, 'd, 'c, 'e)
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'd, 'c, 'e)
 
     // must fail. Union inputs have different field names.
     ds1.unionAll(ds2)
@@ -92,8 +104,11 @@ class UnionITCase(
   @Test(expected = classOf[IllegalArgumentException])
   def testUnionDifferentFieldTypes(): Unit = {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).as('a, 'b, 'c, 'd, 'e).select('a, 'b, 'c)
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e)
+      .select('a, 'b, 'c)
 
     // must fail. Union inputs have different field types.
     ds1.unionAll(ds2)
@@ -102,8 +117,14 @@ class UnionITCase(
   @Test
   def testUnionWithAggregation(): Unit = {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).as('a, 'b, 'd, 'c, 'e)
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    if (tEnv.getConfig.getEfficientTypeUsage) {
+      return
+    }
+
+    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'd, 'c, 'e)
 
     val unionDs = ds1.unionAll(ds2.select('a, 'b, 'c)).select('c.count)
 
@@ -115,9 +136,15 @@ class UnionITCase(
   @Test
   def testUnionWithJoin(): Unit = {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).as('a, 'b, 'd, 'c, 'e)
-    val ds3 = CollectionDataSets.getSmall5TupleDataSet(env).as('a2, 'b2, 'd2, 'c2, 'e2)
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    if (tEnv.getConfig.getEfficientTypeUsage) {
+      return
+    }
+
+    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv,'a, 'b, 'd, 'c, 'e)
+    val ds3 = CollectionDataSets.getSmall5TupleDataSet(env).toTable(tEnv, 'a2, 'b2, 'd2, 'c2, 'e2)
 
     val joinDs = ds1.unionAll(ds2.select('a, 'b, 'c))
       .join(ds3.select('a2, 'b2, 'c2))
@@ -130,4 +157,17 @@ class UnionITCase(
       "Hallo Welt,Hallo Welt wie\n" + "Hallo Welt wie,Hallo Welt wie\n"
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
+
+  @Test(expected = classOf[TableException])
+  def testUnionTablesFromDifferentEnvs(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv1 = TableEnvironment.getTableEnvironment(env, config)
+    val tEnv2 = TableEnvironment.getTableEnvironment(env, config)
+
+    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv1, 'a, 'b, 'c)
+    val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv2, 'a, 'b, 'c)
+
+    // Must fail. Tables are bound to different TableEnvironments.
+    ds1.unionAll(ds2).select('c)
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/1acd8445/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/utils/ExpressionEvaluator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/utils/ExpressionEvaluator.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/utils/ExpressionEvaluator.scala
index 48dea56..4e1ae02 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/utils/ExpressionEvaluator.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/utils/ExpressionEvaluator.scala
@@ -25,12 +25,11 @@ import org.apache.calcite.tools.{Frameworks, RelBuilder}
 import org.apache.flink.api.common.functions.{Function, MapFunction}
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.DataSet
-import org.apache.flink.api.table.TableConfig
+import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
+import org.apache.flink.api.java.{DataSet => JDataSet}
+import org.apache.flink.api.table.{TableEnvironment, TableConfig}
 import org.apache.flink.api.table.codegen.{CodeGenerator, GeneratedFunction}
 import org.apache.flink.api.table.expressions.Expression
-import org.apache.flink.api.table.plan.TranslationContext
-import org.apache.flink.api.table.plan.schema.DataSetTable
 import org.apache.flink.api.table.runtime.FunctionCompiler
 import org.mockito.Mockito._
 
@@ -45,20 +44,26 @@ object ExpressionEvaluator {
       compile(getClass.getClassLoader, genFunc.name, genFunc.code)
   }
 
-  private def prepareTable(typeInfo: TypeInformation[Any]): (String, RelBuilder) = {
+  private def prepareTable(
+    typeInfo: TypeInformation[Any]): (String, RelBuilder, TableEnvironment) = {
+
     // create DataSetTable
     val dataSetMock = mock(classOf[DataSet[Any]])
-    when(dataSetMock.getType).thenReturn(typeInfo)
-    val tableName = TranslationContext.registerDataSetTable(new DataSetTable[Any](
-      dataSetMock,
-      (0 until typeInfo.getArity).toArray,
-      (0 until typeInfo.getArity).map("f" + _).toArray))
+    val jDataSetMock = mock(classOf[JDataSet[Any]])
+    when(dataSetMock.javaSet).thenReturn(jDataSetMock)
+    when(jDataSetMock.getType).thenReturn(typeInfo)
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val tableName = "myTable"
+    tEnv.registerDataSet(tableName, dataSetMock)
 
     // prepare RelBuilder
-    val relBuilder = TranslationContext.getRelBuilder
+    val relBuilder = tEnv.getRelBuilder
     relBuilder.scan(tableName)
 
-    (tableName, relBuilder)
+    (tableName, relBuilder, tEnv)
   }
 
   def evaluate(data: Any, typeInfo: TypeInformation[Any], sqlExpr: String): String = {
@@ -66,7 +71,7 @@ object ExpressionEvaluator {
     val table = prepareTable(typeInfo)
 
     // create RelNode from SQL expression
-    val planner = Frameworks.getPlanner(TranslationContext.getFrameworkConfig)
+    val planner = Frameworks.getPlanner(table._3.getFrameworkConfig)
     val parsed = planner.parse("SELECT " + sqlExpr + " FROM " + table._1)
     val validated = planner.validate(parsed)
     val converted = planner.rel(validated)

http://git-wip-us.apache.org/repos/asf/flink/blob/1acd8445/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/utils/TableProgramsTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/utils/TableProgramsTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/utils/TableProgramsTestBase.scala
index a971136..723646b 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/utils/TableProgramsTestBase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/utils/TableProgramsTestBase.scala
@@ -20,11 +20,11 @@ package org.apache.flink.api.table.test.utils
 
 import java.util
 
-import org.apache.flink.api.java.table.{TableEnvironment => JavaTableEnv}
+import org.apache.flink.api.java.table.{BatchTableEnvironment => JavaTableEnv}
 import org.apache.flink.api.java.{ExecutionEnvironment => JavaEnv}
-import org.apache.flink.api.scala.table.{TableEnvironment => ScalaTableEnv}
+import org.apache.flink.api.scala.table.{BatchTableEnvironment => ScalaTableEnv}
 import org.apache.flink.api.scala.{ExecutionEnvironment => ScalaEnv}
-import org.apache.flink.api.table.TableConfig
+import org.apache.flink.api.table.{TableEnvironment, TableConfig}
 import org.apache.flink.api.table.test.utils.TableProgramsTestBase.TableConfigMode
 import org.apache.flink.api.table.test.utils.TableProgramsTestBase.TableConfigMode.{EFFICIENT, NULL}
 import org.apache.flink.test.util.MultipleProgramsTestBase
@@ -38,34 +38,16 @@ class TableProgramsTestBase(
     tableConfigMode: TableConfigMode)
   extends MultipleProgramsTestBase(mode) {
 
-  def getJavaTableEnvironment: JavaTableEnv = {
-    val env = JavaEnv.getExecutionEnvironment // TODO pass it to tableEnv
-    val tableEnv = new JavaTableEnv
-    configure(tableEnv.getConfig)
-    tableEnv
-  }
-
-  def getScalaTableEnvironment: ScalaTableEnv = {
-    val env = ScalaEnv.getExecutionEnvironment // TODO pass it to tableEnv
-    val tableEnv = new ScalaTableEnv
-    configure(tableEnv.getConfig)
-    tableEnv
-  }
-
-  def getConfig: TableConfig = {
-    val config = new TableConfig()
-    configure(config)
-    config
-  }
-
-  def configure(config: TableConfig): Unit = {
+  def config: TableConfig = {
+    val conf = new TableConfig
     tableConfigMode match {
       case NULL =>
-        config.setNullCheck(true)
+        conf.setNullCheck(true)
       case EFFICIENT =>
-        config.setEfficientTypeUsage(true)
+        conf.setEfficientTypeUsage(true)
       case _ => // keep default
     }
+    conf
   }
 }