You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/04/15 19:39:51 UTC
[06/12] flink git commit: [FLINK-3738] [table] Refactor
TableEnvironments. Remove Translators and TranslationContext.
http://git-wip-us.apache.org/repos/asf/flink/blob/1acd8445/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala
index 5ca8c7f..c0499e4 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala
@@ -23,7 +23,7 @@ import java.util.Date
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.table._
-import org.apache.flink.api.table.Row
+import org.apache.flink.api.table.{TableEnvironment, Row}
import org.apache.flink.api.table.codegen.CodeGenException
import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
@@ -42,7 +42,9 @@ class CastingITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mo
// don't test everything, just some common cast directions
val env = ExecutionEnvironment.getExecutionEnvironment
- val t = env.fromElements((1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d, 1L, 1001.1)).toTable
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ val t = env.fromElements((1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d, 1L, 1001.1)).toTable(tEnv)
.select('_1 + 1, '_2 + 1, '_3 + 1L, '_4 + 1.0f, '_5 + 1.0d, '_6 + 1, '_7 + 1.0d, '_8 + '_1)
val expected = "2,2,2,2.0,2.0,2.0,2.0,1002.1"
@@ -56,9 +58,11 @@ class CastingITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mo
// don't test everything, just some common cast directions
val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
val t = env.fromElements(
(1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d),
- (2: Byte, 2: Short, 2, 2L, 2.0f, 2.0d)).as('a, 'b, 'c, 'd, 'e, 'f)
+ (2: Byte, 2: Short, 2, 2L, 2.0f, 2.0d)).toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'f)
.filter('a > 1 && 'b > 1 && 'c > 1L && 'd > 1.0f && 'e > 1.0d && 'f > 1)
val expected = "2,2,2,2,2.0,2.0"
@@ -72,7 +76,9 @@ class CastingITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mo
def testAutoCastToString(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
- val t = env.fromElements((1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d, new Date(0))).toTable
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ val t = env.fromElements((1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d, new Date(0))).toTable(tEnv)
.select('_1 + "b", '_2 + "s", '_3 + "i", '_4 + "L", '_5 + "f", '_6 + "d", '_7 + "Date")
val expected = "1b,1s,1i,1L,1.0f,1.0d,1970-01-01 00:00:00.000Date"
@@ -84,8 +90,10 @@ class CastingITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mo
def testCasting(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
val t = env.fromElements((1, 0.0, 1L, true))
- .toTable
+ .toTable(tEnv)
.select(
// * -> String
'_1.cast(BasicTypeInfo.STRING_TYPE_INFO),
@@ -121,8 +129,10 @@ class CastingITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mo
def testCastFromString(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
val t = env.fromElements(("1", "true", "2.0"))
- .toTable
+ .toTable(tEnv)
.select(
// String -> BASIC TYPE (not String, Date, Void, Character)
'_1.cast(BasicTypeInfo.BYTE_TYPE_INFO),
@@ -143,8 +153,10 @@ class CastingITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mo
def testCastDateFromString(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
val t = env.fromElements(("2011-05-03", "15:51:36", "2011-05-03 15:51:36.000", "1446473775"))
- .toTable
+ .toTable(tEnv)
.select(
'_1.cast(BasicTypeInfo.DATE_TYPE_INFO).cast(BasicTypeInfo.STRING_TYPE_INFO),
'_2.cast(BasicTypeInfo.DATE_TYPE_INFO).cast(BasicTypeInfo.STRING_TYPE_INFO),
@@ -161,8 +173,10 @@ class CastingITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mo
@Test
def testCastDateToStringAndLong(): Unit = {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
val ds = env.fromElements(("2011-05-03 15:51:36.000", "1304437896000"))
- val t = ds.toTable
+ val t = ds.toTable(tEnv)
.select('_1.cast(BasicTypeInfo.DATE_TYPE_INFO).as('f0),
'_2.cast(BasicTypeInfo.DATE_TYPE_INFO).as('f1))
.select('f0.cast(BasicTypeInfo.STRING_TYPE_INFO),
http://git-wip-us.apache.org/repos/asf/flink/blob/1acd8445/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/DistinctITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/DistinctITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/DistinctITCase.scala
index 770819a..fc1284e 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/DistinctITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/DistinctITCase.scala
@@ -21,7 +21,7 @@ package org.apache.flink.api.scala.table.test
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.table._
import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.api.table.Row
+import org.apache.flink.api.table.{TableEnvironment, Row}
import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
import org.junit.Test
@@ -36,8 +36,9 @@ class DistinctITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(m
@Test
def testDistinct(): Unit = {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
- val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
val distinct = ds.select('b).distinct()
val expected = "1\n" + "2\n" + "3\n" + "4\n" + "5\n" + "6\n"
@@ -48,8 +49,9 @@ class DistinctITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(m
@Test
def testDistinctAfterAggregate(): Unit = {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
- val ds = CollectionDataSets.get5TupleDataSet(env).as('a, 'b, 'c, 'd, 'e)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ val ds = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e)
val distinct = ds.groupBy('a, 'e).select('e).distinct()
val expected = "1\n" + "2\n" + "3\n"
http://git-wip-us.apache.org/repos/asf/flink/blob/1acd8445/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala
index 29b3be4..9e908dc 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala
@@ -23,8 +23,9 @@ import java.util.Date
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.table._
-import org.apache.flink.api.table.Row
-import org.apache.flink.api.table.expressions.{Literal, Null}
+import org.apache.flink.api.table.expressions.Null
+import org.apache.flink.api.table.{TableEnvironment, Row}
+import org.apache.flink.api.table.expressions.Literal
import org.apache.flink.api.table.test.utils.TableProgramsTestBase
import org.apache.flink.api.table.test.utils.TableProgramsTestBase.TableConfigMode
import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
@@ -38,18 +39,20 @@ import scala.collection.JavaConverters._
@RunWith(classOf[Parameterized])
class ExpressionsITCase(
mode: TestExecutionMode,
- config: TableConfigMode)
- extends TableProgramsTestBase(mode, config) {
+ configMode: TableConfigMode)
+ extends TableProgramsTestBase(mode, configMode) {
@Test
def testArithmetic(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
- val t = env.fromElements((5, 10)).as('a, 'b)
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val t = env.fromElements((5, 10)).toTable(tEnv, 'a, 'b)
.select('a - 5, 'a + 5, 'a / 2, 'a * 2, 'a % 2, -'a)
val expected = "0,10,2,10,1,-5"
- val results = t.toDataSet[Row](getConfig).collect()
+ val results = t.toDataSet[Row].collect()
TestBaseUtils.compareResultAsText(results.asJava, expected)
}
@@ -57,11 +60,13 @@ class ExpressionsITCase(
def testLogic(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
- val t = env.fromElements((5, true)).as('a, 'b)
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val t = env.fromElements((5, true)).toTable(tEnv, 'a, 'b)
.select('b && true, 'b && false, 'b || false, !'b)
val expected = "true,false,true,false"
- val results = t.toDataSet[Row](getConfig).collect()
+ val results = t.toDataSet[Row].collect()
TestBaseUtils.compareResultAsText(results.asJava, expected)
}
@@ -69,11 +74,13 @@ class ExpressionsITCase(
def testComparisons(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
- val t = env.fromElements((5, 5, 4)).as('a, 'b, 'c)
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val t = env.fromElements((5, 5, 4)).toTable(tEnv, 'a, 'b, 'c)
.select('a > 'c, 'a >= 'b, 'a < 'c, 'a.isNull, 'a.isNotNull)
val expected = "true,true,false,false,true"
- val results = t.toDataSet[Row](getConfig).collect()
+ val results = t.toDataSet[Row].collect()
TestBaseUtils.compareResultAsText(results.asJava, expected)
}
@@ -81,32 +88,34 @@ class ExpressionsITCase(
def testCaseInsensitiveForAs(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
- val t = env.fromElements((3, 5.toByte)).as('a, 'b)
+ val t = env.fromElements((3, 5.toByte)).toTable(tEnv, 'a, 'b)
.groupBy("a").select("a, a.count As cnt")
val expected = "3,1"
- val results = t.toDataSet[Row](getConfig).collect()
+ val results = t.toDataSet[Row].collect()
TestBaseUtils.compareResultAsText(results.asJava, expected)
}
@Test
def testNullLiteral(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
- val t = env.fromElements((1, 0)).as('a, 'b)
+ val t = env.fromElements((1, 0)).toTable(tEnv, 'a, 'b)
.select(
'a,
'b,
Null(BasicTypeInfo.INT_TYPE_INFO),
Null(BasicTypeInfo.STRING_TYPE_INFO) === "")
- val expected = if (getConfig.getNullCheck) {
+ val expected = if (config.getNullCheck) {
"1,0,null,null"
} else {
"1,0,-1,true"
}
- val results = t.toDataSet[Row](getConfig).collect()
+ val results = t.toDataSet[Row].collect()
TestBaseUtils.compareResultAsText(results.asJava, expected)
}
@@ -115,14 +124,15 @@ class ExpressionsITCase(
@Test
def testDateLiteral(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
- val t = env.fromElements((0L, "test")).as('a, 'b)
+ val t = env.fromElements((0L, "test")).toTable(tEnv, 'a, 'b)
.select('a,
Literal(new Date(0)).cast(BasicTypeInfo.STRING_TYPE_INFO),
'a.cast(BasicTypeInfo.DATE_TYPE_INFO).cast(BasicTypeInfo.STRING_TYPE_INFO))
val expected = "0,1970-01-01 00:00:00.000,1970-01-01 00:00:00.000"
- val results = t.toDataSet[Row](getConfig).collect()
+ val results = t.toDataSet[Row].collect()
TestBaseUtils.compareResultAsText(results.asJava, expected)
}
http://git-wip-us.apache.org/repos/asf/flink/blob/1acd8445/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala
index 946f584..51dfe74 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala
@@ -21,7 +21,7 @@ package org.apache.flink.api.scala.table.test
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.table._
import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.api.table.Row
+import org.apache.flink.api.table.{TableEnvironment, Row}
import org.apache.flink.api.table.expressions.Literal
import org.apache.flink.api.table.test.utils.TableProgramsTestBase
import TableProgramsTestBase.TableConfigMode
@@ -43,19 +43,23 @@ class FilterITCase(
@Test
def testAllRejectingFilter(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
- val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
val filterDs = ds.filter( Literal(false) )
val expected = "\n"
- val results = filterDs.toDataSet[Row](getConfig).collect()
+ val results = filterDs.toDataSet[Row].collect()
TestBaseUtils.compareResultAsText(results.asJava, expected)
}
@Test
def testAllPassingFilter(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
- val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
val filterDs = ds.filter( Literal(true) )
val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " +
@@ -64,25 +68,29 @@ class FilterITCase(
"Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," +
"Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," +
"6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
- val results = filterDs.toDataSet[Row](getConfig).collect()
+ val results = filterDs.toDataSet[Row].collect()
TestBaseUtils.compareResultAsText(results.asJava, expected)
}
@Test
def testFilterOnStringTupleField(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
- val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
val filterDs = ds.filter( 'c.like("%world%") )
val expected = "3,2,Hello world\n" + "4,3,Hello world, how are you?\n"
- val results = filterDs.toDataSet[Row](getConfig).collect()
+ val results = filterDs.toDataSet[Row].collect()
TestBaseUtils.compareResultAsText(results.asJava, expected)
}
@Test
def testFilterOnIntegerTupleField(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
- val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
val filterDs = ds.filter( 'a % 2 === 0 )
@@ -90,14 +98,16 @@ class FilterITCase(
"6,3,Luke Skywalker\n" + "8,4," + "Comment#2\n" + "10,4,Comment#4\n" +
"12,5,Comment#6\n" + "14,5,Comment#8\n" + "16,6," +
"Comment#10\n" + "18,6,Comment#12\n" + "20,6,Comment#14\n"
- val results = filterDs.toDataSet[Row](getConfig).collect()
+ val results = filterDs.toDataSet[Row].collect()
TestBaseUtils.compareResultAsText(results.asJava, expected)
}
@Test
def testNotEquals(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
- val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
val filterDs = ds.filter( 'a % 2 !== 0)
val expected = "1,1,Hi\n" + "3,2,Hello world\n" +
@@ -111,7 +121,9 @@ class FilterITCase(
@Test
def testDisjunctivePredicate(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
- val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
val filterDs = ds.filter( 'a < 2 || 'a > 20)
val expected = "1,1,Hi\n" + "21,6,Comment#15\n"
@@ -122,7 +134,9 @@ class FilterITCase(
@Test
def testConsecutiveFilters(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
- val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
val filterDs = ds.filter('a % 2 !== 0).filter('b % 2 === 0)
val expected = "3,2,Hello world\n" + "7,4,Comment#1\n" +
@@ -135,31 +149,37 @@ class FilterITCase(
@Test
def testFilterBasicType(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
val ds = CollectionDataSets.getStringDataSet(env)
- val filterDs = ds.as('a).filter( 'a.like("H%") )
+ val filterDs = ds.toTable(tEnv, 'a).filter( 'a.like("H%") )
val expected = "Hi\n" + "Hello\n" + "Hello world\n" + "Hello world, how are you?\n"
- val results = filterDs.toDataSet[Row](getConfig).collect()
+ val results = filterDs.toDataSet[Row].collect()
TestBaseUtils.compareResultAsText(results.asJava, expected)
}
@Test
def testFilterOnCustomType(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
val ds = CollectionDataSets.getCustomTypeDataSet(env)
- val filterDs = ds.as('myInt as 'i, 'myLong as 'l, 'myString as 's)
+ val filterDs = ds.toTable(tEnv, 'myInt as 'i, 'myLong as 'l, 'myString as 's)
.filter( 's.like("%a%") )
val expected = "3,3,Hello world, how are you?\n" + "3,4,I am fine.\n" + "3,5,Luke Skywalker\n"
- val results = filterDs.toDataSet[Row](getConfig).collect()
+ val results = filterDs.toDataSet[Row].collect()
TestBaseUtils.compareResultAsText(results.asJava, expected)
}
@Test(expected = classOf[IllegalArgumentException])
def testFilterInvalidFieldName(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
- val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
// must fail. Field 'foo does not exist
ds.filter( 'foo === 2 )
http://git-wip-us.apache.org/repos/asf/flink/blob/1acd8445/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggregationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggregationsITCase.scala
index 01aa00f..a9edbb0 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggregationsITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggregationsITCase.scala
@@ -18,7 +18,7 @@
package org.apache.flink.api.scala.table.test
-import org.apache.flink.api.table.Row
+import org.apache.flink.api.table.{TableEnvironment, Row}
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.table._
import org.apache.flink.api.scala.util.CollectionDataSets
@@ -38,7 +38,9 @@ class GroupedAggregationsITCase(mode: TestExecutionMode) extends MultipleProgram
def testGroupingOnNonExistentField(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
- val t = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
// must fail. '_foo not a valid field
.groupBy('_foo)
.select('a.avg)
@@ -48,7 +50,9 @@ class GroupedAggregationsITCase(mode: TestExecutionMode) extends MultipleProgram
def testGroupingInvalidSelection(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
- val t = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
.groupBy('a, 'b)
// must fail. 'c is not a grouping key or aggregation
.select('c)
@@ -58,7 +62,9 @@ class GroupedAggregationsITCase(mode: TestExecutionMode) extends MultipleProgram
def testGroupedAggregate(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
- val t = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
.groupBy('b)
.select('b, 'a.sum)
@@ -71,7 +77,9 @@ class GroupedAggregationsITCase(mode: TestExecutionMode) extends MultipleProgram
def testGroupingKeyForwardIfNotUsed(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
- val t = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
.groupBy('b)
.select('a.sum)
@@ -84,8 +92,10 @@ class GroupedAggregationsITCase(mode: TestExecutionMode) extends MultipleProgram
def testGroupNoAggregation(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
val t = CollectionDataSets.get3TupleDataSet(env)
- .as('a, 'b, 'c)
+ .toTable(tEnv, 'a, 'b, 'c)
.groupBy('b)
.select('a.sum as 'd, 'b)
.groupBy('b, 'd)
@@ -101,6 +111,8 @@ class GroupedAggregationsITCase(mode: TestExecutionMode) extends MultipleProgram
// This uses very long keys to force serialized comparison.
// With short keys, the normalized key is sufficient.
val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
val ds = env.fromElements(
("hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhaa", 1, 2),
("hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhaa", 1, 2),
@@ -111,7 +123,7 @@ class GroupedAggregationsITCase(mode: TestExecutionMode) extends MultipleProgram
("hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhab", 1, 2),
("hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhab", 1, 2),
("hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhab", 1, 2))
- .rebalance().setParallelism(2).as('a, 'b, 'c)
+ .rebalance().setParallelism(2).toTable(tEnv, 'a, 'b, 'c)
.groupBy('a, 'b)
.select('c.sum)
@@ -124,7 +136,9 @@ class GroupedAggregationsITCase(mode: TestExecutionMode) extends MultipleProgram
def testGroupedAggregateWithConstant1(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
- val t = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
.select('a, 4 as 'four, 'b)
.groupBy('four, 'a)
.select('four, 'b.sum)
@@ -141,7 +155,9 @@ class GroupedAggregationsITCase(mode: TestExecutionMode) extends MultipleProgram
def testGroupedAggregateWithConstant2(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
- val t = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
.select('b, 4 as 'four, 'a)
.groupBy('b, 'four)
.select('four, 'a.sum)
@@ -155,7 +171,9 @@ class GroupedAggregationsITCase(mode: TestExecutionMode) extends MultipleProgram
def testGroupedAggregateWithExpression(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
- val t = CollectionDataSets.get5TupleDataSet(env).as('a, 'b, 'c, 'd, 'e)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e)
.groupBy('e, 'b % 3)
.select('c.min, 'e, 'a.avg, 'd.count)
@@ -169,7 +187,9 @@ class GroupedAggregationsITCase(mode: TestExecutionMode) extends MultipleProgram
def testGroupedAggregateWithFilter(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
- val t = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
.groupBy('b)
.select('b, 'a.sum)
.where('b === 2)
http://git-wip-us.apache.org/repos/asf/flink/blob/1acd8445/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/JoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/JoinITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/JoinITCase.scala
index 41983bc..2442091 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/JoinITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/JoinITCase.scala
@@ -18,7 +18,7 @@
package org.apache.flink.api.scala.table.test
-import org.apache.flink.api.table.{TableException, Row}
+import org.apache.flink.api.table.{TableEnvironment, TableException, Row}
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.table._
import org.apache.flink.api.scala.util.CollectionDataSets
@@ -36,8 +36,10 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
@Test
def testJoin(): Unit = {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
- val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
- val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+ val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
val joinT = ds1.join(ds2).where('b === 'e).select('c, 'g)
@@ -50,8 +52,10 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
def testJoinWithFilter(): Unit = {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
- val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
- val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+ val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
val joinT = ds1.join(ds2).where('b === 'e && 'b < 2).select('c, 'g)
@@ -63,8 +67,10 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
@Test
def testJoinWithJoinFilter(): Unit = {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
- val ds1 = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
- val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+ val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
val joinT = ds1.join(ds2).where('b === 'e && 'a < 6 && 'h < 'b).select('c, 'g)
@@ -77,8 +83,10 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
@Test
def testJoinWithMultipleKeys(): Unit = {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
- val ds1 = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
- val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+ val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
val joinT = ds1.join(ds2).filter('a === 'd && 'b === 'h).select('c, 'g)
@@ -91,8 +99,10 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
@Test(expected = classOf[IllegalArgumentException])
def testJoinNonExistingKey(): Unit = {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
- val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
- val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+ val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
ds1.join(ds2)
// must fail. Field 'foo does not exist
@@ -103,8 +113,10 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
@Test(expected = classOf[TableException])
def testJoinWithNonMatchingKeyTypes(): Unit = {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
- val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
- val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+ val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
ds1.join(ds2)
// must fail. Field 'a is Int, and 'g is String
@@ -115,8 +127,10 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
@Test(expected = classOf[IllegalArgumentException])
def testJoinWithAmbiguousFields(): Unit = {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
- val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
- val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'c)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+ val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'c)
ds1.join(ds2)
// must fail. Both inputs share the same field 'c
@@ -127,8 +141,10 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
@Test(expected = classOf[TableException])
def testNoEqualityJoinPredicate1(): Unit = {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
- val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
- val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+ val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
ds1.join(ds2)
// must fail. No equality join predicate
@@ -139,8 +155,10 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
@Test(expected = classOf[TableException])
def testNoEqualityJoinPredicate2(): Unit = {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
- val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
- val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+ val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
ds1.join(ds2)
// must fail. No equality join predicate
@@ -151,8 +169,10 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
@Test
def testJoinWithAggregation(): Unit = {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
- val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
- val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+ val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
val joinT = ds1.join(ds2).where('a === 'd).select('g.count)
@@ -163,10 +183,11 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
@Test
def testJoinWithGroupedAggregation(): Unit = {
-
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
- val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
- val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+ val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
val joinT = ds1.join(ds2)
.where('a === 'd)
@@ -180,11 +201,12 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
@Test
def testJoinPushThroughJoin(): Unit = {
-
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
- val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
- val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
- val ds3 = CollectionDataSets.getSmall3TupleDataSet(env).as('j, 'k, 'l)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+ val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
+ val ds3 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'j, 'k, 'l)
val joinT = ds1.join(ds2)
.where(Literal(true))
@@ -199,10 +221,11 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
@Test
def testJoinWithDisjunctivePred(): Unit = {
-
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
- val ds1 = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
- val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+ val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
val joinT = ds1.join(ds2).filter('a === 'd && ('b === 'e || 'b === 'e - 10 )).select('c, 'g)
@@ -216,10 +239,11 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
@Test
def testJoinWithExpressionPreds(): Unit = {
-
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
- val ds1 = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
- val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+ val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
val joinT = ds1.join(ds2).filter('b === 'h + 1 && 'a - 1 === 'd + 2).select('c, 'g)
@@ -233,4 +257,17 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
TestBaseUtils.compareResultAsText(results.asJava, expected)
}
+ @Test(expected = classOf[TableException])
+ def testJoinTablesFromDifferentEnvs(): Unit = {
+ val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv1 = TableEnvironment.getTableEnvironment(env)
+ val tEnv2 = TableEnvironment.getTableEnvironment(env)
+
+ val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv1, 'a, 'b, 'c)
+ val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv2, 'd, 'e, 'f, 'g, 'h)
+
+ // Must fail. Tables are bound to different TableEnvironments.
+ ds1.join(ds2).where('b === 'e).select('c, 'g)
+ }
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/1acd8445/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/RegisterDataSetITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/RegisterDataSetITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/RegisterDataSetITCase.scala
deleted file mode 100644
index 535c064..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/RegisterDataSetITCase.scala
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.table.test
-
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.api.table.{TableException, Row}
-import org.apache.flink.api.table.plan.TranslationContext
-import org.apache.flink.api.table.test.utils.TableProgramsTestBase
-import org.apache.flink.api.table.test.utils.TableProgramsTestBase.TableConfigMode
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.apache.flink.test.util.TestBaseUtils
-import org.junit._
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-import scala.collection.JavaConverters._
-
-@RunWith(classOf[Parameterized])
-class RegisterDataSetITCase(
- mode: TestExecutionMode,
- configMode: TableConfigMode)
- extends TableProgramsTestBase(mode, configMode) {
-
- @Test
- def testSimpleRegister(): Unit = {
-
- val tableName = "MyTable"
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = getScalaTableEnvironment
- TranslationContext.reset()
-
- val ds = CollectionDataSets.get3TupleDataSet(env)
- tEnv.registerDataSet(tableName, ds)
- val t = tEnv.scan(tableName).select('_1, '_2, '_3)
-
- val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
- "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
- "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
- "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" +
- "15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" +
- "19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
- val results = t.toDataSet[Row](getConfig).collect()
- TestBaseUtils.compareResultAsText(results.asJava, expected)
- }
-
- @Test
- def testRegisterWithFields(): Unit = {
-
- val tableName = "MyTable"
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = getScalaTableEnvironment
- TranslationContext.reset()
-
- val ds = CollectionDataSets.get3TupleDataSet(env)
- tEnv.registerDataSet(tableName, ds, 'a, 'b, 'c)
- val t = tEnv.scan(tableName).select('a, 'b)
-
- val expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" +
- "7,4\n" + "8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" +
- "15,5\n" + "16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n"
- val results = t.toDataSet[Row](getConfig).collect()
- TestBaseUtils.compareResultAsText(results.asJava, expected)
- }
-
- @Test(expected = classOf[TableException])
- def testRegisterExistingDataSet(): Unit = {
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = getScalaTableEnvironment
- TranslationContext.reset()
-
- val ds1 = CollectionDataSets.get3TupleDataSet(env)
- tEnv.registerDataSet("MyTable", ds1)
- val ds2 = CollectionDataSets.get5TupleDataSet(env)
- tEnv.registerDataSet("MyTable", ds2)
- }
-
- @Test(expected = classOf[TableException])
- def testScanUnregisteredTable(): Unit = {
- val tEnv = getScalaTableEnvironment
- TranslationContext.reset()
-
- tEnv.scan("someTable")
- }
-
- @Test
- def testTableRegister(): Unit = {
-
- val tableName = "MyTable"
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = getScalaTableEnvironment
- TranslationContext.reset()
-
- val t = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
- tEnv.registerTable(tableName, t)
-
- val regT = tEnv.scan(tableName).select('a, 'b).filter('a > 8)
-
- val expected = "9,4\n" + "10,4\n" +
- "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" +
- "15,5\n" + "16,6\n" + "17,6\n" + "18,6\n" +
- "19,6\n" + "20,6\n" + "21,6\n"
-
- val results = regT.toDataSet[Row](getConfig).collect()
- TestBaseUtils.compareResultAsText(results.asJava, expected)
- }
-
- @Test(expected = classOf[TableException])
- def testRegisterExistingTable(): Unit = {
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = getScalaTableEnvironment
- TranslationContext.reset()
-
- val t1 = CollectionDataSets.get3TupleDataSet(env).toTable
- tEnv.registerTable("MyTable", t1)
- val t2 = CollectionDataSets.get5TupleDataSet(env).toTable
- tEnv.registerDataSet("MyTable", t2)
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/1acd8445/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala
index 46d620a..82668a1 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala
@@ -21,7 +21,7 @@ package org.apache.flink.api.scala.table.test
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.table._
import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.api.table.Row
+import org.apache.flink.api.table.{TableEnvironment, Row}
import org.apache.flink.api.table.test.utils.TableProgramsTestBase
import TableProgramsTestBase.TableConfigMode
import org.apache.flink.api.table.test.utils.TableProgramsTestBase
@@ -41,9 +41,10 @@ class SelectITCase(
@Test
def testSimpleSelectAll(): Unit = {
-
val env = ExecutionEnvironment.getExecutionEnvironment
- val t = CollectionDataSets.get3TupleDataSet(env).toTable.select('_1, '_2, '_3)
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).select('_1, '_2, '_3)
val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
"4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
@@ -51,15 +52,16 @@ class SelectITCase(
"11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" +
"15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" +
"19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
- val results = t.toDataSet[Row](getConfig).collect()
+ val results = t.toDataSet[Row].collect()
TestBaseUtils.compareResultAsText(results.asJava, expected)
}
@Test
def testSimpleSelectAllWithAs(): Unit = {
-
val env = ExecutionEnvironment.getExecutionEnvironment
- val t = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c).select('a, 'b, 'c)
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c).select('a, 'b, 'c)
val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
"4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
@@ -67,63 +69,68 @@ class SelectITCase(
"11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" +
"15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" +
"19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
- val results = t.toDataSet[Row](getConfig).collect()
+ val results = t.toDataSet[Row].collect()
TestBaseUtils.compareResultAsText(results.asJava, expected)
}
@Test
def testSimpleSelectWithNaming(): Unit = {
-
val env = ExecutionEnvironment.getExecutionEnvironment
- val t = CollectionDataSets.get3TupleDataSet(env).toTable
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
.select('_1 as 'a, '_2 as 'b, '_1 as 'c)
.select('a, 'b)
val expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" + "7,4\n" +
"8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" + "15,5\n" +
"16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n"
- val results = t.toDataSet[Row](getConfig).collect()
+ val results = t.toDataSet[Row].collect()
TestBaseUtils.compareResultAsText(results.asJava, expected)
}
@Test
def testSimpleSelectRenameAll(): Unit = {
-
val env = ExecutionEnvironment.getExecutionEnvironment
- val t = CollectionDataSets.get3TupleDataSet(env).toTable
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
.select('_1 as 'a, '_2 as 'b, '_3 as 'c)
.select('a, 'b)
val expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" + "7,4\n" +
"8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" + "15,5\n" +
"16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n"
- val results = t.toDataSet[Row](getConfig).collect()
+ val results = t.toDataSet[Row].collect()
TestBaseUtils.compareResultAsText(results.asJava, expected)
}
@Test(expected = classOf[IllegalArgumentException])
def testSelectInvalidFieldFields(): Unit = {
-
val env = ExecutionEnvironment.getExecutionEnvironment
- CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
// must fail. Field 'foo does not exist
.select('a, 'foo)
}
@Test(expected = classOf[IllegalArgumentException])
def testSelectAmbiguousRenaming(): Unit = {
-
val env = ExecutionEnvironment.getExecutionEnvironment
- CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
// must fail. 'a and 'b are both renamed to 'foo
.select('a + 1 as 'foo, 'b + 2 as 'foo).toDataSet[Row].print()
}
@Test(expected = classOf[IllegalArgumentException])
def testSelectAmbiguousRenaming2(): Unit = {
-
val env = ExecutionEnvironment.getExecutionEnvironment
- CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
// must fail. 'a and 'b are both renamed to 'a
.select('a, 'b as 'a).toDataSet[Row].print()
}
http://git-wip-us.apache.org/repos/asf/flink/blob/1acd8445/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SqlExplainTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SqlExplainTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SqlExplainTest.scala
index 861a801..545721d 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SqlExplainTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SqlExplainTest.scala
@@ -20,7 +20,7 @@ package org.apache.flink.api.scala.table.test
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.table._
-import org.apache.flink.api.table.plan.TranslationContext
+import org.apache.flink.api.table.TableEnvironment
import org.apache.flink.test.util.MultipleProgramsTestBase
import org.junit._
@@ -31,17 +31,16 @@ class SqlExplainTest
val testFilePath = SqlExplainTest.this.getClass.getResource("/").getFile
- @Before
- def resetContext(): Unit = {
- TranslationContext.reset()
- }
-
@Test
def testFilterWithoutExtended() : Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
- val table = env.fromElements((1, "hello")).as('a, 'b)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
- val result = table.filter("a % 2 = 0").explain()
+ val table = env.fromElements((1, "hello"))
+ .toTable(tEnv, 'a, 'b)
+ .filter("a % 2 = 0")
+
+ val result = tEnv.explain(table)
val source = scala.io.Source.fromFile(testFilePath +
"../../src/test/scala/resources/testFilter0.out").mkString
assertEquals(result, source)
@@ -50,9 +49,13 @@ class SqlExplainTest
@Test
def testFilterWithExtended() : Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
- val table = env.fromElements((1, "hello")).as('a, 'b)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ val table = env.fromElements((1, "hello"))
+ .toTable(tEnv, 'a, 'b)
+ .filter("a % 2 = 0")
- val result = table.filter("a % 2 = 0").explain(true)
+ val result = tEnv.explain(table, true)
val source = scala.io.Source.fromFile(testFilePath +
"../../src/test/scala/resources/testFilter1.out").mkString
assertEquals(result, source)
@@ -61,10 +64,13 @@ class SqlExplainTest
@Test
def testJoinWithoutExtended() : Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
- val table1 = env.fromElements((1, "hello")).as('a, 'b)
- val table2 = env.fromElements((1, "hello")).as('c, 'd)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
- val result = table1.join(table2).where("b = d").select("a, c").explain()
+ val table1 = env.fromElements((1, "hello")).toTable(tEnv, 'a, 'b)
+ val table2 = env.fromElements((1, "hello")).toTable(tEnv, 'c, 'd)
+ val table = table1.join(table2).where("b = d").select("a, c")
+
+ val result = tEnv.explain(table)
val source = scala.io.Source.fromFile(testFilePath +
"../../src/test/scala/resources/testJoin0.out").mkString
assertEquals(result, source)
@@ -73,10 +79,13 @@ class SqlExplainTest
@Test
def testJoinWithExtended() : Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
- val table1 = env.fromElements((1, "hello")).as('a, 'b)
- val table2 = env.fromElements((1, "hello")).as('c, 'd)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ val table1 = env.fromElements((1, "hello")).toTable(tEnv, 'a, 'b)
+ val table2 = env.fromElements((1, "hello")).toTable(tEnv, 'c, 'd)
+ val table = table1.join(table2).where("b = d").select("a, c")
- val result = table1.join(table2).where("b = d").select("a, c").explain(true)
+ val result = tEnv.explain(table, true)
val source = scala.io.Source.fromFile(testFilePath +
"../../src/test/scala/resources/testJoin1.out").mkString
assertEquals(result, source)
@@ -85,10 +94,13 @@ class SqlExplainTest
@Test
def testUnionWithoutExtended() : Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
- val table1 = env.fromElements((1, "hello")).as('count, 'word)
- val table2 = env.fromElements((1, "hello")).as('count, 'word)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
- val result = table1.unionAll(table2).explain()
+ val table1 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word)
+ val table2 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word)
+ val table = table1.unionAll(table2)
+
+ val result = tEnv.explain(table)
val source = scala.io.Source.fromFile(testFilePath +
"../../src/test/scala/resources/testUnion0.out").mkString
assertEquals(result, source)
@@ -97,10 +109,13 @@ class SqlExplainTest
@Test
def testUnionWithExtended() : Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
- val table1 = env.fromElements((1, "hello")).as('count, 'word)
- val table2 = env.fromElements((1, "hello")).as('count, 'word)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ val table1 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word)
+ val table2 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word)
+ val table = table1.unionAll(table2)
- val result = table1.unionAll(table2).explain(true)
+ val result = tEnv.explain(table, true)
val source = scala.io.Source.fromFile(testFilePath +
"../../src/test/scala/resources/testUnion1.out").mkString
assertEquals(result, source)
http://git-wip-us.apache.org/repos/asf/flink/blob/1acd8445/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala
index 310d133..1ad57b4 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala
@@ -20,7 +20,7 @@ package org.apache.flink.api.scala.table.test
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.table._
-import org.apache.flink.api.table.Row
+import org.apache.flink.api.table.{TableEnvironment, Row}
import org.apache.flink.api.table.codegen.CodeGenException
import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
@@ -36,7 +36,9 @@ class StringExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsT
@Test
def testSubstring(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
- val t = env.fromElements(("AAAA", 2), ("BBBB", 1)).as('a, 'b)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ val t = env.fromElements(("AAAA", 2), ("BBBB", 1)).toTable(tEnv, 'a, 'b)
.select('a.substring(1, 'b))
val expected = "AA\nB"
@@ -47,7 +49,9 @@ class StringExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsT
@Test
def testSubstringWithMaxEnd(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
- val t = env.fromElements(("ABCD", 3), ("ABCD", 2)).as('a, 'b)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ val t = env.fromElements(("ABCD", 3), ("ABCD", 2)).toTable(tEnv, 'a, 'b)
.select('a.substring('b))
val expected = "CD\nBCD"
@@ -57,9 +61,10 @@ class StringExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsT
@Test(expected = classOf[CodeGenException])
def testNonWorkingSubstring1(): Unit = {
-
val env = ExecutionEnvironment.getExecutionEnvironment
- val t = env.fromElements(("AAAA", 2.0), ("BBBB", 1.0)).as('a, 'b)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ val t = env.fromElements(("AAAA", 2.0), ("BBBB", 1.0)).toTable(tEnv, 'a, 'b)
// must fail, second argument of substring must be Integer not Double.
.select('a.substring(0, 'b))
@@ -68,9 +73,10 @@ class StringExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsT
@Test(expected = classOf[CodeGenException])
def testNonWorkingSubstring2(): Unit = {
-
val env = ExecutionEnvironment.getExecutionEnvironment
- val t = env.fromElements(("AAAA", "c"), ("BBBB", "d")).as('a, 'b)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ val t = env.fromElements(("AAAA", "c"), ("BBBB", "d")).toTable(tEnv, 'a, 'b)
// must fail, first argument of substring must be Integer not String.
.select('a.substring('b, 15))
http://git-wip-us.apache.org/repos/asf/flink/blob/1acd8445/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/TableEnvironmentITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/TableEnvironmentITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/TableEnvironmentITCase.scala
new file mode 100644
index 0000000..bd1ce46
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/TableEnvironmentITCase.scala
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.table.test
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.api.table.{TableEnvironment, TableException, Row}
+import org.apache.flink.api.table.test.utils.TableProgramsTestBase
+import org.apache.flink.api.table.test.utils.TableProgramsTestBase.TableConfigMode
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.TestBaseUtils
+import org.junit._
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+import scala.collection.JavaConverters._
+
+@RunWith(classOf[Parameterized])
+class TableEnvironmentITCase(
+ mode: TestExecutionMode,
+ configMode: TableConfigMode)
+ extends TableProgramsTestBase(mode, configMode) {
+
+ @Test
+ def testSimpleRegister(): Unit = {
+
+ val tableName = "MyTable"
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val ds = CollectionDataSets.get3TupleDataSet(env)
+ tEnv.registerDataSet(tableName, ds)
+ val t = tEnv.scan(tableName).select('_1, '_2, '_3)
+
+ val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
+ "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
+ "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
+ "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" +
+ "15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" +
+ "19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
+ val results = t.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testRegisterWithFields(): Unit = {
+
+ val tableName = "MyTable"
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val ds = CollectionDataSets.get3TupleDataSet(env)
+ tEnv.registerDataSet(tableName, ds, 'a, 'b, 'c)
+ val t = tEnv.scan(tableName).select('a, 'b)
+
+ val expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" +
+ "7,4\n" + "8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" +
+ "15,5\n" + "16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n"
+ val results = t.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test(expected = classOf[TableException])
+ def testRegisterExistingDataSet(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val ds1 = CollectionDataSets.get3TupleDataSet(env)
+ tEnv.registerDataSet("MyTable", ds1)
+ val ds2 = CollectionDataSets.get5TupleDataSet(env)
+ // Must fail. Name is already in use.
+ tEnv.registerDataSet("MyTable", ds2)
+ }
+
+ @Test(expected = classOf[TableException])
+ def testScanUnregisteredTable(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+ // Must fail. No table registered under that name.
+ tEnv.scan("someTable")
+ }
+
+ @Test
+ def testTableRegister(): Unit = {
+
+ val tableName = "MyTable"
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+ tEnv.registerTable(tableName, t)
+
+ val regT = tEnv.scan(tableName).select('a, 'b).filter('a > 8)
+
+ val expected = "9,4\n" + "10,4\n" +
+ "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" +
+ "15,5\n" + "16,6\n" + "17,6\n" + "18,6\n" +
+ "19,6\n" + "20,6\n" + "21,6\n"
+
+ val results = regT.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test(expected = classOf[TableException])
+ def testRegisterExistingTable(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val t1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
+ tEnv.registerTable("MyTable", t1)
+ val t2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv)
+ // Must fail. Name is already in use.
+ tEnv.registerDataSet("MyTable", t2)
+ }
+
+ @Test(expected = classOf[TableException])
+ def testRegisterTableFromOtherEnv(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv1 = TableEnvironment.getTableEnvironment(env, config)
+ val tEnv2 = TableEnvironment.getTableEnvironment(env, config)
+
+ val t1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv1)
+ // Must fail. Table is bound to different TableEnvironment.
+ tEnv2.registerTable("MyTable", t1)
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/1acd8445/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ToTableITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ToTableITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ToTableITCase.scala
new file mode 100644
index 0000000..f162846
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ToTableITCase.scala
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.table.test
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.api.table.{TableEnvironment, Row}
+import org.apache.flink.api.table.test.utils.TableProgramsTestBase
+import TableProgramsTestBase.TableConfigMode
+import org.apache.flink.api.table.test.utils.TableProgramsTestBase
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.TestBaseUtils
+import org.junit._
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+import scala.collection.JavaConverters._
+
+@RunWith(classOf[Parameterized])
+class ToTableITCase(
+ mode: TestExecutionMode,
+ configMode: TableConfigMode)
+ extends TableProgramsTestBase(mode, configMode) {
+
+ @Test
+ def testToTable(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val t = CollectionDataSets.get3TupleDataSet(env)
+ .toTable(tEnv, 'a, 'b, 'c)
+ .select('a, 'b, 'c)
+
+ val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
+ "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
+ "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
+ "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" +
+ "15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" +
+ "19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
+ val results = t.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testToTableFromCaseClass(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val data = List(
+ SomeCaseClass("Peter", 28, 4000.00, "Sales"),
+ SomeCaseClass("Anna", 56, 10000.00, "Engineering"),
+ SomeCaseClass("Lucy", 42, 6000.00, "HR"))
+
+ val t = env.fromCollection(data)
+ .toTable(tEnv, 'a, 'b, 'c, 'd)
+ .select('a, 'b, 'c, 'd)
+
+ val expected: String =
+ "Peter,28,4000.0,Sales\n" +
+ "Anna,56,10000.0,Engineering\n" +
+ "Lucy,42,6000.0,HR\n"
+ val results = t.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testToTableFromAndToCaseClass(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val data = List(
+ SomeCaseClass("Peter", 28, 4000.00, "Sales"),
+ SomeCaseClass("Anna", 56, 10000.00, "Engineering"),
+ SomeCaseClass("Lucy", 42, 6000.00, "HR"))
+
+ val t = env.fromCollection(data)
+ .toTable(tEnv, 'a, 'b, 'c, 'd)
+ .select('a, 'b, 'c, 'd)
+
+ val expected: String =
+ "SomeCaseClass(Peter,28,4000.0,Sales)\n" +
+ "SomeCaseClass(Anna,56,10000.0,Engineering)\n" +
+ "SomeCaseClass(Lucy,42,6000.0,HR)\n"
+ val results = t.toDataSet[SomeCaseClass].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test(expected = classOf[IllegalArgumentException])
+ def testToTableWithToFewFields(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ CollectionDataSets.get3TupleDataSet(env)
+ // Must fail. Number of fields does not match.
+ .toTable(tEnv, 'a, 'b)
+ }
+
+ @Test(expected = classOf[IllegalArgumentException])
+ def testToTableWithToManyFields(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ CollectionDataSets.get3TupleDataSet(env)
+ // Must fail. Number of fields does not match.
+ .toTable(tEnv, 'a, 'b, 'c, 'd)
+ }
+
+ @Test(expected = classOf[IllegalArgumentException])
+ def testToTableWithAmbiguousFields(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ CollectionDataSets.get3TupleDataSet(env)
+ // Must fail. Field names not unique.
+ .toTable(tEnv, 'a, 'b, 'b)
+ }
+
+ @Test(expected = classOf[IllegalArgumentException])
+ def testToTableWithNonFieldReference1(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ // Must fail. as() can only have field references
+ CollectionDataSets.get3TupleDataSet(env)
+ .toTable(tEnv, 'a + 1, 'b, 'c)
+ }
+
+ @Test(expected = classOf[IllegalArgumentException])
+ def testToTableWithNonFieldReference2(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ // Must fail. as() can only have field references
+ CollectionDataSets.get3TupleDataSet(env)
+ .toTable(tEnv, 'a as 'foo, 'b, 'c)
+ }
+
+}
+
+case class SomeCaseClass(name: String, age: Int, salary: Double, department: String) {
+ def this() { this("", 0, 0.0, "") }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/1acd8445/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/UnionITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/UnionITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/UnionITCase.scala
index a155935..0448386 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/UnionITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/UnionITCase.scala
@@ -21,7 +21,7 @@ package org.apache.flink.api.scala.table.test
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.table._
import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.api.table.Row
+import org.apache.flink.api.table.{TableException, TableEnvironment, Row}
import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
import org.apache.flink.test.util.TestBaseUtils
import org.junit._
@@ -40,8 +40,10 @@ class UnionITCase(
@Test
def testUnion(): Unit = {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
- val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
- val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+ val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
val unionDs = ds1.unionAll(ds2).select('c)
@@ -53,9 +55,11 @@ class UnionITCase(
@Test
def testTernaryUnion(): Unit = {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
- val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
- val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
- val ds3 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+ val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+ val ds3 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
val unionDs = ds1.unionAll(ds2).unionAll(ds3).select('c)
@@ -69,8 +73,14 @@ class UnionITCase(
@Test
def testUnionWithFilter(): Unit = {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
- val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
- val ds2 = CollectionDataSets.get5TupleDataSet(env).as('a, 'b, 'd, 'c, 'e)
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ if (tEnv.getConfig.getEfficientTypeUsage) {
+ return
+ }
+
+ val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+ val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'd, 'c, 'e)
val joinDs = ds1.unionAll(ds2.select('a, 'b, 'c)).filter('b < 2).select('c)
@@ -82,8 +92,10 @@ class UnionITCase(
@Test(expected = classOf[IllegalArgumentException])
def testUnionDifferentFieldNames(): Unit = {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
- val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
- val ds2 = CollectionDataSets.get5TupleDataSet(env).as('a, 'b, 'd, 'c, 'e)
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+ val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'd, 'c, 'e)
// must fail. Union inputs have different field names.
ds1.unionAll(ds2)
@@ -92,8 +104,11 @@ class UnionITCase(
@Test(expected = classOf[IllegalArgumentException])
def testUnionDifferentFieldTypes(): Unit = {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
- val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
- val ds2 = CollectionDataSets.get5TupleDataSet(env).as('a, 'b, 'c, 'd, 'e).select('a, 'b, 'c)
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+ val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e)
+ .select('a, 'b, 'c)
// must fail. Union inputs have different field types.
ds1.unionAll(ds2)
@@ -102,8 +117,14 @@ class UnionITCase(
@Test
def testUnionWithAggregation(): Unit = {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
- val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
- val ds2 = CollectionDataSets.get5TupleDataSet(env).as('a, 'b, 'd, 'c, 'e)
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ if (tEnv.getConfig.getEfficientTypeUsage) {
+ return
+ }
+
+ val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+ val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'd, 'c, 'e)
val unionDs = ds1.unionAll(ds2.select('a, 'b, 'c)).select('c.count)
@@ -115,9 +136,15 @@ class UnionITCase(
@Test
def testUnionWithJoin(): Unit = {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
- val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
- val ds2 = CollectionDataSets.get5TupleDataSet(env).as('a, 'b, 'd, 'c, 'e)
- val ds3 = CollectionDataSets.getSmall5TupleDataSet(env).as('a2, 'b2, 'd2, 'c2, 'e2)
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ if (tEnv.getConfig.getEfficientTypeUsage) {
+ return
+ }
+
+ val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+ val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv,'a, 'b, 'd, 'c, 'e)
+ val ds3 = CollectionDataSets.getSmall5TupleDataSet(env).toTable(tEnv, 'a2, 'b2, 'd2, 'c2, 'e2)
val joinDs = ds1.unionAll(ds2.select('a, 'b, 'c))
.join(ds3.select('a2, 'b2, 'c2))
@@ -130,4 +157,17 @@ class UnionITCase(
"Hallo Welt,Hallo Welt wie\n" + "Hallo Welt wie,Hallo Welt wie\n"
TestBaseUtils.compareResultAsText(results.asJava, expected)
}
+
+ @Test(expected = classOf[TableException])
+ def testUnionTablesFromDifferentEnvs(): Unit = {
+ val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv1 = TableEnvironment.getTableEnvironment(env, config)
+ val tEnv2 = TableEnvironment.getTableEnvironment(env, config)
+
+ val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv1, 'a, 'b, 'c)
+ val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv2, 'a, 'b, 'c)
+
+ // Must fail. Tables are bound to different TableEnvironments.
+ ds1.unionAll(ds2).select('c)
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/1acd8445/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/utils/ExpressionEvaluator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/utils/ExpressionEvaluator.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/utils/ExpressionEvaluator.scala
index 48dea56..4e1ae02 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/utils/ExpressionEvaluator.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/utils/ExpressionEvaluator.scala
@@ -25,12 +25,11 @@ import org.apache.calcite.tools.{Frameworks, RelBuilder}
import org.apache.flink.api.common.functions.{Function, MapFunction}
import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.DataSet
-import org.apache.flink.api.table.TableConfig
+import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
+import org.apache.flink.api.java.{DataSet => JDataSet}
+import org.apache.flink.api.table.{TableEnvironment, TableConfig}
import org.apache.flink.api.table.codegen.{CodeGenerator, GeneratedFunction}
import org.apache.flink.api.table.expressions.Expression
-import org.apache.flink.api.table.plan.TranslationContext
-import org.apache.flink.api.table.plan.schema.DataSetTable
import org.apache.flink.api.table.runtime.FunctionCompiler
import org.mockito.Mockito._
@@ -45,20 +44,26 @@ object ExpressionEvaluator {
compile(getClass.getClassLoader, genFunc.name, genFunc.code)
}
- private def prepareTable(typeInfo: TypeInformation[Any]): (String, RelBuilder) = {
+ private def prepareTable(
+ typeInfo: TypeInformation[Any]): (String, RelBuilder, TableEnvironment) = {
+
// create DataSetTable
val dataSetMock = mock(classOf[DataSet[Any]])
- when(dataSetMock.getType).thenReturn(typeInfo)
- val tableName = TranslationContext.registerDataSetTable(new DataSetTable[Any](
- dataSetMock,
- (0 until typeInfo.getArity).toArray,
- (0 until typeInfo.getArity).map("f" + _).toArray))
+ val jDataSetMock = mock(classOf[JDataSet[Any]])
+ when(dataSetMock.javaSet).thenReturn(jDataSetMock)
+ when(jDataSetMock.getType).thenReturn(typeInfo)
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ val tableName = "myTable"
+ tEnv.registerDataSet(tableName, dataSetMock)
// prepare RelBuilder
- val relBuilder = TranslationContext.getRelBuilder
+ val relBuilder = tEnv.getRelBuilder
relBuilder.scan(tableName)
- (tableName, relBuilder)
+ (tableName, relBuilder, tEnv)
}
def evaluate(data: Any, typeInfo: TypeInformation[Any], sqlExpr: String): String = {
@@ -66,7 +71,7 @@ object ExpressionEvaluator {
val table = prepareTable(typeInfo)
// create RelNode from SQL expression
- val planner = Frameworks.getPlanner(TranslationContext.getFrameworkConfig)
+ val planner = Frameworks.getPlanner(table._3.getFrameworkConfig)
val parsed = planner.parse("SELECT " + sqlExpr + " FROM " + table._1)
val validated = planner.validate(parsed)
val converted = planner.rel(validated)
http://git-wip-us.apache.org/repos/asf/flink/blob/1acd8445/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/utils/TableProgramsTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/utils/TableProgramsTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/utils/TableProgramsTestBase.scala
index a971136..723646b 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/utils/TableProgramsTestBase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/utils/TableProgramsTestBase.scala
@@ -20,11 +20,11 @@ package org.apache.flink.api.table.test.utils
import java.util
-import org.apache.flink.api.java.table.{TableEnvironment => JavaTableEnv}
+import org.apache.flink.api.java.table.{BatchTableEnvironment => JavaTableEnv}
import org.apache.flink.api.java.{ExecutionEnvironment => JavaEnv}
-import org.apache.flink.api.scala.table.{TableEnvironment => ScalaTableEnv}
+import org.apache.flink.api.scala.table.{BatchTableEnvironment => ScalaTableEnv}
import org.apache.flink.api.scala.{ExecutionEnvironment => ScalaEnv}
-import org.apache.flink.api.table.TableConfig
+import org.apache.flink.api.table.{TableEnvironment, TableConfig}
import org.apache.flink.api.table.test.utils.TableProgramsTestBase.TableConfigMode
import org.apache.flink.api.table.test.utils.TableProgramsTestBase.TableConfigMode.{EFFICIENT, NULL}
import org.apache.flink.test.util.MultipleProgramsTestBase
@@ -38,34 +38,16 @@ class TableProgramsTestBase(
tableConfigMode: TableConfigMode)
extends MultipleProgramsTestBase(mode) {
- def getJavaTableEnvironment: JavaTableEnv = {
- val env = JavaEnv.getExecutionEnvironment // TODO pass it to tableEnv
- val tableEnv = new JavaTableEnv
- configure(tableEnv.getConfig)
- tableEnv
- }
-
- def getScalaTableEnvironment: ScalaTableEnv = {
- val env = ScalaEnv.getExecutionEnvironment // TODO pass it to tableEnv
- val tableEnv = new ScalaTableEnv
- configure(tableEnv.getConfig)
- tableEnv
- }
-
- def getConfig: TableConfig = {
- val config = new TableConfig()
- configure(config)
- config
- }
-
- def configure(config: TableConfig): Unit = {
+ def config: TableConfig = {
+ val conf = new TableConfig
tableConfigMode match {
case NULL =>
- config.setNullCheck(true)
+ conf.setNullCheck(true)
case EFFICIENT =>
- config.setEfficientTypeUsage(true)
+ conf.setEfficientTypeUsage(true)
case _ => // keep default
}
+ conf
}
}