You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/04/15 19:39:52 UTC
[07/12] flink git commit: [FLINK-3738] [table] Refactor
TableEnvironments. Remove Translators and TranslationContext.
http://git-wip-us.apache.org/repos/asf/flink/blob/1acd8445/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/TableEnvironmentITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/TableEnvironmentITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/TableEnvironmentITCase.java
new file mode 100644
index 0000000..c596014
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/TableEnvironmentITCase.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.table.test;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.table.BatchTableEnvironment;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.Table;
+import org.apache.flink.api.table.TableEnvironment;
+import org.apache.flink.api.table.TableException;
+import org.apache.flink.api.table.test.utils.TableProgramsTestBase;
+import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.List;
+
+@RunWith(Parameterized.class)
+public class TableEnvironmentITCase extends TableProgramsTestBase {
+
+ public TableEnvironmentITCase(TestExecutionMode mode, TableConfigMode configMode) {
+ super(mode, configMode);
+ }
+
+ @Test
+ public void testSimpleRegister() throws Exception {
+ final String tableName = "MyTable";
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+ DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+ tableEnv.registerDataSet(tableName, ds);
+ Table t = tableEnv.scan(tableName);
+
+ Table result = t.select("f0, f1");
+
+ DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
+ List<Row> results = resultSet.collect();
+ String expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" + "7,4\n" +
+ "8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" + "15,5\n" +
+ "16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n";
+ compareResultAsText(results, expected);
+ }
+
+ @Test
+ public void testRegisterWithFields() throws Exception {
+ final String tableName = "MyTable";
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+ DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+ tableEnv.registerDataSet(tableName, ds, "a, b, c");
+ Table t = tableEnv.scan(tableName);
+
+ Table result = t.select("a, b, c");
+
+ DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
+ List<Row> results = resultSet.collect();
+ String expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
+ "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
+ "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
+ "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" +
+ "14,5,Comment#8\n" + "15,5,Comment#9\n" + "16,6,Comment#10\n" +
+ "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19,6,Comment#13\n" +
+ "20,6,Comment#14\n" + "21,6,Comment#15\n";
+ compareResultAsText(results, expected);
+ }
+
+ @Test(expected = TableException.class)
+ public void testRegisterExistingDatasetTable() throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+ DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+ tableEnv.registerDataSet("MyTable", ds);
+ DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 =
+ CollectionDataSets.getSmall5TupleDataSet(env);
+ // Must fail. Name is already used for different table.
+ tableEnv.registerDataSet("MyTable", ds2);
+ }
+
+ @Test(expected = TableException.class)
+ public void testScanUnregisteredTable() throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+ // Must fail. No table registered under that name.
+ tableEnv.scan("nonRegisteredTable");
+ }
+
+ @Test
+ public void testTableRegister() throws Exception {
+ final String tableName = "MyTable";
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+ DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+ Table t = tableEnv.fromDataSet(ds);
+ tableEnv.registerTable(tableName, t);
+ Table result = tableEnv.scan(tableName).select("f0, f1").filter("f0 > 7");
+
+ DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
+ List<Row> results = resultSet.collect();
+ String expected = "8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" +
+ "13,5\n" + "14,5\n" + "15,5\n" +
+ "16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n";
+ compareResultAsText(results, expected);
+ }
+
+ @Test(expected = TableException.class)
+ public void testIllegalName() throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+ DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+ Table t = tableEnv.fromDataSet(ds);
+ // Must fail. Table name matches internal name pattern.
+ tableEnv.registerTable("_DataSetTable_42", t);
+ }
+
+ @Test(expected = TableException.class)
+ public void testRegisterTableFromOtherEnv() throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ BatchTableEnvironment tableEnv1 = TableEnvironment.getTableEnvironment(env, config());
+ BatchTableEnvironment tableEnv2 = TableEnvironment.getTableEnvironment(env, config());
+
+ Table t = tableEnv1.fromDataSet(CollectionDataSets.get3TupleDataSet(env));
+ // Must fail. Table is bound to different TableEnvironment.
+ tableEnv2.registerTable("MyTable", t);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/1acd8445/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/UnionITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/UnionITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/UnionITCase.java
index 167e45c..b69fec4 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/UnionITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/UnionITCase.java
@@ -20,11 +20,13 @@ package org.apache.flink.api.java.table.test;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.table.TableEnvironment;
+import org.apache.flink.api.java.table.BatchTableEnvironment;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.api.table.Row;
import org.apache.flink.api.table.Table;
+import org.apache.flink.api.table.TableEnvironment;
+import org.apache.flink.api.table.TableException;
import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
import org.apache.flink.test.util.MultipleProgramsTestBase;
import org.junit.Test;
@@ -43,7 +45,7 @@ public class UnionITCase extends MultipleProgramsTestBase {
@Test
public void testUnion() throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- TableEnvironment tableEnv = new TableEnvironment();
+ BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.getSmall3TupleDataSet(env);
@@ -62,7 +64,7 @@ public class UnionITCase extends MultipleProgramsTestBase {
@Test
public void testUnionWithFilter() throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- TableEnvironment tableEnv = new TableEnvironment();
+ BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
@@ -81,7 +83,7 @@ public class UnionITCase extends MultipleProgramsTestBase {
@Test(expected = IllegalArgumentException.class)
public void testUnionIncompatibleNumberOfFields() throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- TableEnvironment tableEnv = new TableEnvironment();
+ BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
@@ -96,7 +98,7 @@ public class UnionITCase extends MultipleProgramsTestBase {
@Test(expected = IllegalArgumentException.class)
public void testUnionIncompatibleFieldsName() throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- TableEnvironment tableEnv = new TableEnvironment();
+ BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.getSmall3TupleDataSet(env);
@@ -111,7 +113,7 @@ public class UnionITCase extends MultipleProgramsTestBase {
@Test(expected = IllegalArgumentException.class)
public void testUnionIncompatibleFieldTypes() throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- TableEnvironment tableEnv = new TableEnvironment();
+ BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
@@ -126,7 +128,7 @@ public class UnionITCase extends MultipleProgramsTestBase {
@Test
public void testUnionWithAggregation() throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- TableEnvironment tableEnv = new TableEnvironment();
+ BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
@@ -145,7 +147,7 @@ public class UnionITCase extends MultipleProgramsTestBase {
@Test
public void testUnionWithJoin() throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- TableEnvironment tableEnv = new TableEnvironment();
+ BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
@@ -164,5 +166,21 @@ public class UnionITCase extends MultipleProgramsTestBase {
"Hallo Welt,Hallo Welt\n" + "Hallo Welt wie,Hallo Welt\n" +
"Hallo Welt,Hallo Welt wie\n" + "Hallo Welt wie,Hallo Welt wie\n";
compareResultAsText(results, expected);
- }
+ }
+
+ @Test(expected = TableException.class)
+ public void testUnionTablesFromDifferentEnvs() throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ BatchTableEnvironment tEnv1 = TableEnvironment.getTableEnvironment(env);
+ BatchTableEnvironment tEnv2 = TableEnvironment.getTableEnvironment(env);
+
+ DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
+ DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.getSmall3TupleDataSet(env);
+
+ Table in1 = tEnv1.fromDataSet(ds1, "a, b, c");
+ Table in2 = tEnv2.fromDataSet(ds2, "a, b, c");
+
+ // Must fail. Tables are bound to different TableEnvironments.
+ in1.unionAll(in2);
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/1acd8445/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/AggregationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/AggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/AggregationsITCase.scala
index d577564..08f44d3 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/AggregationsITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/AggregationsITCase.scala
@@ -21,8 +21,7 @@ package org.apache.flink.api.scala.sql.test
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.table._
import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.api.table.Row
-import org.apache.flink.api.table.plan.TranslationContext
+import org.apache.flink.api.table.{TableEnvironment, Row}
import org.apache.flink.api.table.test.utils.TableProgramsTestBase
import org.apache.flink.api.table.test.utils.TableProgramsTestBase.TableConfigMode
import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
@@ -43,8 +42,7 @@ class AggregationsITCase(
def testAggregationTypes(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = getScalaTableEnvironment
- TranslationContext.reset()
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
val sqlQuery = "SELECT sum(_1), min(_1), max(_1), count(_1), avg(_1) FROM MyTable"
@@ -54,7 +52,7 @@ class AggregationsITCase(
val result = tEnv.sql(sqlQuery)
val expected = "231,1,21,21,11"
- val results = result.toDataSet[Row](getConfig).collect()
+ val results = result.toDataSet[Row].collect()
TestBaseUtils.compareResultAsText(results.asJava, expected)
}
@@ -62,18 +60,17 @@ class AggregationsITCase(
def testTableAggregation(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = getScalaTableEnvironment
- TranslationContext.reset()
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
val sqlQuery = "SELECT sum(_1) FROM MyTable"
- val ds = CollectionDataSets.get3TupleDataSet(env).toTable
+ val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
tEnv.registerTable("MyTable", ds)
val result = tEnv.sql(sqlQuery)
val expected = "231"
- val results = result.toDataSet[Row](getConfig).collect()
+ val results = result.toDataSet[Row].collect()
TestBaseUtils.compareResultAsText(results.asJava, expected)
}
@@ -81,8 +78,7 @@ class AggregationsITCase(
def testDataSetAggregation(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = getScalaTableEnvironment
- TranslationContext.reset()
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
val sqlQuery = "SELECT sum(_1) FROM MyTable"
@@ -92,7 +88,7 @@ class AggregationsITCase(
val result = tEnv.sql(sqlQuery)
val expected = "231"
- val results = result.toDataSet[Row](getConfig).collect()
+ val results = result.toDataSet[Row].collect()
TestBaseUtils.compareResultAsText(results.asJava, expected)
}
@@ -100,8 +96,7 @@ class AggregationsITCase(
def testWorkingAggregationDataTypes(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = getScalaTableEnvironment
- TranslationContext.reset()
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
val sqlQuery = "SELECT avg(_1), avg(_2), avg(_3), avg(_4), avg(_5), avg(_6), count(_7)" +
"FROM MyTable"
@@ -114,7 +109,7 @@ class AggregationsITCase(
val result = tEnv.sql(sqlQuery)
val expected = "1,1,1,1,1.5,1.5,2"
- val results = result.toDataSet[Row](getConfig).collect()
+ val results = result.toDataSet[Row].collect()
TestBaseUtils.compareResultAsText(results.asJava, expected)
}
@@ -122,21 +117,20 @@ class AggregationsITCase(
def testTableWorkingAggregationDataTypes(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = getScalaTableEnvironment
- TranslationContext.reset()
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
val sqlQuery = "SELECT avg(a), avg(b), avg(c), avg(d), avg(e), avg(f), count(g)" +
"FROM MyTable"
val ds = env.fromElements(
(1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d, "Hello"),
- (2: Byte, 2: Short, 2, 2L, 2.0f, 2.0d, "Ciao")).as('a, 'b, 'c, 'd, 'e, 'f, 'g)
+ (2: Byte, 2: Short, 2, 2L, 2.0f, 2.0d, "Ciao")).toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'f, 'g)
tEnv.registerTable("MyTable", ds)
val result = tEnv.sql(sqlQuery)
val expected = "1,1,1,1,1.5,1.5,2"
- val results = result.toDataSet[Row](getConfig).collect()
+ val results = result.toDataSet[Row].collect()
TestBaseUtils.compareResultAsText(results.asJava, expected)
}
@@ -144,19 +138,18 @@ class AggregationsITCase(
def testTableProjection(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = getScalaTableEnvironment
- TranslationContext.reset()
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
val sqlQuery = "SELECT avg(a), sum(a), count(a), avg(b), sum(b) " +
"FROM MyTable"
- val ds = env.fromElements((1: Byte, 1: Short), (2: Byte, 2: Short)).as('a, 'b)
+ val ds = env.fromElements((1: Byte, 1: Short), (2: Byte, 2: Short)).toTable(tEnv, 'a, 'b)
tEnv.registerTable("MyTable", ds)
val result = tEnv.sql(sqlQuery)
val expected = "1,3,2,1,3"
- val results = result.toDataSet[Row](getConfig).collect()
+ val results = result.toDataSet[Row].collect()
TestBaseUtils.compareResultAsText(results.asJava, expected)
}
@@ -164,19 +157,18 @@ class AggregationsITCase(
def testTableAggregationWithArithmetic(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = getScalaTableEnvironment
- TranslationContext.reset()
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
val sqlQuery = "SELECT avg(a + 2) + 2, count(b) + 5 " +
"FROM MyTable"
- val ds = env.fromElements((1f, "Hello"), (2f, "Ciao")).as('a, 'b)
+ val ds = env.fromElements((1f, "Hello"), (2f, "Ciao")).toTable(tEnv, 'a, 'b)
tEnv.registerTable("MyTable", ds)
val result = tEnv.sql(sqlQuery)
val expected = "5.5,7"
- val results = result.toDataSet[Row](getConfig).collect()
+ val results = result.toDataSet[Row].collect()
TestBaseUtils.compareResultAsText(results.asJava, expected)
}
@@ -184,18 +176,17 @@ class AggregationsITCase(
def testAggregationWithTwoCount(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = getScalaTableEnvironment
- TranslationContext.reset()
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
val sqlQuery = "SELECT count(_1), count(_2) FROM MyTable"
- val ds = env.fromElements((1f, "Hello"), (2f, "Ciao")).toTable
+ val ds = env.fromElements((1f, "Hello"), (2f, "Ciao")).toTable(tEnv)
tEnv.registerTable("MyTable", ds)
val result = tEnv.sql(sqlQuery)
val expected = "2,2"
- val results = result.toDataSet[Row](getConfig).collect()
+ val results = result.toDataSet[Row].collect()
TestBaseUtils.compareResultAsText(results.asJava, expected)
}
@@ -204,21 +195,20 @@ class AggregationsITCase(
def testAggregationAfterProjection(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = getScalaTableEnvironment
- TranslationContext.reset()
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
val sqlQuery = "SELECT avg(a), sum(b), count(c) FROM " +
"(SELECT _1 as a, _2 as b, _3 as c FROM MyTable)"
val ds = env.fromElements(
(1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d, "Hello"),
- (2: Byte, 2: Short, 2, 2L, 2.0f, 2.0d, "Ciao")).toTable
+ (2: Byte, 2: Short, 2, 2L, 2.0f, 2.0d, "Ciao")).toTable(tEnv)
tEnv.registerTable("MyTable", ds)
val result = tEnv.sql(sqlQuery)
val expected = "1,3,2"
- val results = result.toDataSet[Row](getConfig).collect()
+ val results = result.toDataSet[Row].collect()
TestBaseUtils.compareResultAsText(results.asJava, expected)
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/1acd8445/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/FilterITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/FilterITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/FilterITCase.scala
index 171e200..e1edd87 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/FilterITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/FilterITCase.scala
@@ -21,8 +21,7 @@ package org.apache.flink.api.scala.sql.test
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.table._
import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.api.table.Row
-import org.apache.flink.api.table.plan.TranslationContext
+import org.apache.flink.api.table.{TableEnvironment, Row}
import org.apache.flink.api.table.test.utils.TableProgramsTestBase
import org.apache.flink.api.table.test.utils.TableProgramsTestBase.TableConfigMode
import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
@@ -43,18 +42,17 @@ class FilterITCase(
def testAllRejectingFilter(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = getScalaTableEnvironment
- TranslationContext.reset()
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
val sqlQuery = "SELECT * FROM MyTable WHERE false"
- val ds = CollectionDataSets.get3TupleDataSet(env).toTable
+ val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
tEnv.registerTable("MyTable", ds)
val result = tEnv.sql(sqlQuery)
val expected = "\n"
- val results = result.toDataSet[Row](getConfig).collect()
+ val results = result.toDataSet[Row].collect()
TestBaseUtils.compareResultAsText(results.asJava, expected)
}
@@ -62,12 +60,11 @@ class FilterITCase(
def testAllPassingFilter(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = getScalaTableEnvironment
- TranslationContext.reset()
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
val sqlQuery = "SELECT * FROM MyTable WHERE true"
- val ds = CollectionDataSets.get3TupleDataSet(env).toTable
+ val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
tEnv.registerTable("MyTable", ds)
val result = tEnv.sql(sqlQuery)
@@ -78,7 +75,7 @@ class FilterITCase(
"Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," +
"Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," +
"6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
- val results = result.toDataSet[Row](getConfig).collect()
+ val results = result.toDataSet[Row].collect()
TestBaseUtils.compareResultAsText(results.asJava, expected)
}
@@ -86,18 +83,17 @@ class FilterITCase(
def testFilterOnString(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = getScalaTableEnvironment
- TranslationContext.reset()
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
val sqlQuery = "SELECT * FROM MyTable WHERE c LIKE '%world%'"
- val ds = CollectionDataSets.get3TupleDataSet(env).toTable.as('a, 'b, 'c)
+ val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
tEnv.registerTable("MyTable", ds)
val result = tEnv.sql(sqlQuery)
val expected = "3,2,Hello world\n" + "4,3,Hello world, how are you?\n"
- val results = result.toDataSet[Row](getConfig).collect()
+ val results = result.toDataSet[Row].collect()
TestBaseUtils.compareResultAsText(results.asJava, expected)
}
@@ -105,12 +101,11 @@ class FilterITCase(
def testFilterOnInteger(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = getScalaTableEnvironment
- TranslationContext.reset()
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
val sqlQuery = "SELECT * FROM MyTable WHERE MOD(a,2)=0"
- val ds = CollectionDataSets.get3TupleDataSet(env).toTable.as('a, 'b, 'c)
+ val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
tEnv.registerTable("MyTable", ds)
val result = tEnv.sql(sqlQuery)
@@ -119,7 +114,7 @@ class FilterITCase(
"6,3,Luke Skywalker\n" + "8,4," + "Comment#2\n" + "10,4,Comment#4\n" +
"12,5,Comment#6\n" + "14,5,Comment#8\n" + "16,6," +
"Comment#10\n" + "18,6,Comment#12\n" + "20,6,Comment#14\n"
- val results = result.toDataSet[Row](getConfig).collect()
+ val results = result.toDataSet[Row].collect()
TestBaseUtils.compareResultAsText(results.asJava, expected)
}
@@ -127,18 +122,17 @@ class FilterITCase(
def testDisjunctivePredicate(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = getScalaTableEnvironment
- TranslationContext.reset()
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
val sqlQuery = "SELECT * FROM MyTable WHERE a < 2 OR a > 20"
- val ds = CollectionDataSets.get3TupleDataSet(env).toTable.as('a, 'b, 'c)
+ val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
tEnv.registerTable("MyTable", ds)
val result = tEnv.sql(sqlQuery)
val expected = "1,1,Hi\n" + "21,6,Comment#15\n"
- val results = result.toDataSet[Row](getConfig).collect()
+ val results = result.toDataSet[Row].collect()
TestBaseUtils.compareResultAsText(results.asJava, expected)
}
@@ -146,12 +140,11 @@ class FilterITCase(
def testFilterWithAnd(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = getScalaTableEnvironment
- TranslationContext.reset()
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
val sqlQuery = "SELECT * FROM MyTable WHERE MOD(a,2)<>0 AND MOD(b,2)=0"
- val ds = CollectionDataSets.get3TupleDataSet(env).toTable.as('a, 'b, 'c)
+ val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
tEnv.registerTable("MyTable", ds)
val result = tEnv.sql(sqlQuery)
@@ -159,7 +152,7 @@ class FilterITCase(
val expected = "3,2,Hello world\n" + "7,4,Comment#1\n" +
"9,4,Comment#3\n" + "17,6,Comment#11\n" +
"19,6,Comment#13\n" + "21,6,Comment#15\n"
- val results = result.toDataSet[Row](getConfig).collect()
+ val results = result.toDataSet[Row].collect()
TestBaseUtils.compareResultAsText(results.asJava, expected)
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/1acd8445/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/JoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/JoinITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/JoinITCase.scala
index b09aa75..561f2a7 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/JoinITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/JoinITCase.scala
@@ -22,8 +22,8 @@ import org.apache.calcite.tools.ValidationException
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.table._
import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.api.table.{TableException, Row}
-import org.apache.flink.api.table.plan.{PlanGenException, TranslationContext}
+import org.apache.flink.api.table.{TableEnvironment, TableException, Row}
+import org.apache.flink.api.table.plan.PlanGenException
import org.apache.flink.api.table.test.utils.TableProgramsTestBase
import org.apache.flink.api.table.test.utils.TableProgramsTestBase.TableConfigMode
import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
@@ -44,20 +44,19 @@ class JoinITCase(
def testJoin(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = getScalaTableEnvironment
- TranslationContext.reset()
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE b = e"
- val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable.as('a, 'b, 'c)
- val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable.as('d, 'e, 'f, 'g, 'h)
+ val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+ val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('d, 'e, 'f, 'g, 'h)
tEnv.registerTable("Table3", ds1)
tEnv.registerTable("Table5", ds2)
val result = tEnv.sql(sqlQuery)
val expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt\n"
- val results = result.toDataSet[Row](getConfig).collect()
+ val results = result.toDataSet[Row].collect()
TestBaseUtils.compareResultAsText(results.asJava, expected)
}
@@ -65,20 +64,19 @@ class JoinITCase(
def testJoinWithFilter(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = getScalaTableEnvironment
- TranslationContext.reset()
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE b = e AND b < 2"
- val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable.as('a, 'b, 'c)
- val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable.as('d, 'e, 'f, 'g, 'h)
+ val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+ val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('d, 'e, 'f, 'g, 'h)
tEnv.registerTable("Table3", ds1)
tEnv.registerTable("Table5", ds2)
val result = tEnv.sql(sqlQuery)
val expected = "Hi,Hallo\n"
- val results = result.toDataSet[Row](getConfig).collect()
+ val results = result.toDataSet[Row].collect()
TestBaseUtils.compareResultAsText(results.asJava, expected)
}
@@ -86,13 +84,12 @@ class JoinITCase(
def testJoinWithJoinFilter(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = getScalaTableEnvironment
- TranslationContext.reset()
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE b = e AND a < 6 AND h < b"
- val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable.as('a, 'b, 'c)
- val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable.as('d, 'e, 'f, 'g, 'h)
+ val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+ val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('d, 'e, 'f, 'g, 'h)
tEnv.registerTable("Table3", ds1)
tEnv.registerTable("Table5", ds2)
@@ -100,7 +97,7 @@ class JoinITCase(
val expected = "Hello world, how are you?,Hallo Welt wie\n" +
"I am fine.,Hallo Welt wie\n"
- val results = result.toDataSet[Row](getConfig).collect()
+ val results = result.toDataSet[Row].collect()
TestBaseUtils.compareResultAsText(results.asJava, expected)
}
@@ -108,13 +105,12 @@ class JoinITCase(
def testJoinWithMultipleKeys(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = getScalaTableEnvironment
- TranslationContext.reset()
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE a = d AND b = h"
- val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable.as('a, 'b, 'c)
- val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable.as('d, 'e, 'f, 'g, 'h)
+ val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+ val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('d, 'e, 'f, 'g, 'h)
tEnv.registerTable("Table3", ds1)
tEnv.registerTable("Table5", ds2)
@@ -122,7 +118,7 @@ class JoinITCase(
val expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt wie gehts?\n" +
"Hello world,ABC\n" + "I am fine.,HIJ\n" + "I am fine.,IJK\n"
- val results = result.toDataSet[Row](getConfig).collect()
+ val results = result.toDataSet[Row].collect()
TestBaseUtils.compareResultAsText(results.asJava, expected)
}
@@ -130,13 +126,12 @@ class JoinITCase(
def testJoinNonExistingKey(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = getScalaTableEnvironment
- TranslationContext.reset()
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE foo = e"
- val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable.as('a, 'b, 'c)
- val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable.as('d, 'e, 'f, 'g, 'h)
+ val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+ val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('d, 'e, 'f, 'g, 'h)
tEnv.registerTable("Table3", ds1)
tEnv.registerTable("Table5", ds2)
@@ -147,13 +142,12 @@ class JoinITCase(
def testJoinNonMatchingKeyTypes(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = getScalaTableEnvironment
- TranslationContext.reset()
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE a = g"
- val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable.as('a, 'b, 'c)
- val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable.as('d, 'e, 'f, 'g, 'h)
+ val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+ val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('d, 'e, 'f, 'g, 'h)
tEnv.registerTable("Table3", ds1)
tEnv.registerTable("Table5", ds2)
@@ -164,13 +158,12 @@ class JoinITCase(
def testJoinWithAmbiguousFields(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = getScalaTableEnvironment
- TranslationContext.reset()
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE a = d"
- val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable.as('a, 'b, 'c)
- val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable.as('d, 'e, 'f, 'g, 'c)
+ val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+ val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('d, 'e, 'f, 'g, 'c)
tEnv.registerTable("Table3", ds1)
tEnv.registerTable("Table5", ds2)
@@ -181,20 +174,19 @@ class JoinITCase(
def testJoinWithAlias(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = getScalaTableEnvironment
- TranslationContext.reset()
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
val sqlQuery = "SELECT Table5.c, Table3.c FROM Table3, Table5 WHERE a = d AND a < 4"
- val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable.as('a, 'b, 'c)
- val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable.as('d, 'e, 'f, 'g, 'c)
+ val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+ val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('d, 'e, 'f, 'g, 'c)
tEnv.registerTable("Table3", ds1)
tEnv.registerTable("Table5", ds2)
val result = tEnv.sql(sqlQuery)
val expected = "1,Hi\n" + "2,Hello\n" + "1,Hello\n" +
"2,Hello world\n" + "2,Hello world\n" + "3,Hello world\n"
- val results = result.toDataSet[Row](getConfig).collect()
+ val results = result.toDataSet[Row].collect()
TestBaseUtils.compareResultAsText(results.asJava, expected)
}
@@ -202,25 +194,23 @@ class JoinITCase(
def testJoinNoEqualityPredicate(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = getScalaTableEnvironment
- TranslationContext.reset()
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE d = f"
- val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable.as('a, 'b, 'c)
- val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable.as('d, 'e, 'f, 'g, 'h)
+ val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+ val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('d, 'e, 'f, 'g, 'h)
tEnv.registerTable("Table3", ds1)
tEnv.registerTable("Table5", ds2)
- tEnv.sql(sqlQuery).toDataSet[Row](getConfig).collect()
+ tEnv.sql(sqlQuery).toDataSet[Row].collect()
}
@Test
def testDataSetJoinWithAggregation(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = getScalaTableEnvironment
- TranslationContext.reset()
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
val sqlQuery = "SELECT COUNT(g), COUNT(b) FROM Table3, Table5 WHERE a = d"
@@ -232,7 +222,7 @@ class JoinITCase(
val result = tEnv.sql(sqlQuery)
val expected = "6,6"
- val results = result.toDataSet[Row](getConfig).collect()
+ val results = result.toDataSet[Row].collect()
TestBaseUtils.compareResultAsText(results.asJava, expected)
}
@@ -240,20 +230,19 @@ class JoinITCase(
def testTableJoinWithAggregation(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = getScalaTableEnvironment
- TranslationContext.reset()
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
val sqlQuery = "SELECT COUNT(b), COUNT(g) FROM Table3, Table5 WHERE a = d"
- val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
- val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
+ val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+ val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
tEnv.registerTable("Table3", ds1)
tEnv.registerTable("Table5", ds2)
val result = tEnv.sql(sqlQuery)
val expected = "6,6"
- val results = result.toDataSet[Row](getConfig).collect()
+ val results = result.toDataSet[Row].collect()
TestBaseUtils.compareResultAsText(results.asJava, expected)
}
@@ -261,50 +250,47 @@ class JoinITCase(
def testFullOuterJoin(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = getScalaTableEnvironment
- TranslationContext.reset()
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
val sqlQuery = "SELECT c, g FROM Table3 FULL OUTER JOIN Table5 ON b = e"
- val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable.as('a, 'b, 'c)
- val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable.as('d, 'e, 'f, 'g, 'h)
+ val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+ val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('d, 'e, 'f, 'g, 'h)
tEnv.registerTable("Table3", ds1)
tEnv.registerTable("Table5", ds2)
- tEnv.sql(sqlQuery).toDataSet[Row](getConfig).collect()
+ tEnv.sql(sqlQuery).toDataSet[Row].collect()
}
@Test(expected = classOf[PlanGenException])
def testLeftOuterJoin(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = getScalaTableEnvironment
- TranslationContext.reset()
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
val sqlQuery = "SELECT c, g FROM Table3 LEFT OUTER JOIN Table5 ON b = e"
- val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable.as('a, 'b, 'c)
- val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable.as('d, 'e, 'f, 'g, 'h)
+ val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+ val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('d, 'e, 'f, 'g, 'h)
tEnv.registerTable("Table3", ds1)
tEnv.registerTable("Table5", ds2)
- tEnv.sql(sqlQuery).toDataSet[Row](getConfig).collect()
+ tEnv.sql(sqlQuery).toDataSet[Row].collect()
}
@Test(expected = classOf[PlanGenException])
def testRightOuterJoin(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = getScalaTableEnvironment
- TranslationContext.reset()
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
val sqlQuery = "SELECT c, g FROM Table3 RIGHT OUTER JOIN Table5 ON b = e"
- val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable.as('a, 'b, 'c)
- val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable.as('d, 'e, 'f, 'g, 'h)
+ val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+ val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('d, 'e, 'f, 'g, 'h)
tEnv.registerTable("Table3", ds1)
tEnv.registerTable("Table5", ds2)
- tEnv.sql(sqlQuery).toDataSet[Row](getConfig).collect()
+ tEnv.sql(sqlQuery).toDataSet[Row].collect()
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/1acd8445/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/SelectITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/SelectITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/SelectITCase.scala
index f08c95c..b1d423b 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/SelectITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/SelectITCase.scala
@@ -22,8 +22,7 @@ import org.apache.calcite.tools.ValidationException
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.table._
import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.api.table.Row
-import org.apache.flink.api.table.plan.TranslationContext
+import org.apache.flink.api.table.{TableEnvironment, Row}
import org.apache.flink.api.table.test.utils.TableProgramsTestBase
import org.apache.flink.api.table.test.utils.TableProgramsTestBase.TableConfigMode
import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
@@ -44,12 +43,11 @@ class SelectITCase(
def testSelectStarFromTable(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = getScalaTableEnvironment
- TranslationContext.reset()
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
val sqlQuery = "SELECT * FROM MyTable"
- val ds = CollectionDataSets.get3TupleDataSet(env).toTable.as('a, 'b, 'c)
+ val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
tEnv.registerTable("MyTable", ds)
val result = tEnv.sql(sqlQuery)
@@ -61,7 +59,7 @@ class SelectITCase(
"15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" +
"19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
- val results = result.toDataSet[Row](getConfig).collect()
+ val results = result.toDataSet[Row].collect()
TestBaseUtils.compareResultAsText(results.asJava, expected)
}
@@ -69,8 +67,7 @@ class SelectITCase(
def testSelectStarFromDataSet(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = getScalaTableEnvironment
- TranslationContext.reset()
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
val sqlQuery = "SELECT * FROM MyTable"
@@ -86,7 +83,7 @@ class SelectITCase(
"15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" +
"19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
- val results = result.toDataSet[Row](getConfig).collect()
+ val results = result.toDataSet[Row].collect()
TestBaseUtils.compareResultAsText(results.asJava, expected)
}
@@ -94,12 +91,11 @@ class SelectITCase(
def testSimpleSelectAll(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = getScalaTableEnvironment
- TranslationContext.reset()
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
val sqlQuery = "SELECT a, b, c FROM MyTable"
- val ds = CollectionDataSets.get3TupleDataSet(env).toTable.as('a, 'b, 'c)
+ val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
tEnv.registerTable("MyTable", ds)
val result = tEnv.sql(sqlQuery)
@@ -111,7 +107,7 @@ class SelectITCase(
"15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" +
"19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
- val results = result.toDataSet[Row](getConfig).collect()
+ val results = result.toDataSet[Row].collect()
TestBaseUtils.compareResultAsText(results.asJava, expected)
}
@@ -119,12 +115,11 @@ class SelectITCase(
def testSelectWithNaming(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = getScalaTableEnvironment
- TranslationContext.reset()
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
val sqlQuery = "SELECT _1 as a, _2 as b FROM MyTable"
- val ds = CollectionDataSets.get3TupleDataSet(env).toTable
+ val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
tEnv.registerTable("MyTable", ds)
val result = tEnv.sql(sqlQuery)
@@ -133,7 +128,7 @@ class SelectITCase(
"8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" + "15,5\n" +
"16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n"
- val results = result.toDataSet[Row](getConfig).collect()
+ val results = result.toDataSet[Row].collect()
TestBaseUtils.compareResultAsText(results.asJava, expected)
}
@@ -141,12 +136,11 @@ class SelectITCase(
def testInvalidFields(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = getScalaTableEnvironment
- TranslationContext.reset()
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
val sqlQuery = "SELECT a, foo FROM MyTable"
- val ds = CollectionDataSets.get3TupleDataSet(env).toTable.as('a, 'b, 'c)
+ val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
tEnv.registerTable("MyTable", ds)
tEnv.sql(sqlQuery)
http://git-wip-us.apache.org/repos/asf/flink/blob/1acd8445/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/TableWithSQLITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/TableWithSQLITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/TableWithSQLITCase.scala
index 153a9d0..71334f4 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/TableWithSQLITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/TableWithSQLITCase.scala
@@ -21,8 +21,7 @@ package org.apache.flink.api.scala.sql.test
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.table._
import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.api.table.Row
-import org.apache.flink.api.table.plan.TranslationContext
+import org.apache.flink.api.table.{TableEnvironment, Row}
import org.apache.flink.api.table.test.utils.TableProgramsTestBase
import org.apache.flink.api.table.test.utils.TableProgramsTestBase.TableConfigMode
import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
@@ -43,8 +42,7 @@ class TableWithSQLITCase(
def testSQLTable(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = getScalaTableEnvironment
- TranslationContext.reset()
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
val ds = CollectionDataSets.get3TupleDataSet(env)
tEnv.registerDataSet("MyTable", ds, 'a, 'b, 'c)
@@ -54,7 +52,7 @@ class TableWithSQLITCase(
val result = tEnv.sql(sqlQuery).select('a.avg, 'b.sum, 'c.count)
val expected = "15,65,12"
- val results = result.toDataSet[Row](getConfig).collect()
+ val results = result.toDataSet[Row].collect()
TestBaseUtils.compareResultAsText(results.asJava, expected)
}
@@ -62,10 +60,9 @@ class TableWithSQLITCase(
def testTableSQLTable(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = getScalaTableEnvironment
- TranslationContext.reset()
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
- val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
+ val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
val t1 = ds.filter('a > 9)
tEnv.registerTable("MyTable", t1)
@@ -75,7 +72,7 @@ class TableWithSQLITCase(
val result = tEnv.sql(sqlQuery).select('a1 + 1, 'b1 - 5, 'c1)
val expected = "16,60,12"
- val results = result.toDataSet[Row](getConfig).collect()
+ val results = result.toDataSet[Row].collect()
TestBaseUtils.compareResultAsText(results.asJava, expected)
}
@@ -83,10 +80,9 @@ class TableWithSQLITCase(
def testMultipleSQLQueries(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = getScalaTableEnvironment
- TranslationContext.reset()
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
- val t = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
+ val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
tEnv.registerTable("MyTable", t)
val sqlQuery = "SELECT a as aa FROM MyTable WHERE b = 6"
@@ -97,7 +93,7 @@ class TableWithSQLITCase(
val result2 = tEnv.sql(sqlQuery2)
val expected = "6"
- val results = result2.toDataSet[Row](getConfig).collect()
+ val results = result2.toDataSet[Row].collect()
TestBaseUtils.compareResultAsText(results.asJava, expected)
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/1acd8445/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/UnionITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/UnionITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/UnionITCase.scala
index 4a031a3..fadc48f 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/UnionITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/UnionITCase.scala
@@ -21,8 +21,7 @@ package org.apache.flink.api.scala.sql.test
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.table._
import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.api.table.Row
-import org.apache.flink.api.table.plan.TranslationContext
+import org.apache.flink.api.table.{TableException, TableEnvironment, Row}
import org.apache.flink.api.table.test.utils.TableProgramsTestBase
import org.apache.flink.api.table.test.utils.TableProgramsTestBase.TableConfigMode
import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
@@ -43,8 +42,7 @@ class UnionITCase(
def testUnion(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = getScalaTableEnvironment
- TranslationContext.reset()
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
val sqlQuery = "SELECT c FROM t1 UNION ALL (SELECT c FROM t2)"
@@ -56,7 +54,7 @@ class UnionITCase(
val result = tEnv.sql(sqlQuery)
val expected = "Hi\n" + "Hello\n" + "Hello world\n" + "Hi\n" + "Hello\n" + "Hello world\n"
- val results = result.toDataSet[Row](getConfig).collect()
+ val results = result.toDataSet[Row].collect()
TestBaseUtils.compareResultAsText(results.asJava, expected)
}
@@ -65,8 +63,11 @@ class UnionITCase(
def testUnionWithFilter(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = getScalaTableEnvironment
- TranslationContext.reset()
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ if (tEnv.getConfig.getEfficientTypeUsage) {
+ return
+ }
val sqlQuery = "SELECT c FROM (" +
"SELECT * FROM t1 UNION ALL (SELECT a, b, c FROM t2))" +
@@ -89,8 +90,11 @@ class UnionITCase(
def testUnionWithAggregation(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = getScalaTableEnvironment
- TranslationContext.reset()
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ if (tEnv.getConfig.getEfficientTypeUsage) {
+ return
+ }
val sqlQuery = "SELECT count(c) FROM (" +
"SELECT * FROM t1 UNION ALL (SELECT a, b, c FROM t2))"
@@ -106,4 +110,5 @@ class UnionITCase(
val results = result.toDataSet[Row].collect()
TestBaseUtils.compareResultAsText(results.asJava, expected)
}
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/1acd8445/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/FilterITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/FilterITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/FilterITCase.scala
index fbfb39e..4760d37 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/FilterITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/FilterITCase.scala
@@ -20,7 +20,7 @@ package org.apache.flink.api.scala.table.streaming.test
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.table._
-import org.apache.flink.api.table.Row
+import org.apache.flink.api.table.{TableEnvironment, Row}
import org.apache.flink.api.table.expressions.Literal
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
@@ -40,8 +40,10 @@ class FilterITCase extends StreamingMultipleProgramsTestBase {
* Test simple filter
*/
val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
StreamITCase.testResults = mutable.MutableList()
- val ds = StreamTestData.getSmall3TupleDataStream(env).as('a, 'b, 'c)
+ val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
val filterDs = ds.filter('a === 3)
val results = filterDs.toDataStream[Row]
@@ -58,8 +60,10 @@ class FilterITCase extends StreamingMultipleProgramsTestBase {
* Test all-rejecting filter
*/
val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
StreamITCase.testResults = mutable.MutableList()
- val ds = StreamTestData.getSmall3TupleDataStream(env).as('a, 'b, 'c)
+ val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
val filterDs = ds.filter( Literal(false) )
val results = filterDs.toDataStream[Row]
@@ -75,8 +79,10 @@ class FilterITCase extends StreamingMultipleProgramsTestBase {
* Test all-passing filter
*/
val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
StreamITCase.testResults = mutable.MutableList()
- val ds = StreamTestData.getSmall3TupleDataStream(env).as('a, 'b, 'c)
+ val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
val filterDs = ds.filter( Literal(true) )
val results = filterDs.toDataStream[Row]
@@ -96,8 +102,10 @@ class FilterITCase extends StreamingMultipleProgramsTestBase {
* Test filter on Integer tuple field.
*/
val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
StreamITCase.testResults = mutable.MutableList()
- val ds = StreamTestData.get3TupleDataStream(env).as('a, 'b, 'c)
+ val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
val filterDs = ds.filter( 'a % 2 === 0 )
val results = filterDs.toDataStream[Row]
@@ -118,8 +126,10 @@ class FilterITCase extends StreamingMultipleProgramsTestBase {
* Test filter on Integer tuple field.
*/
val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
StreamITCase.testResults = mutable.MutableList()
- val ds = StreamTestData.get3TupleDataStream(env).as('a, 'b, 'c)
+ val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
val filterDs = ds.filter( 'a % 2 !== 0)
val results = filterDs.toDataStream[Row]
http://git-wip-us.apache.org/repos/asf/flink/blob/1acd8445/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/SelectITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/SelectITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/SelectITCase.scala
index 83cb7fd..75a9c97 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/SelectITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/SelectITCase.scala
@@ -20,12 +20,10 @@ package org.apache.flink.api.scala.table.streaming.test
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.table._
-import org.apache.flink.api.table.Row
+import org.apache.flink.api.table.{TableEnvironment, Row}
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import scala.collection.JavaConverters._
import scala.collection.mutable
-import scala.collection.JavaConversions._
import org.junit.Test
import org.junit.Assert._
import org.apache.flink.api.scala.table.streaming.test.utils.StreamITCase
@@ -37,8 +35,9 @@ class StreamSelectITCase extends StreamingMultipleProgramsTestBase {
def testSimpleSelectAll(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
StreamITCase.testResults = mutable.MutableList()
- val ds = StreamTestData.getSmall3TupleDataStream(env).toStreamTable.select('_1, '_2, '_3)
+ val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).select('_1, '_2, '_3)
val results = ds.toDataStream[Row]
results.addSink(new StreamITCase.StringSink)
@@ -55,8 +54,9 @@ class StreamSelectITCase extends StreamingMultipleProgramsTestBase {
def testSelectFirst(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
StreamITCase.testResults = mutable.MutableList()
- val ds = StreamTestData.getSmall3TupleDataStream(env).toStreamTable.select('_1)
+ val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).select('_1)
val results = ds.toDataStream[Row]
results.addSink(new StreamITCase.StringSink)
@@ -71,8 +71,9 @@ class StreamSelectITCase extends StreamingMultipleProgramsTestBase {
// verify ProjectMergeRule.
val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
StreamITCase.testResults = mutable.MutableList()
- val ds = StreamTestData.get3TupleDataStream(env).toStreamTable
+ val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv)
.select('_1 as 'a, '_2 as 'b, '_1 as 'c)
.select('a, 'b)
@@ -91,8 +92,10 @@ class StreamSelectITCase extends StreamingMultipleProgramsTestBase {
def testSimpleSelectAllWithAs(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
StreamITCase.testResults = mutable.MutableList()
- val ds = StreamTestData.getSmall3TupleDataStream(env).as('a, 'b, 'c).select('a, 'b, 'c)
+ val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
+ .select('a, 'b, 'c)
val results = ds.toDataStream[Row]
results.addSink(new StreamITCase.StringSink)
@@ -109,8 +112,9 @@ class StreamSelectITCase extends StreamingMultipleProgramsTestBase {
def testAsWithToFewFields(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
StreamITCase.testResults = mutable.MutableList()
- val ds = StreamTestData.get3TupleDataStream(env).as('a, 'b)
+ val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b)
val results = ds.toDataStream[Row]
results.addSink(new StreamITCase.StringSink)
@@ -124,8 +128,9 @@ class StreamSelectITCase extends StreamingMultipleProgramsTestBase {
def testAsWithToManyFields(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
StreamITCase.testResults = mutable.MutableList()
- val ds = StreamTestData.get3TupleDataStream(env).as('a, 'b, 'c, 'd)
+ val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c, 'd)
val results = ds.toDataStream[Row]
results.addSink(new StreamITCase.StringSink)
@@ -139,8 +144,9 @@ class StreamSelectITCase extends StreamingMultipleProgramsTestBase {
def testAsWithAmbiguousFields(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
StreamITCase.testResults = mutable.MutableList()
- val ds = StreamTestData.get3TupleDataStream(env).as('a, 'b, 'b)
+ val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'b)
val results = ds.toDataStream[Row]
results.addSink(new StreamITCase.StringSink)
@@ -155,8 +161,9 @@ class StreamSelectITCase extends StreamingMultipleProgramsTestBase {
def testOnlyFieldRefInAs(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
StreamITCase.testResults = mutable.MutableList()
- val ds = StreamTestData.get3TupleDataStream(env).as('a, 'b as 'c, 'd)
+ val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b as 'c, 'd)
val results = ds.toDataStream[Row]
results.addSink(new StreamITCase.StringSink)
http://git-wip-us.apache.org/repos/asf/flink/blob/1acd8445/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/UnionITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/UnionITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/UnionITCase.scala
index ed5e9b5..ae81f3b 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/UnionITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/UnionITCase.scala
@@ -20,7 +20,7 @@ package org.apache.flink.api.scala.table.streaming.test
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.table._
-import org.apache.flink.api.table.Row
+import org.apache.flink.api.table.{TableException, TableEnvironment, Row}
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import scala.collection.JavaConverters._
@@ -36,9 +36,11 @@ class UnionITCase extends StreamingMultipleProgramsTestBase {
@Test
def testUnion(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
StreamITCase.testResults = mutable.MutableList()
- val ds1 = StreamTestData.getSmall3TupleDataStream(env).as('a, 'b, 'c)
- val ds2 = StreamTestData.getSmall3TupleDataStream(env).as('a, 'b, 'c)
+ val ds1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
+ val ds2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
val unionDs = ds1.unionAll(ds2).select('c)
@@ -54,9 +56,11 @@ class UnionITCase extends StreamingMultipleProgramsTestBase {
@Test
def testUnionWithFilter(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
StreamITCase.testResults = mutable.MutableList()
- val ds1 = StreamTestData.getSmall3TupleDataStream(env).as('a, 'b, 'c)
- val ds2 = StreamTestData.get5TupleDataStream(env).as('a, 'b, 'd, 'c, 'e)
+ val ds1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
+ val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'a, 'b, 'd, 'c, 'e)
val unionDs = ds1.unionAll(ds2.select('a, 'b, 'c)).filter('b < 2).select('c)
@@ -71,9 +75,11 @@ class UnionITCase extends StreamingMultipleProgramsTestBase {
@Test(expected = classOf[IllegalArgumentException])
def testUnionFieldsNameNotOverlap1(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
StreamITCase.testResults = mutable.MutableList()
- val ds1 = StreamTestData.getSmall3TupleDataStream(env).as('a, 'b, 'c)
- val ds2 = StreamTestData.get5TupleDataStream(env).as('a, 'b, 'd, 'c, 'e)
+ val ds1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
+ val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'a, 'b, 'd, 'c, 'e)
val unionDs = ds1.unionAll(ds2)
@@ -87,9 +93,12 @@ class UnionITCase extends StreamingMultipleProgramsTestBase {
@Test(expected = classOf[IllegalArgumentException])
def testUnionFieldsNameNotOverlap2(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
StreamITCase.testResults = mutable.MutableList()
- val ds1 = StreamTestData.getSmall3TupleDataStream(env).as('a, 'b, 'c)
- val ds2 = StreamTestData.get5TupleDataStream(env).as('a, 'b, 'c, 'd, 'e).select('a, 'b, 'c)
+ val ds1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
+ val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e)
+ .select('a, 'b, 'c)
val unionDs = ds1.unionAll(ds2)
@@ -101,4 +110,17 @@ class UnionITCase extends StreamingMultipleProgramsTestBase {
assertEquals(true, StreamITCase.testResults.isEmpty)
}
+ @Test(expected = classOf[TableException])
+ def testUnionTablesFromDifferentEnvs(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv1 = TableEnvironment.getTableEnvironment(env)
+ val tEnv2 = TableEnvironment.getTableEnvironment(env)
+
+ val ds1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv1, 'a, 'b, 'c)
+ val ds2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv2, 'a, 'b, 'c)
+
+ // Must fail. Tables are bound to different TableEnvironments.
+ ds1.unionAll(ds2)
+ }
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/1acd8445/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala
index abf2735..26cdc76 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala
@@ -19,7 +19,7 @@
package org.apache.flink.api.scala.table.test
import org.apache.flink.api.table.plan.PlanGenException
-import org.apache.flink.api.table.Row
+import org.apache.flink.api.table.{TableEnvironment, Row}
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.table._
import org.apache.flink.api.scala.util.CollectionDataSets
@@ -38,7 +38,9 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
def testAggregationTypes(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
- val t = CollectionDataSets.get3TupleDataSet(env).toTable
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
.select('_1.sum, '_1.min, '_1.max, '_1.count, '_1.avg)
val results = t.toDataSet[Row].collect()
@@ -50,7 +52,9 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
def testAggregationOnNonExistingField(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
- val t = CollectionDataSets.get3TupleDataSet(env).toTable
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
// Must fail. Field 'foo does not exist.
.select('foo.avg)
}
@@ -59,9 +63,11 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
def testWorkingAggregationDataTypes(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
val t = env.fromElements(
(1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d, "Hello"),
- (2: Byte, 2: Short, 2, 2L, 2.0f, 2.0d, "Ciao")).toTable
+ (2: Byte, 2: Short, 2, 2L, 2.0f, 2.0d, "Ciao")).toTable(tEnv)
.select('_1.avg, '_2.avg, '_3.avg, '_4.avg, '_5.avg, '_6.avg, '_7.count)
val expected = "1,1,1,1,1.5,1.5,2"
@@ -73,9 +79,11 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
def testProjection(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
val t = env.fromElements(
(1: Byte, 1: Short),
- (2: Byte, 2: Short)).toTable
+ (2: Byte, 2: Short)).toTable(tEnv)
.select('_1.avg, '_1.sum, '_1.count, '_2.avg, '_2.sum)
val expected = "1,3,2,1,3"
@@ -87,7 +95,9 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
def testAggregationWithArithmetic(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
- val t = env.fromElements((1f, "Hello"), (2f, "Ciao")).toTable
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ val t = env.fromElements((1f, "Hello"), (2f, "Ciao")).toTable(tEnv)
.select(('_1 + 2).avg + 2, '_2.count + 5)
val expected = "5.5,7"
@@ -99,7 +109,9 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
def testAggregationWithTwoCount(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
- val t = env.fromElements((1f, "Hello"), (2f, "Ciao")).toTable
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ val t = env.fromElements((1f, "Hello"), (2f, "Ciao")).toTable(tEnv)
.select('_1.count, '_2.count)
val expected = "2,2"
@@ -111,9 +123,11 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
def testAggregationAfterProjection(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
val t = env.fromElements(
(1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d, "Hello"),
- (2: Byte, 2: Short, 2, 2L, 2.0f, 2.0d, "Ciao")).toTable
+ (2: Byte, 2: Short, 2, 2L, 2.0f, 2.0d, "Ciao")).toTable(tEnv)
.select('_1, '_2, '_3)
.select('_1.avg, '_2.sum, '_3.count)
@@ -126,7 +140,9 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
def testNonWorkingAggregationDataTypes(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
- val t = env.fromElements(("Hello", 1)).toTable
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ val t = env.fromElements(("Hello", 1)).toTable(tEnv)
// Must fail. Field '_1 is not a numeric type.
.select('_1.sum)
@@ -137,7 +153,9 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
def testNoNestedAggregations(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
- val t = env.fromElements(("Hello", 1)).toTable
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ val t = env.fromElements(("Hello", 1)).toTable(tEnv)
// Must fail. Sum aggregation can not be chained.
.select('_2.sum.sum)
}
@@ -146,7 +164,9 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
def testSQLStyleAggregations(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
- val t = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
.select(
"""Sum( a) as a1, a.sum as a2,
|Min (a) as b1, a.min as b2,
@@ -164,13 +184,15 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
def testPojoAggregation(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
val input = env.fromElements(
MyWC("hello", 1),
MyWC("hello", 1),
MyWC("ciao", 1),
MyWC("hola", 1),
MyWC("hola", 1))
- val expr = input.toTable
+ val expr = input.toTable(tEnv)
val result = expr
.groupBy('word)
.select('word, 'count.sum as 'count)
http://git-wip-us.apache.org/repos/asf/flink/blob/1acd8445/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala
deleted file mode 100644
index c2e4e96..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.table.test
-
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.api.table.Row
-import org.apache.flink.api.table.test.utils.TableProgramsTestBase
-import TableProgramsTestBase.TableConfigMode
-import org.apache.flink.api.table.test.utils.TableProgramsTestBase
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.apache.flink.test.util.TestBaseUtils
-import org.junit._
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-import scala.collection.JavaConverters._
-
-@RunWith(classOf[Parameterized])
-class AsITCase(
- mode: TestExecutionMode,
- configMode: TableConfigMode)
- extends TableProgramsTestBase(mode, configMode) {
-
- @Test
- def testAs(): Unit = {
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- val t = CollectionDataSets.get3TupleDataSet(env)
- .as('a, 'b, 'c)
- .select('a, 'b, 'c)
-
- val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
- "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
- "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
- "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" +
- "15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" +
- "19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
- val results = t.toDataSet[Row](getConfig).collect()
- TestBaseUtils.compareResultAsText(results.asJava, expected)
- }
-
- @Test
- def testAsFromCaseClass(): Unit = {
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- val data = List(
- SomeCaseClass("Peter", 28, 4000.00, "Sales"),
- SomeCaseClass("Anna", 56, 10000.00, "Engineering"),
- SomeCaseClass("Lucy", 42, 6000.00, "HR"))
-
- val t = env.fromCollection(data)
- .as('a, 'b, 'c, 'd)
- .select('a, 'b, 'c, 'd)
-
- val expected: String =
- "Peter,28,4000.0,Sales\n" +
- "Anna,56,10000.0,Engineering\n" +
- "Lucy,42,6000.0,HR\n"
- val results = t.toDataSet[Row](getConfig).collect()
- TestBaseUtils.compareResultAsText(results.asJava, expected)
- }
-
- @Test
- def testAsFromAndToCaseClass(): Unit = {
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- val data = List(
- SomeCaseClass("Peter", 28, 4000.00, "Sales"),
- SomeCaseClass("Anna", 56, 10000.00, "Engineering"),
- SomeCaseClass("Lucy", 42, 6000.00, "HR"))
-
- val t = env.fromCollection(data)
- .as('a, 'b, 'c, 'd)
- .select('a, 'b, 'c, 'd)
-
- val expected: String =
- "SomeCaseClass(Peter,28,4000.0,Sales)\n" +
- "SomeCaseClass(Anna,56,10000.0,Engineering)\n" +
- "SomeCaseClass(Lucy,42,6000.0,HR)\n"
- val results = t.toDataSet[SomeCaseClass](getConfig).collect()
- TestBaseUtils.compareResultAsText(results.asJava, expected)
- }
-
- @Test(expected = classOf[IllegalArgumentException])
- def testAsWithToFewFields(): Unit = {
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- CollectionDataSets.get3TupleDataSet(env)
- // Must fail. Number of fields does not match.
- .as('a, 'b)
- }
-
- @Test(expected = classOf[IllegalArgumentException])
- def testAsWithToManyFields(): Unit = {
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- CollectionDataSets.get3TupleDataSet(env)
- // Must fail. Number of fields does not match.
- .as('a, 'b, 'c, 'd)
- }
-
- @Test(expected = classOf[IllegalArgumentException])
- def testAsWithAmbiguousFields(): Unit = {
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- CollectionDataSets.get3TupleDataSet(env)
- // Must fail. Field names not unique.
- .as('a, 'b, 'b)
- }
-
- @Test(expected = classOf[IllegalArgumentException])
- def testAsWithNonFieldReference1(): Unit = {
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- // Must fail. as() can only have field references
- CollectionDataSets.get3TupleDataSet(env)
- .as('a + 1, 'b, 'c)
- }
-
- @Test(expected = classOf[IllegalArgumentException])
- def testAsWithNonFieldReference2(): Unit = {
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- // Must fail. as() can only have field references
- CollectionDataSets.get3TupleDataSet(env)
- .as('a as 'foo, 'b, 'c)
- }
-
-}
-
-case class SomeCaseClass(name: String, age: Int, salary: Double, department: String) {
- def this() { this("", 0, 0.0, "") }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/1acd8445/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CalcITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CalcITCase.scala
index c5d31da..af5e6d7 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CalcITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CalcITCase.scala
@@ -21,7 +21,7 @@ package org.apache.flink.api.scala.table.test
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.table._
import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.api.table.Row
+import org.apache.flink.api.table.{TableEnvironment, Row}
import org.apache.flink.api.table.test.utils.TableProgramsTestBase
import TableProgramsTestBase.TableConfigMode
import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
@@ -39,24 +39,26 @@ class CalcITCase(
@Test
def testSimpleCalc(): Unit = {
-
val env = ExecutionEnvironment.getExecutionEnvironment
- val t = CollectionDataSets.get3TupleDataSet(env).toTable
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
.select('_1, '_2, '_3)
.where('_1 < 7)
.select('_1, '_3)
val expected = "1,Hi\n" + "2,Hello\n" + "3,Hello world\n" +
"4,Hello world, how are you?\n" + "5,I am fine.\n" + "6,Luke Skywalker\n"
- val results = t.toDataSet[Row](getConfig).collect()
+ val results = t.toDataSet[Row].collect()
TestBaseUtils.compareResultAsText(results.asJava, expected)
}
@Test
def testCalcWithTwoFilters(): Unit = {
-
val env = ExecutionEnvironment.getExecutionEnvironment
- val t = CollectionDataSets.get3TupleDataSet(env).toTable
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
.select('_1, '_2, '_3)
.where('_1 < 7 && '_2 === 3)
.select('_1, '_3)
@@ -64,15 +66,16 @@ class CalcITCase(
.select('_1)
val expected = "4\n"
- val results = t.toDataSet[Row](getConfig).collect()
+ val results = t.toDataSet[Row].collect()
TestBaseUtils.compareResultAsText(results.asJava, expected)
}
@Test
def testCalcWithAggregation(): Unit = {
-
val env = ExecutionEnvironment.getExecutionEnvironment
- val t = CollectionDataSets.get3TupleDataSet(env).toTable
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
.select('_1, '_2, '_3)
.where('_1 < 15)
.groupBy('_2)
@@ -80,21 +83,23 @@ class CalcITCase(
.where('cnt > 3)
val expected = "7,4\n" + "11,4\n"
- val results = t.toDataSet[Row](getConfig).collect()
+ val results = t.toDataSet[Row].collect()
TestBaseUtils.compareResultAsText(results.asJava, expected)
}
@Test
def testCalcJoin(): Unit = {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
- val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
- val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+ val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
val joinT = ds1.select('a, 'b).join(ds2).where('b === 'e).select('a, 'b, 'd, 'e, 'f)
.where('b > 1).select('a, 'd).where('d === 2)
val expected = "2,2\n" + "3,2\n"
- val results = joinT.toDataSet[Row](getConfig).collect()
+ val results = joinT.toDataSet[Row].collect()
TestBaseUtils.compareResultAsText(results.asJava, expected)
}
}