You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/16 15:46:36 UTC
[07/47] flink git commit: [FLINK-4704] [table] Refactor package
structure of flink-table.
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/datastream/DataStreamCorrelateITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/datastream/DataStreamCorrelateITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/datastream/DataStreamCorrelateITCase.scala
deleted file mode 100644
index 21fe157..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/datastream/DataStreamCorrelateITCase.scala
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.runtime.datastream
-
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.stream.utils.StreamITCase
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.table.utils.TableFunc0
-import org.apache.flink.api.table.TableEnvironment
-import org.apache.flink.types.Row
-import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
-import org.junit.Assert._
-import org.junit.Test
-
-import scala.collection.mutable
-
-class DataStreamCorrelateITCase extends StreamingMultipleProgramsTestBase {
-
- @Test
- def testCrossJoin(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.clear
-
- val t = testData(env).toTable(tEnv).as('a, 'b, 'c)
- val func0 = new TableFunc0
-
- val result = t
- .join(func0('c) as('d, 'e))
- .select('c, 'd, 'e)
- .toDataStream[Row]
-
- result.addSink(new StreamITCase.StringSink)
- env.execute()
-
- val expected = mutable.MutableList("Jack#22,Jack,22", "John#19,John,19", "Anna#44,Anna,44")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-
- @Test
- def testLeftOuterJoin(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.clear
-
- val t = testData(env).toTable(tEnv).as('a, 'b, 'c)
- val func0 = new TableFunc0
-
- val result = t
- .leftOuterJoin(func0('c) as('d, 'e))
- .select('c, 'd, 'e)
- .toDataStream[Row]
-
- result.addSink(new StreamITCase.StringSink)
- env.execute()
-
- val expected = mutable.MutableList(
- "nosharp,null,null", "Jack#22,Jack,22",
- "John#19,John,19", "Anna#44,Anna,44")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-
- private def testData(
- env: StreamExecutionEnvironment)
- : DataStream[(Int, Long, String)] = {
-
- val data = new mutable.MutableList[(Int, Long, String)]
- data.+=((1, 1L, "Jack#22"))
- data.+=((2, 2L, "John#19"))
- data.+=((3, 2L, "Anna#44"))
- data.+=((4, 3L, "nosharp"))
- env.fromCollection(data)
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/utils/TableTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/utils/TableTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/utils/TableTestBase.scala
deleted file mode 100644
index b281dfc..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/utils/TableTestBase.scala
+++ /dev/null
@@ -1,229 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.utils
-
-import org.apache.calcite.plan.RelOptUtil
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.{DataSet => JDataSet}
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
-import org.apache.flink.api.table.expressions.Expression
-import org.apache.flink.api.table.functions.{ScalarFunction, TableFunction}
-import org.apache.flink.api.table.{Table, TableEnvironment}
-import org.apache.flink.streaming.api.datastream.{DataStream => JDataStream}
-import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
-import org.junit.Assert.assertEquals
-import org.mockito.Mockito.{mock, when}
-
-/**
- * Test base for testing Table API / SQL plans.
- */
-class TableTestBase {
-
- def batchTestUtil(): BatchTableTestUtil = {
- BatchTableTestUtil()
- }
-
- def streamTestUtil(): StreamTableTestUtil = {
- StreamTableTestUtil()
- }
-
- def verifyTableEquals(expected: Table, actual: Table): Unit = {
- assertEquals(
- "Logical plans do not match",
- RelOptUtil.toString(expected.getRelNode),
- RelOptUtil.toString(actual.getRelNode))
- }
-
-}
-
-abstract class TableTestUtil {
-
- private var counter = 0
-
- def addTable[T: TypeInformation](fields: Expression*): Table = {
- addTable[T](s"Table${counter += 1}", fields: _*)
- }
-
- def addTable[T: TypeInformation](name: String, fields: Expression*): Table
- def addFunction[T: TypeInformation](name: String, function: TableFunction[T]): TableFunction[T]
- def addFunction(name: String, function: ScalarFunction): Unit
-
- def verifySql(query: String, expected: String): Unit
- def verifyTable(resultTable: Table, expected: String): Unit
-
- // the print methods are for debugging purposes only
- def printTable(resultTable: Table): Unit
- def printSql(query: String): Unit
-}
-
-object TableTestUtil {
-
- // this methods are currently just for simplifying string construction,
- // we could replace it with logic later
-
- def unaryNode(node: String, input: String, term: String*): String = {
- s"""$node(${term.mkString(", ")})
- |$input
- |""".stripMargin.stripLineEnd
- }
-
- def binaryNode(node: String, left: String, right: String, term: String*): String = {
- s"""$node(${term.mkString(", ")})
- |$left
- |$right
- |""".stripMargin.stripLineEnd
- }
-
- def values(node: String, term: String*): String = {
- s"$node(${term.mkString(", ")})"
- }
-
- def term(term: AnyRef, value: AnyRef*): String = {
- s"$term=[${value.mkString(", ")}]"
- }
-
- def tuples(value:List[AnyRef]*): String={
- val listValues = value.map( listValue => s"{ ${listValue.mkString(", ")} }")
- term("tuples","[" + listValues.mkString(", ") + "]")
- }
-
- def batchTableNode(idx: Int): String = {
- s"DataSetScan(table=[[_DataSetTable_$idx]])"
- }
-
- def streamTableNode(idx: Int): String = {
- s"DataStreamScan(table=[[_DataStreamTable_$idx]])"
- }
-
-}
-
-case class BatchTableTestUtil() extends TableTestUtil {
-
- val env = mock(classOf[ExecutionEnvironment])
- val tEnv = TableEnvironment.getTableEnvironment(env)
-
- def addTable[T: TypeInformation](
- name: String,
- fields: Expression*)
- : Table = {
- val ds = mock(classOf[DataSet[T]])
- val jDs = mock(classOf[JDataSet[T]])
- when(ds.javaSet).thenReturn(jDs)
- val typeInfo: TypeInformation[T] = implicitly[TypeInformation[T]]
- when(jDs.getType).thenReturn(typeInfo)
-
- val t = ds.toTable(tEnv, fields: _*)
- tEnv.registerTable(name, t)
- t
- }
-
- def addFunction[T: TypeInformation](
- name: String,
- function: TableFunction[T])
- : TableFunction[T] = {
- tEnv.registerFunction(name, function)
- function
- }
-
- def addFunction(name: String, function: ScalarFunction): Unit = {
- tEnv.registerFunction(name, function)
- }
-
- def verifySql(query: String, expected: String): Unit = {
- verifyTable(tEnv.sql(query), expected)
- }
-
- def verifyTable(resultTable: Table, expected: String): Unit = {
- val relNode = resultTable.getRelNode
- val optimized = tEnv.optimize(relNode)
- val actual = RelOptUtil.toString(optimized)
- assertEquals(
- expected.split("\n").map(_.trim).mkString("\n"),
- actual.split("\n").map(_.trim).mkString("\n"))
- }
-
- def printTable(resultTable: Table): Unit = {
- val relNode = resultTable.getRelNode
- val optimized = tEnv.optimize(relNode)
- println(RelOptUtil.toString(optimized))
- }
-
- def printSql(query: String): Unit = {
- printTable(tEnv.sql(query))
- }
-}
-
-case class StreamTableTestUtil() extends TableTestUtil {
-
- val env = mock(classOf[StreamExecutionEnvironment])
- val tEnv = TableEnvironment.getTableEnvironment(env)
-
- def addTable[T: TypeInformation](
- name: String,
- fields: Expression*)
- : Table = {
-
- val ds = mock(classOf[DataStream[T]])
- val jDs = mock(classOf[JDataStream[T]])
- when(ds.javaStream).thenReturn(jDs)
- val typeInfo: TypeInformation[T] = implicitly[TypeInformation[T]]
- when(jDs.getType).thenReturn(typeInfo)
-
- val t = ds.toTable(tEnv, fields: _*)
- tEnv.registerTable(name, t)
- t
- }
-
- def addFunction[T: TypeInformation](
- name: String,
- function: TableFunction[T])
- : TableFunction[T] = {
- tEnv.registerFunction(name, function)
- function
- }
-
- def addFunction(name: String, function: ScalarFunction): Unit = {
- tEnv.registerFunction(name, function)
- }
-
- def verifySql(query: String, expected: String): Unit = {
- verifyTable(tEnv.sql(query), expected)
- }
-
- def verifyTable(resultTable: Table, expected: String): Unit = {
- val relNode = resultTable.getRelNode
- val optimized = tEnv.optimize(relNode)
- val actual = RelOptUtil.toString(optimized)
- assertEquals(
- expected.split("\n").map(_.trim).mkString("\n"),
- actual.split("\n").map(_.trim).mkString("\n"))
- }
-
- // the print methods are for debugging purposes only
- def printTable(resultTable: Table): Unit = {
- val relNode = resultTable.getRelNode
- val optimized = tEnv.optimize(relNode)
- println(RelOptUtil.toString(optimized))
- }
-
- def printSql(query: String): Unit = {
- printTable(tEnv.sql(query))
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/utils/UserDefinedTableFunctions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/utils/UserDefinedTableFunctions.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/utils/UserDefinedTableFunctions.scala
deleted file mode 100644
index 4291b29..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/utils/UserDefinedTableFunctions.scala
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.utils
-
-import java.lang.Boolean
-
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
-import org.apache.flink.api.java.tuple.Tuple3
-import org.apache.flink.types.Row
-import org.apache.flink.api.table.functions.TableFunction
-import org.apache.flink.api.java.typeutils.RowTypeInfo
-
-
-case class SimpleUser(name: String, age: Int)
-
-class TableFunc0 extends TableFunction[SimpleUser] {
- // make sure input element's format is "<string>#<int>"
- def eval(user: String): Unit = {
- if (user.contains("#")) {
- val splits = user.split("#")
- collect(SimpleUser(splits(0), splits(1).toInt))
- }
- }
-}
-
-class TableFunc1 extends TableFunction[String] {
- def eval(str: String): Unit = {
- if (str.contains("#")){
- str.split("#").foreach(collect)
- }
- }
-
- def eval(str: String, prefix: String): Unit = {
- if (str.contains("#")) {
- str.split("#").foreach(s => collect(prefix + s))
- }
- }
-}
-
-
-class TableFunc2 extends TableFunction[Row] {
- def eval(str: String): Unit = {
- if (str.contains("#")) {
- str.split("#").foreach({ s =>
- val row = new Row(2)
- row.setField(0, s)
- row.setField(1, s.length)
- collect(row)
- })
- }
- }
-
- override def getResultType: TypeInformation[Row] = {
- new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO,
- BasicTypeInfo.INT_TYPE_INFO)
- }
-}
-
-class HierarchyTableFunction extends SplittableTableFunction[Boolean, Integer] {
- def eval(user: String) {
- if (user.contains("#")) {
- val splits = user.split("#")
- val age = splits(1).toInt
- collect(new Tuple3[String, Boolean, Integer](splits(0), age >= 20, age))
- }
- }
-}
-
-abstract class SplittableTableFunction[A, B] extends TableFunction[Tuple3[String, A, B]] {}
-
-class PojoTableFunc extends TableFunction[PojoUser] {
- def eval(user: String) {
- if (user.contains("#")) {
- val splits = user.split("#")
- collect(new PojoUser(splits(0), splits(1).toInt))
- }
- }
-}
-
-class PojoUser() {
- var name: String = _
- var age: Int = 0
-
- def this(name: String, age: Int) {
- this()
- this.name = name
- this.age = age
- }
-}
-
-// ----------------------------------------------------------------------------------------------
-// Invalid Table Functions
-// ----------------------------------------------------------------------------------------------
-
-
-// this is used to check whether scala object is forbidden
-object ObjectTableFunction extends TableFunction[Integer] {
- def eval(a: Int, b: Int): Unit = {
- collect(a)
- collect(b)
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/AggregationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/AggregationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/AggregationTest.scala
new file mode 100644
index 0000000..708e007
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/AggregationTest.scala
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.utils.TableTestBase
+import org.apache.flink.table.utils.TableTestUtil._
+import org.junit.Test
+
+/**
+ * Test for testing aggregate plans.
+ */
+class AggregationTest extends TableTestBase {
+
+ @Test
+ def testAggregateQueryBatchSQL(): Unit = {
+ val util = batchTestUtil()
+ util.addTable[(Int, Long, Int)]("MyTable", 'a, 'b, 'c)
+
+ val sqlQuery = "SELECT avg(a), sum(b), count(c) FROM MyTable"
+
+ val setValues = unaryNode(
+ "DataSetValues",
+ batchTableNode(0),
+ tuples(List(null,null,null)),
+ term("values","a","b","c")
+ )
+ val union = unaryNode(
+ "DataSetUnion",
+ setValues,
+ term("union","a","b","c")
+ )
+
+ val aggregate = unaryNode(
+ "DataSetAggregate",
+ union,
+ term("select",
+ "AVG(a) AS EXPR$0",
+ "SUM(b) AS EXPR$1",
+ "COUNT(c) AS EXPR$2")
+ )
+ util.verifySql(sqlQuery, aggregate)
+ }
+
+ @Test
+ def testAggregateWithFilterQueryBatchSQL(): Unit = {
+ val util = batchTestUtil()
+ util.addTable[(Int, Long, Int)]("MyTable", 'a, 'b, 'c)
+
+ val sqlQuery = "SELECT avg(a), sum(b), count(c) FROM MyTable WHERE a = 1"
+
+ val calcNode = unaryNode(
+ "DataSetCalc",
+ batchTableNode(0),
+ term("select", "a", "b", "c"),
+ term("where", "=(a, 1)")
+ )
+
+ val setValues = unaryNode(
+ "DataSetValues",
+ calcNode,
+ tuples(List(null,null,null)),
+ term("values","a","b","c")
+ )
+
+ val union = unaryNode(
+ "DataSetUnion",
+ setValues,
+ term("union","a","b","c")
+ )
+
+ val aggregate = unaryNode(
+ "DataSetAggregate",
+ union,
+ term("select",
+ "AVG(a) AS EXPR$0",
+ "SUM(b) AS EXPR$1",
+ "COUNT(c) AS EXPR$2")
+ )
+ util.verifySql(sqlQuery, aggregate)
+ }
+
+ @Test
+ def testAggregateGroupQueryBatchSQL(): Unit = {
+ val util = batchTestUtil()
+ util.addTable[(Int, Long, Int)]("MyTable", 'a, 'b, 'c)
+
+ val sqlQuery = "SELECT avg(a), sum(b), count(c) FROM MyTable GROUP BY a"
+
+ val aggregate = unaryNode(
+ "DataSetAggregate",
+ batchTableNode(0),
+ term("groupBy", "a"),
+ term("select",
+ "a",
+ "AVG(a) AS EXPR$0",
+ "SUM(b) AS EXPR$1",
+ "COUNT(c) AS EXPR$2")
+ )
+ val expected = unaryNode(
+ "DataSetCalc",
+ aggregate,
+ term("select",
+ "EXPR$0",
+ "EXPR$1",
+ "EXPR$2")
+ )
+ util.verifySql(sqlQuery, expected)
+ }
+
+ @Test
+ def testAggregateGroupWithFilterQueryBatchSQL(): Unit = {
+ val util = batchTestUtil()
+ util.addTable[(Int, Long, Int)]("MyTable", 'a, 'b, 'c)
+
+ val sqlQuery = "SELECT avg(a), sum(b), count(c) FROM MyTable WHERE a = 1 GROUP BY a"
+
+ val calcNode = unaryNode(
+ "DataSetCalc",
+ batchTableNode(0),
+ term("select","a", "b", "c") ,
+ term("where","=(a, 1)")
+ )
+
+ val aggregate = unaryNode(
+ "DataSetAggregate",
+ calcNode,
+ term("groupBy", "a"),
+ term("select",
+ "a",
+ "AVG(a) AS EXPR$0",
+ "SUM(b) AS EXPR$1",
+ "COUNT(c) AS EXPR$2")
+ )
+ val expected = unaryNode(
+ "DataSetCalc",
+ aggregate,
+ term("select",
+ "EXPR$0",
+ "EXPR$1",
+ "EXPR$2")
+ )
+ util.verifySql(sqlQuery, expected)
+ }
+
+ @Test
+ def testAggregateGroupWithFilterTableApi(): Unit = {
+
+ val util = batchTestUtil()
+ val sourceTable = util.addTable[(Int, Long, Int)]("MyTable", 'a, 'b, 'c)
+
+ val resultTable = sourceTable.groupBy('a)
+ .select('a, 'a.avg, 'b.sum, 'c.count)
+ .where('a === 1)
+
+ val calcNode = unaryNode(
+ "DataSetCalc",
+ batchTableNode(0),
+ term("select", "a", "b", "c"),
+ term("where", "=(a, 1)")
+ )
+
+ val expected = unaryNode(
+ "DataSetAggregate",
+ calcNode,
+ term("groupBy", "a"),
+ term("select",
+ "a",
+ "AVG(a) AS TMP_0",
+ "SUM(b) AS TMP_1",
+ "COUNT(c) AS TMP_2")
+ )
+
+ util.verifyTable(resultTable,expected)
+ }
+
+ @Test
+ def testAggregateTableApi(): Unit = {
+ val util = batchTestUtil()
+ val sourceTable = util.addTable[(Int, Long, Int)]("MyTable", 'a, 'b, 'c)
+ val resultTable = sourceTable.select('a.avg,'b.sum,'c.count)
+
+ val setValues = unaryNode(
+ "DataSetValues",
+ batchTableNode(0),
+ tuples(List(null,null,null)),
+ term("values","a","b","c")
+ )
+ val union = unaryNode(
+ "DataSetUnion",
+ setValues,
+ term("union","a","b","c")
+ )
+
+ val expected = unaryNode(
+ "DataSetAggregate",
+ union,
+ term("select",
+ "AVG(a) AS TMP_0",
+ "SUM(b) AS TMP_1",
+ "COUNT(c) AS TMP_2")
+ )
+ util.verifyTable(resultTable, expected)
+ }
+
+ @Test
+ def testAggregateWithFilterTableApi(): Unit = {
+ val util = batchTestUtil()
+ val sourceTable = util.addTable[(Int, Long, Int)]("MyTable", 'a, 'b, 'c)
+
+ val resultTable = sourceTable.select('a,'b,'c).where('a === 1)
+ .select('a.avg,'b.sum,'c.count)
+
+ val calcNode = unaryNode(
+ "DataSetCalc",
+ batchTableNode(0),
+ term("select", "a", "b", "c"),
+ term("where", "=(a, 1)")
+ )
+
+ val setValues = unaryNode(
+ "DataSetValues",
+ calcNode,
+ tuples(List(null,null,null)),
+ term("values","a","b","c")
+ )
+
+ val union = unaryNode(
+ "DataSetUnion",
+ setValues,
+ term("union","a","b","c")
+ )
+
+ val expected = unaryNode(
+ "DataSetAggregate",
+ union,
+ term("select",
+ "AVG(a) AS TMP_0",
+ "SUM(b) AS TMP_1",
+ "COUNT(c) AS TMP_2")
+ )
+
+ util.verifyTable(resultTable, expected)
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/CalciteConfigBuilderTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/CalciteConfigBuilderTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/CalciteConfigBuilderTest.scala
new file mode 100644
index 0000000..09a4c4e
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/CalciteConfigBuilderTest.scala
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table
+
+import org.apache.calcite.rel.rules.{CalcSplitRule, CalcMergeRule, FilterMergeRule}
+import org.apache.calcite.sql.fun.{SqlStdOperatorTable, OracleSqlOperatorTable}
+import org.apache.calcite.tools.RuleSets
+import org.apache.flink.table.calcite.{CalciteConfigBuilder, CalciteConfig}
+import org.junit.Test
+import org.junit.Assert._
+
+import scala.collection.JavaConverters._
+
+class CalciteConfigBuilderTest {
+
+ @Test
+ def testDefaultRules(): Unit = {
+
+ val cc: CalciteConfig = new CalciteConfigBuilder()
+ .build()
+
+ assertEquals(false, cc.replacesRuleSet)
+ assertFalse(cc.getRuleSet.isDefined)
+ }
+
+ @Test
+ def testReplaceRules(): Unit = {
+
+ val cc: CalciteConfig = new CalciteConfigBuilder()
+ .replaceRuleSet(RuleSets.ofList(FilterMergeRule.INSTANCE))
+ .build()
+
+ assertEquals(true, cc.replacesRuleSet)
+ assertTrue(cc.getRuleSet.isDefined)
+ val cSet = cc.getRuleSet.get.iterator().asScala.toSet
+ assertEquals(1, cSet.size)
+ assertTrue(cSet.contains(FilterMergeRule.INSTANCE))
+ }
+
+ @Test
+ def testReplaceAddRules(): Unit = {
+
+ val cc: CalciteConfig = new CalciteConfigBuilder()
+ .replaceRuleSet(RuleSets.ofList(FilterMergeRule.INSTANCE))
+ .addRuleSet(RuleSets.ofList(CalcMergeRule.INSTANCE, CalcSplitRule.INSTANCE))
+ .build()
+
+ assertEquals(true, cc.replacesRuleSet)
+ assertTrue(cc.getRuleSet.isDefined)
+ val cSet = cc.getRuleSet.get.iterator().asScala.toSet
+ assertEquals(3, cSet.size)
+ assertTrue(cSet.contains(FilterMergeRule.INSTANCE))
+ assertTrue(cSet.contains(CalcMergeRule.INSTANCE))
+ assertTrue(cSet.contains(CalcSplitRule.INSTANCE))
+ }
+
+ @Test
+ def testAddRules(): Unit = {
+
+ val cc: CalciteConfig = new CalciteConfigBuilder()
+ .addRuleSet(RuleSets.ofList(FilterMergeRule.INSTANCE))
+ .build()
+
+ assertEquals(false, cc.replacesRuleSet)
+ assertTrue(cc.getRuleSet.isDefined)
+ val cSet = cc.getRuleSet.get.iterator().asScala.toSet
+ assertEquals(1, cSet.size)
+ assertTrue(cSet.contains(FilterMergeRule.INSTANCE))
+ }
+
+ @Test
+ def testAddAddRules(): Unit = {
+
+ val cc: CalciteConfig = new CalciteConfigBuilder()
+ .addRuleSet(RuleSets.ofList(FilterMergeRule.INSTANCE))
+ .addRuleSet(RuleSets.ofList(CalcMergeRule.INSTANCE, CalcSplitRule.INSTANCE))
+ .build()
+
+ assertEquals(false, cc.replacesRuleSet)
+ assertTrue(cc.getRuleSet.isDefined)
+ val cSet = cc.getRuleSet.get.iterator().asScala.toSet
+ assertEquals(3, cSet.size)
+ assertTrue(cSet.contains(FilterMergeRule.INSTANCE))
+ assertTrue(cSet.contains(CalcMergeRule.INSTANCE))
+ assertTrue(cSet.contains(CalcSplitRule.INSTANCE))
+ }
+
+ @Test
+ def testDefaultOperatorTable(): Unit = {
+
+ val cc: CalciteConfig = new CalciteConfigBuilder()
+ .build()
+
+ assertEquals(false, cc.replacesSqlOperatorTable)
+ assertFalse(cc.getSqlOperatorTable.isDefined)
+ }
+
+ def testReplaceOperatorTable(): Unit = {
+
+ val oracleTable = new OracleSqlOperatorTable
+
+ val cc: CalciteConfig = new CalciteConfigBuilder()
+ .replaceSqlOperatorTable(oracleTable)
+ .build()
+
+ val oracleOps = oracleTable.getOperatorList.asScala
+
+ assertEquals(true, cc.replacesSqlOperatorTable)
+ assertTrue(cc.getSqlOperatorTable.isDefined)
+ val ops = cc.getSqlOperatorTable.get.getOperatorList
+ .asScala.toSet
+ assertEquals(oracleOps.size, ops.size)
+ for (o <- oracleOps) {
+ assertTrue(ops.contains(o))
+ }
+ }
+
+ def testReplaceAddOperatorTable(): Unit = {
+
+ val oracleTable = new OracleSqlOperatorTable
+ val stdTable = new SqlStdOperatorTable
+
+ val cc: CalciteConfig = new CalciteConfigBuilder()
+ .replaceSqlOperatorTable(oracleTable)
+ .addSqlOperatorTable(stdTable)
+ .build()
+
+ val oracleOps = oracleTable.getOperatorList.asScala
+ val stdOps = stdTable.getOperatorList.asScala
+
+ assertEquals(true, cc.replacesSqlOperatorTable)
+ assertTrue(cc.getSqlOperatorTable.isDefined)
+ val ops = cc.getSqlOperatorTable.get.getOperatorList
+ .asScala.toSet
+ assertEquals(oracleOps.size + stdOps.size, ops.size)
+ for (o <- oracleOps) {
+ assertTrue(ops.contains(o))
+ }
+ for (o <- stdOps) {
+ assertTrue(ops.contains(o))
+ }
+
+ }
+
+ def testAddOperatorTable(): Unit = {
+
+ val oracleTable = new OracleSqlOperatorTable
+
+ val cc: CalciteConfig = new CalciteConfigBuilder()
+ .addSqlOperatorTable(oracleTable)
+ .build()
+
+ val oracleOps = oracleTable.getOperatorList.asScala
+
+ assertEquals(false, cc.replacesSqlOperatorTable)
+ assertTrue(cc.getSqlOperatorTable.isDefined)
+ val ops = cc.getSqlOperatorTable.get.getOperatorList
+ .asScala.toSet
+ assertEquals(oracleOps.size, ops.size)
+ for (o <- oracleOps) {
+ assertTrue(ops.contains(o))
+ }
+ }
+
+ def testAddAddOperatorTable(): Unit = {
+
+ val oracleTable = new OracleSqlOperatorTable
+ val stdTable = new SqlStdOperatorTable
+
+ val cc: CalciteConfig = new CalciteConfigBuilder()
+ .addSqlOperatorTable(oracleTable)
+ .addSqlOperatorTable(stdTable)
+ .build()
+
+ val oracleOps = oracleTable.getOperatorList.asScala
+ val stdOps = stdTable.getOperatorList.asScala
+
+ assertEquals(false, cc.replacesSqlOperatorTable)
+ assertTrue(cc.getSqlOperatorTable.isDefined)
+ val ops = cc.getSqlOperatorTable.get.getOperatorList
+ .asScala.toSet
+ assertEquals(oracleOps.size + stdOps.size, ops.size)
+ for (o <- oracleOps) {
+ assertTrue(ops.contains(o))
+ }
+ for (o <- stdOps) {
+ assertTrue(ops.contains(o))
+ }
+
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/CompositeFlatteningTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/CompositeFlatteningTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/CompositeFlatteningTest.scala
new file mode 100644
index 0000000..0055fc2
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/CompositeFlatteningTest.scala
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala.createTypeInformation
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.CompositeFlatteningTest.{TestCaseClass, giveMeCaseClass}
+import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.functions.ScalarFunction
+import org.apache.flink.table.utils.TableTestBase
+import org.apache.flink.table.utils.TableTestUtil._
+import org.junit.Test
+
+
+class CompositeFlatteningTest extends TableTestBase {
+
+ @Test(expected = classOf[ValidationException])
+ def testDuplicateFlattening(): Unit = {
+ val util = batchTestUtil()
+ val table = util.addTable[((Int, Long), (String, Boolean), String)]("MyTable", 'a, 'b, 'c)
+
+ table.select('a.flatten(), 'a.flatten())
+ }
+
+ @Test
+ def testMultipleFlatteningsTable(): Unit = {
+ val util = batchTestUtil()
+ val table = util.addTable[((Int, Long), (String, Boolean), String)]("MyTable", 'a, 'b, 'c)
+
+ val result = table.select('a.flatten(), 'c, 'b.flatten())
+
+ val expected = unaryNode(
+ "DataSetCalc",
+ batchTableNode(0),
+ term("select",
+ "a._1 AS a$_1",
+ "a._2 AS a$_2",
+ "c",
+ "b._1 AS b$_1",
+ "b._2 AS b$_2"
+ )
+ )
+
+ util.verifyTable(result, expected)
+ }
+
+ @Test
+ def testMultipleFlatteningsSql(): Unit = {
+ val util = batchTestUtil()
+ val table = util.addTable[((Int, Long), (String, Boolean), String)]("MyTable", 'a, 'b, 'c)
+
+ val expected = unaryNode(
+ "DataSetCalc",
+ batchTableNode(0),
+ term("select",
+ "a._1 AS _1",
+ "a._2 AS _2",
+ "c",
+ "b._1 AS _10",
+ "b._2 AS _20"
+ )
+ )
+
+ util.verifySql(
+ "SELECT MyTable.a.*, c, MyTable.b.* FROM MyTable",
+ expected)
+ }
+
+ @Test
+ def testNestedFlattenings(): Unit = {
+ val util = batchTestUtil()
+ val table = util
+ .addTable[((((String, TestCaseClass), Boolean), String), String)]("MyTable", 'a, 'b)
+
+ val result = table.select('a.flatten(), 'b.flatten())
+
+ val expected = unaryNode(
+ "DataSetCalc",
+ batchTableNode(0),
+ term("select",
+ "a._1 AS a$_1",
+ "a._2 AS a$_2",
+ "b"
+ )
+ )
+
+ util.verifyTable(result, expected)
+ }
+
+ @Test
+ def testScalarFunctionAccess(): Unit = {
+ val util = batchTestUtil()
+ val table = util
+ .addTable[(String, Int)]("MyTable", 'a, 'b)
+
+ val result = table.select(
+ giveMeCaseClass().get("my"),
+ giveMeCaseClass().get("clazz"),
+ giveMeCaseClass().flatten())
+
+ val expected = unaryNode(
+ "DataSetCalc",
+ batchTableNode(0),
+ term("select",
+ "org.apache.flink.table.CompositeFlatteningTest.giveMeCaseClass$().my AS _c0",
+ "org.apache.flink.table.CompositeFlatteningTest.giveMeCaseClass$().clazz AS _c1",
+ "org.apache.flink.table.CompositeFlatteningTest.giveMeCaseClass$().my AS _c2",
+ "org.apache.flink.table.CompositeFlatteningTest.giveMeCaseClass$().clazz AS _c3"
+ )
+ )
+
+ util.verifyTable(result, expected)
+ }
+
+}
+
+object CompositeFlatteningTest {
+
+ case class TestCaseClass(my: String, clazz: Int)
+
+ object giveMeCaseClass extends ScalarFunction {
+ def eval(): TestCaseClass = {
+ TestCaseClass("hello", 42)
+ }
+
+ override def getResultType(signature: Array[Class[_]]): TypeInformation[_] = {
+ createTypeInformation[TestCaseClass]
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/ExpressionReductionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/ExpressionReductionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/ExpressionReductionTest.scala
new file mode 100644
index 0000000..b660243
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/ExpressionReductionTest.scala
@@ -0,0 +1,425 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.Types
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.utils.TableTestBase
+import org.apache.flink.table.utils.TableTestUtil._
+import org.junit.Test
+
+class ExpressionReductionTest extends TableTestBase {
+
+ @Test
+ def testReduceCalcExpressionForBatchSQL(): Unit = {
+ val util = batchTestUtil()
+ util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+
+ val sqlQuery = "SELECT " +
+ "(3+4)+a, " +
+ "b+(1+2), " +
+ "CASE 11 WHEN 1 THEN 'a' ELSE 'b' END, " +
+ "TRIM(BOTH ' STRING '), " +
+ "'test' || 'string', " +
+ "NULLIF(1, 1), " +
+ "TIMESTAMP '1990-10-14 23:00:00.123' + INTERVAL '10 00:00:01' DAY TO SECOND, " +
+ "EXTRACT(DAY FROM INTERVAL '19 12:10:10.123' DAY TO SECOND(3)), " +
+ "1 IS NULL, " +
+ "'TEST' LIKE '%EST', " +
+ "FLOOR(2.5), " +
+ "'TEST' IN ('west', 'TEST', 'rest'), " +
+ "CAST(TRUE AS VARCHAR) || 'X'" +
+ "FROM MyTable WHERE a>(1+7)"
+
+ val expected = unaryNode(
+ "DataSetCalc",
+ batchTableNode(0),
+ term("select",
+ "+(7, a) AS EXPR$0",
+ "+(b, 3) AS EXPR$1",
+ "'b' AS EXPR$2",
+ "'STRING' AS EXPR$3",
+ "'teststring' AS EXPR$4",
+ "null AS EXPR$5",
+ "1990-10-24 23:00:01 AS EXPR$6",
+ "19 AS EXPR$7",
+ "false AS EXPR$8",
+ "true AS EXPR$9",
+ "2 AS EXPR$10",
+ "true AS EXPR$11",
+ "'trueX' AS EXPR$12"
+ ),
+ term("where", ">(a, 8)")
+ )
+
+ util.verifySql(sqlQuery, expected)
+ }
+
+ @Test
+ def testReduceProjectExpressionForBatchSQL(): Unit = {
+ val util = batchTestUtil()
+ util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+
+ val sqlQuery = "SELECT " +
+ "(3+4)+a, " +
+ "b+(1+2), " +
+ "CASE 11 WHEN 1 THEN 'a' ELSE 'b' END, " +
+ "TRIM(BOTH ' STRING '), " +
+ "'test' || 'string', " +
+ "NULLIF(1, 1), " +
+ "TIMESTAMP '1990-10-14 23:00:00.123' + INTERVAL '10 00:00:01' DAY TO SECOND, " +
+ "EXTRACT(DAY FROM INTERVAL '19 12:10:10.123' DAY TO SECOND(3)), " +
+ "1 IS NULL, " +
+ "'TEST' LIKE '%EST', " +
+ "FLOOR(2.5), " +
+ "'TEST' IN ('west', 'TEST', 'rest'), " +
+ "CAST(TRUE AS VARCHAR) || 'X'" +
+ "FROM MyTable"
+
+ val expected = unaryNode(
+ "DataSetCalc",
+ batchTableNode(0),
+ term("select",
+ "+(7, a) AS EXPR$0",
+ "+(b, 3) AS EXPR$1",
+ "'b' AS EXPR$2",
+ "'STRING' AS EXPR$3",
+ "'teststring' AS EXPR$4",
+ "null AS EXPR$5",
+ "1990-10-24 23:00:01 AS EXPR$6",
+ "19 AS EXPR$7",
+ "false AS EXPR$8",
+ "true AS EXPR$9",
+ "2 AS EXPR$10",
+ "true AS EXPR$11",
+ "'trueX' AS EXPR$12"
+ )
+ )
+
+ util.verifySql(sqlQuery, expected)
+ }
+
+ @Test
+ def testReduceFilterExpressionForBatchSQL(): Unit = {
+ val util = batchTestUtil()
+ util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+
+ val sqlQuery = "SELECT " +
+ "*" +
+ "FROM MyTable WHERE a>(1+7)"
+
+ val expected = unaryNode(
+ "DataSetCalc",
+ batchTableNode(0),
+ term("select", "a", "b", "c"),
+ term("where", ">(a, 8)")
+ )
+
+ util.verifySql(sqlQuery, expected)
+ }
+
+ @Test
+ def testReduceCalcExpressionForBatchTableAPI(): Unit = {
+ val util = batchTestUtil()
+ val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+
+ val result = table
+ .where('a > (1 + 7))
+ .select((3 + 4).toExpr + 6,
+ (11 === 1) ? ("a", "b"),
+ " STRING ".trim,
+ "test" + "string",
+ "1990-10-14 23:00:00.123".toTimestamp + 10.days + 1.second,
+ 1.isNull,
+ "TEST".like("%EST"),
+ 2.5.toExpr.floor(),
+ true.cast(Types.STRING) + "X")
+
+ val expected = unaryNode(
+ "DataSetCalc",
+ batchTableNode(0),
+ term("select",
+ "13 AS _c0",
+ "'b' AS _c1",
+ "'STRING' AS _c2",
+ "'teststring' AS _c3",
+ "1990-10-24 23:00:01 AS _c4",
+ "false AS _c5",
+ "true AS _c6",
+ "2E0 AS _c7",
+ "'trueX' AS _c8"
+ ),
+ term("where", ">(a, 8)")
+ )
+
+ util.verifyTable(result, expected)
+ }
+
+ @Test
+ def testReduceProjectExpressionForBatchTableAPI(): Unit = {
+ val util = batchTestUtil()
+ val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+
+ val result = table
+ .select((3 + 4).toExpr + 6,
+ (11 === 1) ? ("a", "b"),
+ " STRING ".trim,
+ "test" + "string",
+ "1990-10-14 23:00:00.123".toTimestamp + 10.days + 1.second,
+ 1.isNull,
+ "TEST".like("%EST"),
+ 2.5.toExpr.floor(),
+ true.cast(Types.STRING) + "X")
+
+ val expected = unaryNode(
+ "DataSetCalc",
+ batchTableNode(0),
+ term("select",
+ "13 AS _c0",
+ "'b' AS _c1",
+ "'STRING' AS _c2",
+ "'teststring' AS _c3",
+ "1990-10-24 23:00:01 AS _c4",
+ "false AS _c5",
+ "true AS _c6",
+ "2E0 AS _c7",
+ "'trueX' AS _c8"
+ )
+ )
+
+ util.verifyTable(result, expected)
+ }
+
+ @Test
+ def testReduceFilterExpressionForBatchTableAPI(): Unit = {
+ val util = batchTestUtil()
+ val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+
+ val result = table
+ .where('a > (1 + 7))
+
+ val expected = unaryNode(
+ "DataSetCalc",
+ batchTableNode(0),
+ term("select", "a", "b", "c"),
+ term("where", ">(a, 8)")
+ )
+
+ util.verifyTable(result, expected)
+ }
+
+ @Test
+ def testReduceCalcExpressionForStreamSQL(): Unit = {
+ val util = streamTestUtil()
+ util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+
+ val sqlQuery = "SELECT " +
+ "(3+4)+a, " +
+ "b+(1+2), " +
+ "CASE 11 WHEN 1 THEN 'a' ELSE 'b' END, " +
+ "TRIM(BOTH ' STRING '), " +
+ "'test' || 'string', " +
+ "NULLIF(1, 1), " +
+ "TIMESTAMP '1990-10-14 23:00:00.123' + INTERVAL '10 00:00:01' DAY TO SECOND, " +
+ "EXTRACT(DAY FROM INTERVAL '19 12:10:10.123' DAY TO SECOND(3)), " +
+ "1 IS NULL, " +
+ "'TEST' LIKE '%EST', " +
+ "FLOOR(2.5), " +
+ "'TEST' IN ('west', 'TEST', 'rest'), " +
+ "CAST(TRUE AS VARCHAR) || 'X'" +
+ "FROM MyTable WHERE a>(1+7)"
+
+ val expected = unaryNode(
+ "DataStreamCalc",
+ streamTableNode(0),
+ term("select",
+ "+(7, a) AS EXPR$0",
+ "+(b, 3) AS EXPR$1",
+ "'b' AS EXPR$2",
+ "'STRING' AS EXPR$3",
+ "'teststring' AS EXPR$4",
+ "null AS EXPR$5",
+ "1990-10-24 23:00:01 AS EXPR$6",
+ "19 AS EXPR$7",
+ "false AS EXPR$8",
+ "true AS EXPR$9",
+ "2 AS EXPR$10",
+ "true AS EXPR$11",
+ "'trueX' AS EXPR$12"
+ ),
+ term("where", ">(a, 8)")
+ )
+
+ util.verifySql(sqlQuery, expected)
+ }
+
+ @Test
+ def testReduceProjectExpressionForStreamSQL(): Unit = {
+ val util = streamTestUtil()
+ util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+
+ val sqlQuery = "SELECT " +
+ "(3+4)+a, " +
+ "b+(1+2), " +
+ "CASE 11 WHEN 1 THEN 'a' ELSE 'b' END, " +
+ "TRIM(BOTH ' STRING '), " +
+ "'test' || 'string', " +
+ "NULLIF(1, 1), " +
+ "TIMESTAMP '1990-10-14 23:00:00.123' + INTERVAL '10 00:00:01' DAY TO SECOND, " +
+ "EXTRACT(DAY FROM INTERVAL '19 12:10:10.123' DAY TO SECOND(3)), " +
+ "1 IS NULL, " +
+ "'TEST' LIKE '%EST', " +
+ "FLOOR(2.5), " +
+ "'TEST' IN ('west', 'TEST', 'rest'), " +
+ "CAST(TRUE AS VARCHAR) || 'X'" +
+ "FROM MyTable"
+
+ val expected = unaryNode(
+ "DataStreamCalc",
+ streamTableNode(0),
+ term("select",
+ "+(7, a) AS EXPR$0",
+ "+(b, 3) AS EXPR$1",
+ "'b' AS EXPR$2",
+ "'STRING' AS EXPR$3",
+ "'teststring' AS EXPR$4",
+ "null AS EXPR$5",
+ "1990-10-24 23:00:01 AS EXPR$6",
+ "19 AS EXPR$7",
+ "false AS EXPR$8",
+ "true AS EXPR$9",
+ "2 AS EXPR$10",
+ "true AS EXPR$11",
+ "'trueX' AS EXPR$12"
+ )
+ )
+
+ util.verifySql(sqlQuery, expected)
+ }
+
+ @Test
+ def testReduceFilterExpressionForStreamSQL(): Unit = {
+ val util = streamTestUtil()
+ util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+
+ val sqlQuery = "SELECT " +
+ "*" +
+ "FROM MyTable WHERE a>(1+7)"
+
+ val expected = unaryNode(
+ "DataStreamCalc",
+ streamTableNode(0),
+ term("select", "a", "b", "c"),
+ term("where", ">(a, 8)")
+ )
+
+ util.verifySql(sqlQuery, expected)
+ }
+
+ @Test
+ def testReduceCalcExpressionForStreamTableAPI(): Unit = {
+ val util = streamTestUtil()
+ val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+
+ val result = table
+ .where('a > (1 + 7))
+ .select((3 + 4).toExpr + 6,
+ (11 === 1) ? ("a", "b"),
+ " STRING ".trim,
+ "test" + "string",
+ "1990-10-14 23:00:00.123".toTimestamp + 10.days + 1.second,
+ 1.isNull,
+ "TEST".like("%EST"),
+ 2.5.toExpr.floor(),
+ true.cast(Types.STRING) + "X")
+
+ val expected = unaryNode(
+ "DataStreamCalc",
+ streamTableNode(0),
+ term("select",
+ "13 AS _c0",
+ "'b' AS _c1",
+ "'STRING' AS _c2",
+ "'teststring' AS _c3",
+ "1990-10-24 23:00:01 AS _c4",
+ "false AS _c5",
+ "true AS _c6",
+ "2E0 AS _c7",
+ "'trueX' AS _c8"
+ ),
+ term("where", ">(a, 8)")
+ )
+
+ util.verifyTable(result, expected)
+ }
+
+ @Test
+ def testReduceProjectExpressionForStreamTableAPI(): Unit = {
+ val util = streamTestUtil()
+ val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+
+ val result = table
+ .select((3 + 4).toExpr + 6,
+ (11 === 1) ? ("a", "b"),
+ " STRING ".trim,
+ "test" + "string",
+ "1990-10-14 23:00:00.123".toTimestamp + 10.days + 1.second,
+ 1.isNull,
+ "TEST".like("%EST"),
+ 2.5.toExpr.floor(),
+ true.cast(Types.STRING) + "X")
+
+ val expected = unaryNode(
+ "DataStreamCalc",
+ streamTableNode(0),
+ term("select",
+ "13 AS _c0",
+ "'b' AS _c1",
+ "'STRING' AS _c2",
+ "'teststring' AS _c3",
+ "1990-10-24 23:00:01 AS _c4",
+ "false AS _c5",
+ "true AS _c6",
+ "2E0 AS _c7",
+ "'trueX' AS _c8"
+ )
+ )
+
+ util.verifyTable(result, expected)
+ }
+
+ @Test
+ def testReduceFilterExpressionForStreamTableAPI(): Unit = {
+ val util = streamTestUtil()
+ val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+
+ val result = table
+ .where('a > (1 + 7))
+
+ val expected = unaryNode(
+ "DataStreamCalc",
+ streamTableNode(0),
+ term("select", "a", "b", "c"),
+ term("where", ">(a, 8)")
+ )
+
+ util.verifyTable(result, expected)
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala
new file mode 100644
index 0000000..b90de97
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table
+
+import org.apache.calcite.tools.RuleSet
+import org.apache.flink.api.scala._
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.{TupleTypeInfo, TypeExtractor}
+import org.apache.flink.table.api.{Table, TableConfig, TableEnvironment, TableException}
+import org.apache.flink.table.expressions.{Alias, UnresolvedFieldReference}
+import org.apache.flink.table.sinks.TableSink
+import org.junit.Test
+import org.junit.Assert.assertEquals
+
+class TableEnvironmentTest {
+
+ val tEnv = new MockTableEnvironment
+
+ val tupleType = new TupleTypeInfo(
+ INT_TYPE_INFO,
+ STRING_TYPE_INFO,
+ DOUBLE_TYPE_INFO)
+
+ val caseClassType = implicitly[TypeInformation[CClass]]
+
+ val pojoType = TypeExtractor.createTypeInfo(classOf[PojoClass])
+
+ val atomicType = INT_TYPE_INFO
+
+ @Test
+ def testGetFieldInfoTuple(): Unit = {
+ val fieldInfo = tEnv.getFieldInfo(tupleType)
+
+ fieldInfo._1.zip(Array("f0", "f1", "f2")).foreach(x => assertEquals(x._2, x._1))
+ fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
+ }
+
+ @Test
+ def testGetFieldInfoCClass(): Unit = {
+ val fieldInfo = tEnv.getFieldInfo(caseClassType)
+
+ fieldInfo._1.zip(Array("cf1", "cf2", "cf3")).foreach(x => assertEquals(x._2, x._1))
+ fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
+ }
+
+ @Test
+ def testGetFieldInfoPojo(): Unit = {
+ val fieldInfo = tEnv.getFieldInfo(pojoType)
+
+ fieldInfo._1.zip(Array("pf1", "pf2", "pf3")).foreach(x => assertEquals(x._2, x._1))
+ fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
+ }
+
+ @Test(expected = classOf[TableException])
+ def testGetFieldInfoAtomic(): Unit = {
+ tEnv.getFieldInfo(atomicType)
+ }
+
+ @Test
+ def testGetFieldInfoTupleNames(): Unit = {
+ val fieldInfo = tEnv.getFieldInfo(
+ tupleType,
+ Array(
+ new UnresolvedFieldReference("name1"),
+ new UnresolvedFieldReference("name2"),
+ new UnresolvedFieldReference("name3")
+ ))
+
+ fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1))
+ fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
+ }
+
+ @Test
+ def testGetFieldInfoCClassNames(): Unit = {
+ val fieldInfo = tEnv.getFieldInfo(
+ caseClassType,
+ Array(
+ new UnresolvedFieldReference("name1"),
+ new UnresolvedFieldReference("name2"),
+ new UnresolvedFieldReference("name3")
+ ))
+
+ fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1))
+ fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
+ }
+
+ @Test(expected = classOf[TableException])
+ def testGetFieldInfoPojoNames1(): Unit = {
+ tEnv.getFieldInfo(
+ pojoType,
+ Array(
+ new UnresolvedFieldReference("name1"),
+ new UnresolvedFieldReference("name2"),
+ new UnresolvedFieldReference("name3")
+ ))
+ }
+
+ @Test
+ def testGetFieldInfoPojoNames2(): Unit = {
+ val fieldInfo = tEnv.getFieldInfo(
+ pojoType,
+ Array(
+ new UnresolvedFieldReference("pf3"),
+ new UnresolvedFieldReference("pf1"),
+ new UnresolvedFieldReference("pf2")
+ ))
+
+ fieldInfo._1.zip(Array("pf3", "pf1", "pf2")).foreach(x => assertEquals(x._2, x._1))
+ fieldInfo._2.zip(Array(2, 0, 1)).foreach(x => assertEquals(x._2, x._1))
+ }
+
+ @Test
+ def testGetFieldInfoAtomicName1(): Unit = {
+ val fieldInfo = tEnv.getFieldInfo(
+ atomicType,
+ Array(new UnresolvedFieldReference("name"))
+ )
+
+ fieldInfo._1.zip(Array("name")).foreach(x => assertEquals(x._2, x._1))
+ fieldInfo._2.zip(Array(0)).foreach(x => assertEquals(x._2, x._1))
+ }
+
+ @Test(expected = classOf[TableException])
+ def testGetFieldInfoAtomicName2(): Unit = {
+ tEnv.getFieldInfo(
+ atomicType,
+ Array(
+ new UnresolvedFieldReference("name1"),
+ new UnresolvedFieldReference("name2")
+ ))
+ }
+
+ @Test
+ def testGetFieldInfoTupleAlias1(): Unit = {
+ val fieldInfo = tEnv.getFieldInfo(
+ tupleType,
+ Array(
+ new Alias(UnresolvedFieldReference("f0"), "name1"),
+ new Alias(UnresolvedFieldReference("f1"), "name2"),
+ new Alias(UnresolvedFieldReference("f2"), "name3")
+ ))
+
+ fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1))
+ fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
+ }
+
+ @Test
+ def testGetFieldInfoTupleAlias2(): Unit = {
+ val fieldInfo = tEnv.getFieldInfo(
+ tupleType,
+ Array(
+ new Alias(UnresolvedFieldReference("f2"), "name1"),
+ new Alias(UnresolvedFieldReference("f0"), "name2"),
+ new Alias(UnresolvedFieldReference("f1"), "name3")
+ ))
+
+ fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1))
+ fieldInfo._2.zip(Array(2, 0, 1)).foreach(x => assertEquals(x._2, x._1))
+ }
+
+ @Test(expected = classOf[TableException])
+ def testGetFieldInfoTupleAlias3(): Unit = {
+ tEnv.getFieldInfo(
+ tupleType,
+ Array(
+ new Alias(UnresolvedFieldReference("xxx"), "name1"),
+ new Alias(UnresolvedFieldReference("yyy"), "name2"),
+ new Alias(UnresolvedFieldReference("zzz"), "name3")
+ ))
+ }
+
+ @Test
+ def testGetFieldInfoCClassAlias1(): Unit = {
+ val fieldInfo = tEnv.getFieldInfo(
+ caseClassType,
+ Array(
+ new Alias(new UnresolvedFieldReference("cf1"), "name1"),
+ new Alias(new UnresolvedFieldReference("cf2"), "name2"),
+ new Alias(new UnresolvedFieldReference("cf3"), "name3")
+ ))
+
+ fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1))
+ fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
+ }
+
+ @Test
+ def testGetFieldInfoCClassAlias2(): Unit = {
+ val fieldInfo = tEnv.getFieldInfo(
+ caseClassType,
+ Array(
+ new Alias(new UnresolvedFieldReference("cf3"), "name1"),
+ new Alias(new UnresolvedFieldReference("cf1"), "name2"),
+ new Alias(new UnresolvedFieldReference("cf2"), "name3")
+ ))
+
+ fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1))
+ fieldInfo._2.zip(Array(2, 0, 1)).foreach(x => assertEquals(x._2, x._1))
+ }
+
+ @Test(expected = classOf[TableException])
+ def testGetFieldInfoCClassAlias3(): Unit = {
+ tEnv.getFieldInfo(
+ caseClassType,
+ Array(
+ new Alias(new UnresolvedFieldReference("xxx"), "name1"),
+ new Alias(new UnresolvedFieldReference("yyy"), "name2"),
+ new Alias(new UnresolvedFieldReference("zzz"), "name3")
+ ))
+ }
+
+ @Test
+ def testGetFieldInfoPojoAlias1(): Unit = {
+ val fieldInfo = tEnv.getFieldInfo(
+ pojoType,
+ Array(
+ new Alias(new UnresolvedFieldReference("pf1"), "name1"),
+ new Alias(new UnresolvedFieldReference("pf2"), "name2"),
+ new Alias(new UnresolvedFieldReference("pf3"), "name3")
+ ))
+
+ fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1))
+ fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
+ }
+
+ @Test
+ def testGetFieldInfoPojoAlias2(): Unit = {
+ val fieldInfo = tEnv.getFieldInfo(
+ pojoType,
+ Array(
+ new Alias(new UnresolvedFieldReference("pf3"), "name1"),
+ new Alias(new UnresolvedFieldReference("pf1"), "name2"),
+ new Alias(new UnresolvedFieldReference("pf2"), "name3")
+ ))
+
+ fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1))
+ fieldInfo._2.zip(Array(2, 0, 1)).foreach(x => assertEquals(x._2, x._1))
+ }
+
+ @Test(expected = classOf[TableException])
+ def testGetFieldInfoPojoAlias3(): Unit = {
+ tEnv.getFieldInfo(
+ pojoType,
+ Array(
+ new Alias(new UnresolvedFieldReference("xxx"), "name1"),
+ new Alias(new UnresolvedFieldReference("yyy"), "name2"),
+ new Alias(new UnresolvedFieldReference("zzz"), "name3")
+ ))
+ }
+
+ @Test(expected = classOf[TableException])
+ def testGetFieldInfoAtomicAlias(): Unit = {
+ tEnv.getFieldInfo(
+ atomicType,
+ Array(
+ new Alias(new UnresolvedFieldReference("name1"), "name2")
+ ))
+ }
+
+}
+
+class MockTableEnvironment extends TableEnvironment(new TableConfig) {
+
+ override private[flink] def writeToSink[T](table: Table, sink: TableSink[T]): Unit = ???
+
+ override protected def checkValidTableName(name: String): Unit = ???
+
+ override protected def getBuiltInRuleSet: RuleSet = ???
+
+ override def sql(query: String): Table = ???
+}
+
+case class CClass(cf1: Int, cf2: String, cf3: Double)
+
+class PojoClass(var pf1: Int, var pf2: String, var pf3: Double) {
+ def this() = this(0, "", 0.0)
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/ExplainTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/ExplainTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/ExplainTest.scala
new file mode 100644
index 0000000..a323ec9
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/ExplainTest.scala
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.batch
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.scala._
+import org.apache.flink.test.util.MultipleProgramsTestBase
+import org.junit.Assert.assertEquals
+import org.junit._
+
+class ExplainTest
+ extends MultipleProgramsTestBase(MultipleProgramsTestBase.TestExecutionMode.CLUSTER) {
+
+ val testFilePath = ExplainTest.this.getClass.getResource("/").getFile
+
+ @Test
+ def testFilterWithoutExtended() : Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ val table = env.fromElements((1, "hello"))
+ .toTable(tEnv, 'a, 'b)
+ .filter("a % 2 = 0")
+
+ val result = tEnv.explain(table).replaceAll("\\r\\n", "\n")
+ val source = scala.io.Source.fromFile(testFilePath +
+ "../../src/test/scala/resources/testFilter0.out").mkString.replaceAll("\\r\\n", "\n")
+ assertEquals(result, source)
+ }
+
+ @Test
+ def testFilterWithExtended() : Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ val table = env.fromElements((1, "hello"))
+ .toTable(tEnv, 'a, 'b)
+ .filter("a % 2 = 0")
+
+ val result = tEnv.explain(table, true).replaceAll("\\r\\n", "\n")
+ val source = scala.io.Source.fromFile(testFilePath +
+ "../../src/test/scala/resources/testFilter1.out").mkString.replaceAll("\\r\\n", "\n")
+ assertEquals(result, source)
+ }
+
+ @Test
+ def testJoinWithoutExtended() : Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ val table1 = env.fromElements((1, "hello")).toTable(tEnv, 'a, 'b)
+ val table2 = env.fromElements((1, "hello")).toTable(tEnv, 'c, 'd)
+ val table = table1.join(table2).where("b = d").select("a, c")
+
+ val result = tEnv.explain(table).replaceAll("\\r\\n", "\n")
+ val source = scala.io.Source.fromFile(testFilePath +
+ "../../src/test/scala/resources/testJoin0.out").mkString.replaceAll("\\r\\n", "\n")
+ assertEquals(source, result)
+ }
+
+ @Test
+ def testJoinWithExtended() : Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ val table1 = env.fromElements((1, "hello")).toTable(tEnv, 'a, 'b)
+ val table2 = env.fromElements((1, "hello")).toTable(tEnv, 'c, 'd)
+ val table = table1.join(table2).where("b = d").select("a, c")
+
+ val result = tEnv.explain(table, true).replaceAll("\\r\\n", "\n")
+ val source = scala.io.Source.fromFile(testFilePath +
+ "../../src/test/scala/resources/testJoin1.out").mkString.replaceAll("\\r\\n", "\n")
+ assertEquals(source, result)
+ }
+
+ @Test
+ def testUnionWithoutExtended() : Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ val table1 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word)
+ val table2 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word)
+ val table = table1.unionAll(table2)
+
+ val result = tEnv.explain(table).replaceAll("\\r\\n", "\n")
+ val source = scala.io.Source.fromFile(testFilePath +
+ "../../src/test/scala/resources/testUnion0.out").mkString.replaceAll("\\r\\n", "\n")
+ assertEquals(result, source)
+ }
+
+ @Test
+ def testUnionWithExtended() : Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ val table1 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word)
+ val table2 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word)
+ val table = table1.unionAll(table2)
+
+ val result = tEnv.explain(table, true).replaceAll("\\r\\n", "\n")
+ val source = scala.io.Source.fromFile(testFilePath +
+ "../../src/test/scala/resources/testUnion1.out").mkString.replaceAll("\\r\\n", "\n")
+ assertEquals(result, source)
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/ProjectableTableSourceITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/ProjectableTableSourceITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/ProjectableTableSourceITCase.scala
new file mode 100644
index 0000000..ec4dc59
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/ProjectableTableSourceITCase.scala
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.batch
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.java.{DataSet => JavaSet, ExecutionEnvironment => JavaExecEnv}
+import org.apache.flink.api.scala.ExecutionEnvironment
+import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase
+import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.sources.{BatchTableSource, ProjectableTableSource}
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.TestBaseUtils
+import org.apache.flink.types.Row
+import org.junit.{Before, Test}
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+import scala.collection.JavaConverters._
+
+@RunWith(classOf[Parameterized])
+class ProjectableTableSourceITCase(mode: TestExecutionMode,
+ configMode: TableConfigMode)
+ extends TableProgramsTestBase(mode, configMode) {
+
+ private val tableName = "MyTable"
+ private var tableEnv: BatchTableEnvironment = null
+
+ @Before
+ def initTableEnv(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ tableEnv = TableEnvironment.getTableEnvironment(env, config)
+ tableEnv.registerTableSource(tableName, new TestProjectableTableSource)
+ }
+
+ @Test
+ def testTableAPI(): Unit = {
+ val results = tableEnv
+ .scan(tableName)
+ .where("amount < 4")
+ .select("id, name")
+ .collect()
+
+ val expected = Seq(
+ "0,Record_0", "1,Record_1", "2,Record_2", "3,Record_3", "16,Record_16",
+ "17,Record_17", "18,Record_18", "19,Record_19", "32,Record_32").mkString("\n")
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+
+ @Test
+ def testSQL(): Unit = {
+ val results = tableEnv
+ .sql(s"select id, name from $tableName where amount < 4 ")
+ .collect()
+
+ val expected = Seq(
+ "0,Record_0", "1,Record_1", "2,Record_2", "3,Record_3", "16,Record_16",
+ "17,Record_17", "18,Record_18", "19,Record_19", "32,Record_32").mkString("\n")
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+}
+
+class TestProjectableTableSource(
+ fieldTypes: Array[TypeInformation[_]],
+ fieldNames: Array[String])
+ extends BatchTableSource[Row] with ProjectableTableSource[Row] {
+
+ def this() = this(
+ fieldTypes = Array(
+ BasicTypeInfo.STRING_TYPE_INFO,
+ BasicTypeInfo.LONG_TYPE_INFO,
+ BasicTypeInfo.INT_TYPE_INFO,
+ BasicTypeInfo.DOUBLE_TYPE_INFO),
+ fieldNames = Array[String]("name", "id", "amount", "price")
+ )
+
+ /** Returns the data of the table as a [[org.apache.flink.api.java.DataSet]]. */
+ override def getDataSet(execEnv: JavaExecEnv): JavaSet[Row] = {
+ execEnv.fromCollection(generateDynamicCollection(33, fieldNames).asJava, getReturnType)
+ }
+
+ /** Returns the types of the table fields. */
+ override def getFieldTypes: Array[TypeInformation[_]] = fieldTypes
+
+ /** Returns the names of the table fields. */
+ override def getFieldsNames: Array[String] = fieldNames
+
+ /** Returns the [[TypeInformation]] for the return type. */
+ override def getReturnType: TypeInformation[Row] = new RowTypeInfo(fieldTypes: _*)
+
+ /** Returns the number of fields of the table. */
+ override def getNumberOfFields: Int = fieldNames.length
+
+ override def projectFields(fields: Array[Int]): TestProjectableTableSource = {
+ val projectedFieldTypes = new Array[TypeInformation[_]](fields.length)
+ val projectedFieldNames = new Array[String](fields.length)
+
+ fields.zipWithIndex.foreach(f => {
+ projectedFieldTypes(f._2) = fieldTypes(f._1)
+ projectedFieldNames(f._2) = fieldNames(f._1)
+ })
+ new TestProjectableTableSource(projectedFieldTypes, projectedFieldNames)
+ }
+
+ private def generateDynamicCollection(num: Int, fieldNames: Array[String]): Seq[Row] = {
+ for {cnt <- 0 until num}
+ yield {
+ val row = new Row(fieldNames.length)
+ fieldNames.zipWithIndex.foreach(
+ f =>
+ f._1 match {
+ case "name" =>
+ row.setField(f._2, "Record_" + cnt)
+ case "id" =>
+ row.setField(f._2, cnt.toLong)
+ case "amount" =>
+ row.setField(f._2, cnt.toInt % 16)
+ case "price" =>
+ row.setField(f._2, cnt.toDouble / 3)
+ case _ =>
+ throw new IllegalArgumentException(s"unknown field name $f._1")
+ }
+ )
+ row
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentITCase.scala
new file mode 100644
index 0000000..961e575
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentITCase.scala
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.batch
+
+import java.util
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase
+import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
+import org.apache.flink.table.api.scala._
+import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.types.Row
+import org.apache.flink.table.api.{TableEnvironment, TableException}
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.TestBaseUtils
+import org.junit._
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+import scala.collection.JavaConverters._
+
+@RunWith(classOf[Parameterized])
+class TableEnvironmentITCase(
+ mode: TestExecutionMode,
+ configMode: TableConfigMode)
+ extends TableProgramsTestBase(mode, configMode) {
+
+ @Test
+ def testSimpleRegister(): Unit = {
+
+ val tableName = "MyTable"
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val ds = CollectionDataSets.get3TupleDataSet(env)
+ tEnv.registerDataSet(tableName, ds)
+ val t = tEnv.scan(tableName).select('_1, '_2, '_3)
+
+ val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
+ "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
+ "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
+ "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" +
+ "15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" +
+ "19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
+ val results = t.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testRegisterWithFields(): Unit = {
+
+ val tableName = "MyTable"
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val ds = CollectionDataSets.get3TupleDataSet(env)
+ tEnv.registerDataSet(tableName, ds, 'a, 'b, 'c)
+ val t = tEnv.scan(tableName).select('a, 'b)
+
+ val expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" +
+ "7,4\n" + "8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" +
+ "15,5\n" + "16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n"
+ val results = t.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test(expected = classOf[TableException])
+ def testRegisterExistingDataSet(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val ds1 = CollectionDataSets.get3TupleDataSet(env)
+ tEnv.registerDataSet("MyTable", ds1)
+ val ds2 = CollectionDataSets.get5TupleDataSet(env)
+ // Must fail. Name is already in use.
+ tEnv.registerDataSet("MyTable", ds2)
+ }
+
+ @Test(expected = classOf[TableException])
+ def testScanUnregisteredTable(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+ // Must fail. No table registered under that name.
+ tEnv.scan("someTable")
+ }
+
+ @Test
+ def testTableRegister(): Unit = {
+
+ val tableName = "MyTable"
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+ tEnv.registerTable(tableName, t)
+
+ val regT = tEnv.scan(tableName).select('a, 'b).filter('a > 8)
+
+ val expected = "9,4\n" + "10,4\n" +
+ "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" +
+ "15,5\n" + "16,6\n" + "17,6\n" + "18,6\n" +
+ "19,6\n" + "20,6\n" + "21,6\n"
+
+ val results = regT.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test(expected = classOf[TableException])
+ def testRegisterExistingTable(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val t1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
+ tEnv.registerTable("MyTable", t1)
+ val t2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv)
+ // Must fail. Name is already in use.
+ tEnv.registerDataSet("MyTable", t2)
+ }
+
+ @Test(expected = classOf[TableException])
+ def testRegisterTableFromOtherEnv(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv1 = TableEnvironment.getTableEnvironment(env, config)
+ val tEnv2 = TableEnvironment.getTableEnvironment(env, config)
+
+ val t1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv1)
+ // Must fail. Table is bound to different TableEnvironment.
+ tEnv2.registerTable("MyTable", t1)
+ }
+
+ @Test
+ def testToTable(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val t = CollectionDataSets.get3TupleDataSet(env)
+ .toTable(tEnv, 'a, 'b, 'c)
+ .select('a, 'b, 'c)
+
+ val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
+ "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
+ "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
+ "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" +
+ "15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" +
+ "19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
+ val results = t.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testToTableFromCaseClass(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val data = List(
+ SomeCaseClass("Peter", 28, 4000.00, "Sales"),
+ SomeCaseClass("Anna", 56, 10000.00, "Engineering"),
+ SomeCaseClass("Lucy", 42, 6000.00, "HR"))
+
+ val t = env.fromCollection(data)
+ .toTable(tEnv, 'a, 'b, 'c, 'd)
+ .select('a, 'b, 'c, 'd)
+
+ val expected: String =
+ "Peter,28,4000.0,Sales\n" +
+ "Anna,56,10000.0,Engineering\n" +
+ "Lucy,42,6000.0,HR\n"
+ val results = t.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testToTableFromAndToCaseClass(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val data = List(
+ SomeCaseClass("Peter", 28, 4000.00, "Sales"),
+ SomeCaseClass("Anna", 56, 10000.00, "Engineering"),
+ SomeCaseClass("Lucy", 42, 6000.00, "HR"))
+
+ val t = env.fromCollection(data)
+ .toTable(tEnv, 'a, 'b, 'c, 'd)
+ .select('a, 'b, 'c, 'd)
+
+ val expected: String =
+ "SomeCaseClass(Peter,28,4000.0,Sales)\n" +
+ "SomeCaseClass(Anna,56,10000.0,Engineering)\n" +
+ "SomeCaseClass(Lucy,42,6000.0,HR)\n"
+ val results = t.toDataSet[SomeCaseClass].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test(expected = classOf[TableException])
+ def testToTableWithToFewFields(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ CollectionDataSets.get3TupleDataSet(env)
+ // Must fail. Number of fields does not match.
+ .toTable(tEnv, 'a, 'b)
+ }
+
+ @Test(expected = classOf[TableException])
+ def testToTableWithToManyFields(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ CollectionDataSets.get3TupleDataSet(env)
+ // Must fail. Number of fields does not match.
+ .toTable(tEnv, 'a, 'b, 'c, 'd)
+ }
+
+ @Test(expected = classOf[TableException])
+ def testToTableWithAmbiguousFields(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ CollectionDataSets.get3TupleDataSet(env)
+ // Must fail. Field names not unique.
+ .toTable(tEnv, 'a, 'b, 'b)
+ }
+
+ @Test(expected = classOf[TableException])
+ def testToTableWithNonFieldReference1(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ // Must fail. as() can only have field references
+ CollectionDataSets.get3TupleDataSet(env)
+ .toTable(tEnv, 'a + 1, 'b, 'c)
+ }
+
+ @Test(expected = classOf[TableException])
+ def testToTableWithNonFieldReference2(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ // Must fail. as() can only have field references
+ CollectionDataSets.get3TupleDataSet(env)
+ .toTable(tEnv, 'a as 'foo, 'b, 'c)
+ }
+}
+
+object TableEnvironmentITCase {
+
+ @Parameterized.Parameters(name = "Execution mode = {0}, Table config = {1}")
+ def parameters(): util.Collection[Array[java.lang.Object]] = {
+ Seq[Array[AnyRef]](
+ Array(TestExecutionMode.COLLECTION, TableProgramsTestBase.DEFAULT),
+ Array(TestExecutionMode.COLLECTION, TableProgramsTestBase.EFFICIENT)).asJava
+ }
+}
+
+case class SomeCaseClass(name: String, age: Int, salary: Double, department: String) {
+ def this() { this("", 0, 0.0, "") }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableSinkITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableSinkITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableSinkITCase.scala
new file mode 100644
index 0000000..8bc7874
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableSinkITCase.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.batch
+
+import java.io.File
+
+import org.apache.flink.api.scala.{ExecutionEnvironment, _}
+import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase
+import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
+import org.apache.flink.table.api.scala._
+import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.sinks.CsvTableSink
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.TestBaseUtils
+import org.junit.Test
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+
+@RunWith(classOf[Parameterized])
+class TableSinkITCase(
+ mode: TestExecutionMode,
+ configMode: TableConfigMode)
+ extends TableProgramsTestBase(mode, configMode) {
+
+ @Test
+ def testBatchTableSink(): Unit = {
+
+ val tmpFile = File.createTempFile("flink-table-sink-test", ".tmp")
+ tmpFile.deleteOnExit()
+ val path = tmpFile.toURI.toString
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+ env.setParallelism(4)
+
+ val input = CollectionDataSets.get3TupleDataSet(env)
+ .map(x => x).setParallelism(4) // increase DOP to 4
+
+ val results = input.toTable(tEnv, 'a, 'b, 'c)
+ .where('a < 5 || 'a > 17)
+ .select('c, 'b)
+ .writeToSink(new CsvTableSink(path, fieldDelim = "|"))
+
+ env.execute()
+
+ val expected = Seq(
+ "Hi|1", "Hello|2", "Hello world|2", "Hello world, how are you?|3",
+ "Comment#12|6", "Comment#13|6", "Comment#14|6", "Comment#15|6").mkString("\n")
+
+ TestBaseUtils.compareResultsByLinesInMemory(expected, path)
+ }
+
+}