You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2015/09/16 16:57:21 UTC

[2/3] flink git commit: [TableAPI] [tests] Disambiguate result of test programs to DataSet

[TableAPI] [tests] Disambiguate result of test programs to DataSet

This closes #1131


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a3150a3c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a3150a3c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a3150a3c

Branch: refs/heads/master
Commit: a3150a3c3ac9b35ef6daf64ccf309498591483dd
Parents: 7bea901
Author: HuangWHWHW <40...@qq.com>
Authored: Tue Sep 15 21:22:28 2015 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed Sep 16 16:16:03 2015 +0200

----------------------------------------------------------------------
 .../api/scala/table/test/AggregationsITCase.scala   | 11 ++++++-----
 .../flink/api/scala/table/test/AsITCase.scala       | 14 +++++++-------
 .../flink/api/scala/table/test/CastingITCase.scala  |  5 +++++
 .../api/scala/table/test/ExpressionsITCase.scala    | 16 ++++++++--------
 .../table/test/GroupedAggreagationsITCase.scala     | 10 +++++-----
 .../flink/api/scala/table/test/SelectITCase.scala   | 14 ++++++++------
 .../scala/table/test/StringExpressionsITCase.scala  | 10 +++++-----
 7 files changed, 44 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a3150a3c/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala
index 62ac345..acbeab7 100644
--- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala
+++ b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala
@@ -57,7 +57,7 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds = CollectionDataSets.get3TupleDataSet(env).toTable
-      .select('_1.sum, '_1.min, '_1.max, '_1.count, '_1.avg)
+      .select('_1.sum, '_1.min, '_1.max, '_1.count, '_1.avg).toDataSet[Row]
 
     ds.writeAsText(resultPath, WriteMode.OVERWRITE)
     env.execute()
@@ -69,7 +69,7 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds = CollectionDataSets.get3TupleDataSet(env).toTable
-      .select('foo.avg)
+      .select('foo.avg).toDataSet[Row]
 
     ds.writeAsText(resultPath, WriteMode.OVERWRITE)
     env.execute()
@@ -84,6 +84,7 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
       (1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d, "Hello"),
       (2: Byte, 2: Short, 2, 2L, 2.0f, 2.0d, "Ciao")).toTable
       .select('_1.avg, '_2.avg, '_3.avg, '_4.avg, '_5.avg, '_6.avg, '_7.count)
+      .toDataSet[Row]
 
     ds.writeAsText(resultPath, WriteMode.OVERWRITE)
     env.execute()
@@ -95,7 +96,7 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds = env.fromElements((1f, "Hello"), (2f, "Ciao")).toTable
-      .select(('_1 + 2).avg + 2, '_2.count + " THE COUNT")
+      .select(('_1 + 2).avg + 2, '_2.count + " THE COUNT").toDataSet[Row]
 
     ds.writeAsText(resultPath, WriteMode.OVERWRITE)
     env.execute()
@@ -107,7 +108,7 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds = env.fromElements(("Hello", 1)).toTable
-      .select('_1.sum)
+      .select('_1.sum).toDataSet[Row]
 
     ds.writeAsText(resultPath, WriteMode.OVERWRITE)
     env.execute()
@@ -119,7 +120,7 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds = env.fromElements(("Hello", 1)).toTable
-      .select('_2.sum.sum)
+      .select('_2.sum.sum).toDataSet[Row]
 
     ds.writeAsText(resultPath, WriteMode.OVERWRITE)
     env.execute()

http://git-wip-us.apache.org/repos/asf/flink/blob/a3150a3c/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala
index c6259ec..28d0e07 100644
--- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala
+++ b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala
@@ -18,7 +18,7 @@
 
 package org.apache.flink.api.scala.table.test
 
-import org.apache.flink.api.table.ExpressionException
+import org.apache.flink.api.table.{Row, ExpressionException}
 import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.table._
 import org.apache.flink.api.scala.util.CollectionDataSets
@@ -54,7 +54,7 @@ class AsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
   def testAs(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
+    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c).toDataSet[Row]
 
     ds.writeAsText(resultPath, WriteMode.OVERWRITE)
     env.execute()
@@ -70,7 +70,7 @@ class AsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
   def testAsWithToFewFields(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b)
+    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b).toDataSet[Row]
 
     ds.writeAsText(resultPath, WriteMode.OVERWRITE)
     env.execute()
@@ -81,7 +81,7 @@ class AsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
   def testAsWithToManyFields(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c, 'd)
+    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c, 'd).toDataSet[Row]
 
     ds.writeAsText(resultPath, WriteMode.OVERWRITE)
     env.execute()
@@ -92,7 +92,7 @@ class AsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
   def testAsWithAmbiguousFields(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'b)
+    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'b).toDataSet[Row]
 
     ds.writeAsText(resultPath, WriteMode.OVERWRITE)
     env.execute()
@@ -104,7 +104,7 @@ class AsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     // as can only have field references
-    val ds = CollectionDataSets.get3TupleDataSet(env).as('a + 1, 'b, 'b)
+    val ds = CollectionDataSets.get3TupleDataSet(env).as('a + 1, 'b, 'b).toDataSet[Row]
 
     ds.writeAsText(resultPath, WriteMode.OVERWRITE)
     env.execute()
@@ -116,7 +116,7 @@ class AsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     // as can only have field references
-    val ds = CollectionDataSets.get3TupleDataSet(env).as('a as 'foo, 'b, 'b)
+    val ds = CollectionDataSets.get3TupleDataSet(env).as('a as 'foo, 'b, 'b).toDataSet[Row]
 
     ds.writeAsText(resultPath, WriteMode.OVERWRITE)
     env.execute()

http://git-wip-us.apache.org/repos/asf/flink/blob/a3150a3c/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala
index 524d75a..1e34521 100644
--- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala
+++ b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala
@@ -26,6 +26,7 @@ import org.junit.runners.Parameterized
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo
 import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.table._
+import org.apache.flink.api.table.Row
 import org.apache.flink.core.fs.FileSystem.WriteMode
 import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
 import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
@@ -55,6 +56,7 @@ class CastingITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mo
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds = env.fromElements((1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d)).toTable
       .select('_1 + "b", '_2 + "s", '_3 + "i", '_4 + "L", '_5 + "f", '_6 + "d")
+      .toDataSet[Row]
 
     ds.writeAsText(resultPath, WriteMode.OVERWRITE)
     env.execute()
@@ -69,6 +71,7 @@ class CastingITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mo
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds = env.fromElements((1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d)).toTable
       .select('_1 + 1, '_2 + 1, '_3 + 1L, '_4 + 1.0f, '_5 + 1.0d, '_6 + 1)
+      .toDataSet[Row]
 
     ds.writeAsText(resultPath, WriteMode.OVERWRITE)
     env.execute()
@@ -85,6 +88,7 @@ class CastingITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mo
       (1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d),
       (2: Byte, 2: Short, 2, 2L, 2.0f, 2.0d)).as('a, 'b, 'c, 'd, 'e, 'f)
       .filter('a > 1 && 'b > 1 && 'c > 1L && 'd > 1.0f && 'e > 1.0d  && 'f > 1)
+      .toDataSet[Row]
 
     ds.writeAsText(resultPath, WriteMode.OVERWRITE)
     env.execute()
@@ -104,6 +108,7 @@ class CastingITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mo
         '_3.cast(BasicTypeInfo.DOUBLE_TYPE_INFO),
         '_3.cast(BasicTypeInfo.FLOAT_TYPE_INFO),
         '_2.cast(BasicTypeInfo.BOOLEAN_TYPE_INFO))
+    .toDataSet[Row]
 
     ds.writeAsText(resultPath, WriteMode.OVERWRITE)
     env.execute()

http://git-wip-us.apache.org/repos/asf/flink/blob/a3150a3c/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala
index d9de287..5905d24 100644
--- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala
+++ b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala
@@ -18,7 +18,7 @@
 
 package org.apache.flink.api.scala.table.test
 
-import org.apache.flink.api.table.ExpressionException
+import org.apache.flink.api.table.{Row, ExpressionException}
 import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.table._
 import org.apache.flink.core.fs.FileSystem.WriteMode
@@ -53,7 +53,7 @@ class ExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBas
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds = env.fromElements((5, 10)).as('a, 'b)
-      .select('a - 5, 'a + 5, 'a / 2, 'a * 2, 'a % 2, -'a)
+      .select('a - 5, 'a + 5, 'a / 2, 'a * 2, 'a % 2, -'a).toDataSet[Row]
 
     ds.writeAsText(resultPath, WriteMode.OVERWRITE)
     env.execute()
@@ -65,7 +65,7 @@ class ExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBas
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds = env.fromElements((5, true)).as('a, 'b)
-      .select('b && true, 'b && false, 'b || false, !'b)
+      .select('b && true, 'b && false, 'b || false, !'b).toDataSet[Row]
 
     ds.writeAsText(resultPath, WriteMode.OVERWRITE)
     env.execute()
@@ -77,7 +77,7 @@ class ExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBas
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds = env.fromElements((5, 5, 4)).as('a, 'b, 'c)
-      .select('a > 'c, 'a >= 'b, 'a < 'c, 'a.isNull, 'a.isNotNull)
+      .select('a > 'c, 'a >= 'b, 'a < 'c, 'a.isNull, 'a.isNotNull).toDataSet[Row]
 
     ds.writeAsText(resultPath, WriteMode.OVERWRITE)
     env.execute()
@@ -90,7 +90,7 @@ class ExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBas
     val env = ExecutionEnvironment.getExecutionEnvironment
 
     val ds = env.fromElements((3.toByte, 5.toByte)).as('a, 'b)
-      .select('a & 'b, 'a | 'b, 'a ^ 'b, ~'a)
+      .select('a & 'b, 'a | 'b, 'a ^ 'b, ~'a).toDataSet[Row]
 
     ds.writeAsText(resultPath, WriteMode.OVERWRITE)
     env.execute()
@@ -103,7 +103,7 @@ class ExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBas
     val env = ExecutionEnvironment.getExecutionEnvironment
 
     val ds = env.fromElements((3, 5.toByte)).as('a, 'b)
-      .select('a & 'b, 'a | 'b, 'a ^ 'b, ~'a)
+      .select('a & 'b, 'a | 'b, 'a ^ 'b, ~'a).toDataSet[Row]
 
     ds.writeAsText(resultPath, WriteMode.OVERWRITE)
     env.execute()
@@ -116,7 +116,7 @@ class ExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBas
     val env = ExecutionEnvironment.getExecutionEnvironment
 
     val ds = env.fromElements((3.0, 5)).as('a, 'b)
-      .select('a & 'b, 'a | 'b, 'a ^ 'b, ~'a)
+      .select('a & 'b, 'a | 'b, 'a ^ 'b, ~'a).toDataSet[Row]
 
     ds.writeAsText(resultPath, WriteMode.OVERWRITE)
     env.execute()
@@ -129,7 +129,7 @@ class ExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBas
     val env = ExecutionEnvironment.getExecutionEnvironment
 
     val ds = env.fromElements((3, 5.toByte)).as('a, 'b)
-      .groupBy("a").select("a, a.count As cnt")
+      .groupBy("a").select("a, a.count As cnt").toDataSet[Row]
 
     ds.writeAsText(resultPath, WriteMode.OVERWRITE)
     env.execute()

http://git-wip-us.apache.org/repos/asf/flink/blob/a3150a3c/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggreagationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggreagationsITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggreagationsITCase.scala
index 5afd6ca..0269f07 100644
--- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggreagationsITCase.scala
+++ b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggreagationsITCase.scala
@@ -18,7 +18,7 @@
 
 package org.apache.flink.api.scala.table.test
 
-import org.apache.flink.api.table.ExpressionException
+import org.apache.flink.api.table.{Row, ExpressionException}
 import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.table._
 import org.apache.flink.api.scala.util.CollectionDataSets
@@ -55,7 +55,7 @@ class GroupedAggreagationsITCase(mode: TestExecutionMode) extends MultipleProgra
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
       .groupBy('_foo)
-      .select('a.avg)
+      .select('a.avg).toDataSet[Row]
 
     ds.writeAsText(resultPath, WriteMode.OVERWRITE)
     env.execute()
@@ -71,7 +71,7 @@ class GroupedAggreagationsITCase(mode: TestExecutionMode) extends MultipleProgra
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
       .groupBy('b)
-      .select('b, 'a.sum)
+      .select('b, 'a.sum).toDataSet[Row]
 
     ds.writeAsText(resultPath, WriteMode.OVERWRITE)
     env.execute()
@@ -87,7 +87,7 @@ class GroupedAggreagationsITCase(mode: TestExecutionMode) extends MultipleProgra
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
       .groupBy('b)
-      .select('a.sum)
+      .select('a.sum).toDataSet[Row]
 
     ds.writeAsText(resultPath, WriteMode.OVERWRITE)
     env.execute()
@@ -108,7 +108,7 @@ class GroupedAggreagationsITCase(mode: TestExecutionMode) extends MultipleProgra
           |Max (a ) as c1, a.max as c2,
           |Avg ( a ) as d1, a.avg as d2,
           |Count(a) as e1, a.count as e2
-        """.stripMargin)
+        """.stripMargin).toDataSet[Row]
 
     ds.writeAsText(resultPath, WriteMode.OVERWRITE)
     env.execute()

http://git-wip-us.apache.org/repos/asf/flink/blob/a3150a3c/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala
index 1a13d93..b0c13c2 100644
--- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala
+++ b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala
@@ -18,7 +18,7 @@
 
 package org.apache.flink.api.scala.table.test
 
-import org.apache.flink.api.table.ExpressionException
+import org.apache.flink.api.table.{Row, ExpressionException}
 import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.table._
 import org.apache.flink.api.scala.util.CollectionDataSets
@@ -54,6 +54,7 @@ class SelectITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mod
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds = CollectionDataSets.get3TupleDataSet(env).toTable.select('_1, '_2, '_3)
+      .toDataSet[Row]
 
     ds.writeAsText(resultPath, WriteMode.OVERWRITE)
     env.execute()
@@ -70,6 +71,7 @@ class SelectITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mod
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c).select('a, 'b, 'c)
+      .toDataSet[Row]
 
     ds.writeAsText(resultPath, WriteMode.OVERWRITE)
     env.execute()
@@ -87,7 +89,7 @@ class SelectITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mod
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds = CollectionDataSets.get3TupleDataSet(env).toTable
       .select('_1 as 'a, '_2 as 'b)
-      .select('a, 'b)
+      .select('a, 'b).toDataSet[Row]
 
     ds.writeAsText(resultPath, WriteMode.OVERWRITE)
     env.execute()
@@ -100,7 +102,7 @@ class SelectITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mod
   def testAsWithToFewFields(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b)
+    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b).toDataSet[Row]
 
     ds.writeAsText(resultPath, WriteMode.OVERWRITE)
     env.execute()
@@ -111,7 +113,7 @@ class SelectITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mod
   def testAsWithToManyFields(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c, 'd)
+    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c, 'd).toDataSet[Row]
 
     ds.writeAsText(resultPath, WriteMode.OVERWRITE)
     env.execute()
@@ -122,7 +124,7 @@ class SelectITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mod
   def testAsWithAmbiguousFields(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'b)
+    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'b).toDataSet[Row]
 
     ds.writeAsText(resultPath, WriteMode.OVERWRITE)
     env.execute()
@@ -134,7 +136,7 @@ class SelectITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mod
   def testOnlyFieldRefInAs(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b as 'c, 'd)
+    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b as 'c, 'd).toDataSet[Row]
 
     ds.writeAsText(resultPath, WriteMode.OVERWRITE)
     env.execute()

http://git-wip-us.apache.org/repos/asf/flink/blob/a3150a3c/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala
index 65fe77a..e221321 100644
--- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala
+++ b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala
@@ -18,7 +18,7 @@
 
 package org.apache.flink.api.scala.table.test
 
-import org.apache.flink.api.table.ExpressionException
+import org.apache.flink.api.table.{Row, ExpressionException}
 import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.table._
 import org.apache.flink.core.fs.FileSystem.WriteMode
@@ -52,7 +52,7 @@ class StringExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsT
   def testSubstring(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds = env.fromElements(("AAAA", 2), ("BBBB", 1)).as('a, 'b)
-      .select('a.substring(0, 'b))
+      .select('a.substring(0, 'b)).toDataSet[Row]
 
     ds.writeAsText(resultPath, WriteMode.OVERWRITE)
     env.execute()
@@ -63,7 +63,7 @@ class StringExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsT
   def testSubstringWithMaxEnd(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds = env.fromElements(("ABCD", 2), ("ABCD", 1)).as('a, 'b)
-      .select('a.substring('b))
+      .select('a.substring('b)).toDataSet[Row]
 
     ds.writeAsText(resultPath, WriteMode.OVERWRITE)
     env.execute()
@@ -75,7 +75,7 @@ class StringExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsT
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds = env.fromElements(("AAAA", 2.0), ("BBBB", 1.0)).as('a, 'b)
-      .select('a.substring(0, 'b))
+      .select('a.substring(0, 'b)).toDataSet[Row]
 
     ds.writeAsText(resultPath, WriteMode.OVERWRITE)
     env.execute()
@@ -87,7 +87,7 @@ class StringExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsT
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds = env.fromElements(("AAAA", "c"), ("BBBB", "d")).as('a, 'b)
-      .select('a.substring('b, 15))
+      .select('a.substring('b, 15)).toDataSet[Row]
 
     ds.writeAsText(resultPath, WriteMode.OVERWRITE)
     env.execute()