You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2017/07/13 10:18:22 UTC
[13/44] flink git commit: [FLINK-6617][table] Improve JAVA and SCALA
logical plans consistent test
http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/SortITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/SortITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/SortITCase.scala
deleted file mode 100644
index 09d3c04..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/SortITCase.scala
+++ /dev/null
@@ -1,226 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.scala.batch.table
-
-import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.api.scala.{ExecutionEnvironment, _}
-import org.apache.flink.table.api.TableEnvironment
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.scala.batch.utils.SortTestUtils._
-import org.apache.flink.table.api.scala.batch.utils.TableProgramsClusterTestBase
-import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.apache.flink.test.util.TestBaseUtils
-import org.apache.flink.types.Row
-import org.junit._
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable
-
-@RunWith(classOf[Parameterized])
-class SortITCase(mode: TestExecutionMode, configMode: TableConfigMode)
- extends TableProgramsClusterTestBase(mode, configMode) {
-
- private def getExecutionEnvironment = {
- val env = ExecutionEnvironment.getExecutionEnvironment
- // set the parallelism explicitly to make sure the query is executed in
- // a distributed manner
- env.setParallelism(3)
- env
- }
-
- @Test
- def testOrderByDesc(): Unit = {
- val env = getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
- val ds = CollectionDataSets.get3TupleDataSet(env)
- val t = ds.toTable(tEnv).orderBy('_1.desc)
- implicit def tupleOrdering[T <: Product] = Ordering.by((x : T) =>
- - x.productElement(0).asInstanceOf[Int] )
-
- val expected = sortExpectedly(tupleDataSetStrings)
- // squash all rows inside a partition into one element
- val results = t.toDataSet[Row].mapPartition(rows => {
- // the rows need to be copied in object reuse mode
- val copied = new mutable.ArrayBuffer[Row]
- rows.foreach(r => copied += Row.copy(r))
- Seq(copied)
- }).collect()
-
- val result = results
- .filterNot(_.isEmpty)
- // sort all partitions by their head element to verify the order across partitions
- .sortBy(_.head)(Ordering.by((r : Row) => -r.getField(0).asInstanceOf[Int]))
- .reduceLeft(_ ++ _)
-
- TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
- }
-
- @Test
- def testOrderByAsc(): Unit = {
- val env = getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
- val ds = CollectionDataSets.get3TupleDataSet(env)
- val t = ds.toTable(tEnv).orderBy('_1.asc)
- implicit def tupleOrdering[T <: Product] = Ordering.by((x : T) =>
- x.productElement(0).asInstanceOf[Int] )
-
- val expected = sortExpectedly(tupleDataSetStrings)
- // squash all rows inside a partition into one element
- val results = t.toDataSet[Row].mapPartition(rows => {
- // the rows need to be copied in object reuse mode
- val copied = new mutable.ArrayBuffer[Row]
- rows.foreach(r => copied += Row.copy(r))
- Seq(copied)
- }).collect()
-
- val result = results
- .filterNot(_.isEmpty)
- // sort all partitions by their head element to verify the order across partitions
- .sortBy(_.head)(Ordering.by((r : Row) => r.getField(0).asInstanceOf[Int]))
- .reduceLeft(_ ++ _)
-
- TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
- }
-
- @Test
- def testOrderByMultipleFieldsDifferentDirections(): Unit = {
- val env = getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
- val ds = CollectionDataSets.get3TupleDataSet(env)
- val t = ds.toTable(tEnv).orderBy('_2.asc, '_1.desc)
- implicit def tupleOrdering[T <: Product] = Ordering.by((x : T) =>
- (x.productElement(1).asInstanceOf[Long], - x.productElement(0).asInstanceOf[Int]) )
-
- val expected = sortExpectedly(tupleDataSetStrings)
- // squash all rows inside a partition into one element
- val results = t.toDataSet[Row].mapPartition(rows => {
- // the rows need to be copied in object reuse mode
- val copied = new mutable.ArrayBuffer[Row]
- rows.foreach(r => copied += Row.copy(r))
- Seq(copied)
- }).collect()
-
- def rowOrdering = Ordering.by((r : Row) => {
- // ordering for this tuple will fall into the previous defined tupleOrdering,
- // so we just need to return the field by their defining sequence
- (r.getField(0).asInstanceOf[Int], r.getField(1).asInstanceOf[Long])
- })
-
- val result = results
- .filterNot(_.isEmpty)
- // sort all partitions by their head element to verify the order across partitions
- .sortBy(_.head)(rowOrdering)
- .reduceLeft(_ ++ _)
-
- TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
- }
-
- @Test
- def testOrderByOffset(): Unit = {
- val env = getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
- val ds = CollectionDataSets.get3TupleDataSet(env)
- val t = ds.toTable(tEnv).orderBy('_1.asc).limit(3)
- implicit def tupleOrdering[T <: Product] = Ordering.by((x : T) =>
- x.productElement(0).asInstanceOf[Int] )
-
- val expected = sortExpectedly(tupleDataSetStrings, 3, 21)
- // squash all rows inside a partition into one element
- val results = t.toDataSet[Row].mapPartition(rows => {
- // the rows need to be copied in object reuse mode
- val copied = new mutable.ArrayBuffer[Row]
- rows.foreach(r => copied += Row.copy(r))
- Seq(copied)
- }).collect()
-
- val result = results
- .filterNot(_.isEmpty)
- // sort all partitions by their head element to verify the order across partitions
- .sortBy(_.head)(Ordering.by((r : Row) => r.getField(0).asInstanceOf[Int]))
- .reduceLeft(_ ++ _)
-
- TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
- }
-
- @Test
- def testOrderByOffsetAndFetch(): Unit = {
- val env = getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
- val ds = CollectionDataSets.get3TupleDataSet(env)
- val t = ds.toTable(tEnv).orderBy('_1.desc).limit(3, 5)
- implicit def tupleOrdering[T <: Product] = Ordering.by((x : T) =>
- - x.productElement(0).asInstanceOf[Int] )
-
- val expected = sortExpectedly(tupleDataSetStrings, 3, 8)
- // squash all rows inside a partition into one element
- val results = t.toDataSet[Row].mapPartition(rows => {
- // the rows need to be copied in object reuse mode
- val copied = new mutable.ArrayBuffer[Row]
- rows.foreach(r => copied += Row.copy(r))
- Seq(copied)
- }).collect()
-
- val result = results
- .filterNot(_.isEmpty)
- // sort all partitions by their head element to verify the order across partitions
- .sortBy(_.head)(Ordering.by((r : Row) => -r.getField(0).asInstanceOf[Int]))
- .reduceLeft(_ ++ _)
-
- TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
- }
-
- @Test
- def testOrderByFetch(): Unit = {
- val env = getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
- val ds = CollectionDataSets.get3TupleDataSet(env)
- val t = ds.toTable(tEnv).orderBy('_1.asc).limit(0, 5)
- implicit def tupleOrdering[T <: Product] = Ordering.by((x : T) =>
- x.productElement(0).asInstanceOf[Int] )
-
- val expected = sortExpectedly(tupleDataSetStrings, 0, 5)
- // squash all rows inside a partition into one element
- val results = t.toDataSet[Row].mapPartition(rows => {
- // the rows need to be copied in object reuse mode
- val copied = new mutable.ArrayBuffer[Row]
- rows.foreach(r => copied += Row.copy(r))
- Seq(copied)
- }).collect()
-
- implicit def rowOrdering = Ordering.by((r : Row) => r.getField(0).asInstanceOf[Int])
-
- val result = results
- .filterNot(_.isEmpty)
- // sort all partitions by their head element to verify the order across partitions
- .sortBy(_.head)
- .reduceLeft(_ ++ _)
-
- TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/UserDefinedTableFunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/UserDefinedTableFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/UserDefinedTableFunctionTest.scala
index 4d7f6cb..079b10a 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/UserDefinedTableFunctionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/UserDefinedTableFunctionTest.scala
@@ -17,97 +17,15 @@
*/
package org.apache.flink.table.api.scala.batch.table
-import org.apache.flink.api.java.{DataSet => JDataSet, ExecutionEnvironment => JavaExecutionEnv}
+import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala._
-import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment => ScalaExecutionEnv, _}
-import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.types.Row
import org.apache.flink.table.utils.TableTestUtil._
-import org.apache.flink.table.utils.{PojoTableFunc, TableFunc2, _}
-import org.apache.flink.table.api.{Table, TableEnvironment, Types}
+import org.apache.flink.table.utils._
import org.junit.Test
-import org.mockito.Mockito._
class UserDefinedTableFunctionTest extends TableTestBase {
@Test
- def testJavaScalaTableAPIEquality(): Unit = {
- // mock
- val ds = mock(classOf[DataSet[Row]])
- val jDs = mock(classOf[JDataSet[Row]])
- val typeInfo = new RowTypeInfo(Seq(Types.INT, Types.LONG, Types.STRING): _*)
- when(ds.javaSet).thenReturn(jDs)
- when(jDs.getType).thenReturn(typeInfo)
-
- // Scala environment
- val env = mock(classOf[ScalaExecutionEnv])
- val tableEnv = TableEnvironment.getTableEnvironment(env)
- val in1 = ds.toTable(tableEnv).as('a, 'b, 'c)
-
- // Java environment
- val javaEnv = mock(classOf[JavaExecutionEnv])
- val javaTableEnv = TableEnvironment.getTableEnvironment(javaEnv)
- val in2 = javaTableEnv.fromDataSet(jDs).as("a, b, c")
- javaTableEnv.registerTable("MyTable", in2)
-
- // test cross join
- val func1 = new TableFunc1
- javaTableEnv.registerFunction("func1", func1)
- var scalaTable = in1.join(func1('c) as 's).select('c, 's)
- var javaTable = in2.join(new Table(javaTableEnv, "func1(c).as(s)")).select("c, s")
- verifyTableEquals(scalaTable, javaTable)
-
- // test left outer join
- scalaTable = in1.leftOuterJoin(func1('c) as 's).select('c, 's)
- javaTable = in2.leftOuterJoin(new Table(javaTableEnv, "as(func1(c), s)")).select("c, s")
- verifyTableEquals(scalaTable, javaTable)
-
- // test overloading
- scalaTable = in1.join(func1('c, "$") as 's).select('c, 's)
- javaTable = in2.join(new Table(javaTableEnv, "func1(c, '$') as (s)")).select("c, s")
- verifyTableEquals(scalaTable, javaTable)
-
- // test custom result type
- val func2 = new TableFunc2
- javaTableEnv.registerFunction("func2", func2)
- scalaTable = in1.join(func2('c) as ('name, 'len)).select('c, 'name, 'len)
- javaTable = in2.join(new Table(javaTableEnv, "func2(c).as(name, len)")).select("c, name, len")
- verifyTableEquals(scalaTable, javaTable)
-
- // test hierarchy generic type
- val hierarchy = new HierarchyTableFunction
- javaTableEnv.registerFunction("hierarchy", hierarchy)
- scalaTable = in1.join(hierarchy('c) as ('name, 'adult, 'len))
- .select('c, 'name, 'len, 'adult)
- javaTable = in2.join(new Table(javaTableEnv, "AS(hierarchy(c), name, adult, len)"))
- .select("c, name, len, adult")
- verifyTableEquals(scalaTable, javaTable)
-
- // test pojo type
- val pojo = new PojoTableFunc
- javaTableEnv.registerFunction("pojo", pojo)
- scalaTable = in1.join(pojo('c))
- .select('c, 'name, 'age)
- javaTable = in2.join(new Table(javaTableEnv, "pojo(c)"))
- .select("c, name, age")
- verifyTableEquals(scalaTable, javaTable)
-
- // test with filter
- scalaTable = in1.join(func2('c) as ('name, 'len))
- .select('c, 'name, 'len).filter('len > 2)
- javaTable = in2.join(new Table(javaTableEnv, "func2(c) as (name, len)"))
- .select("c, name, len").filter("len > 2")
- verifyTableEquals(scalaTable, javaTable)
-
- // test with scalar function
- scalaTable = in1.join(func1('c.substring(2)) as 's)
- .select('a, 'c, 's)
- javaTable = in2.join(new Table(javaTableEnv, "func1(substring(c, 2)) as (s)"))
- .select("a, c, s")
- verifyTableEquals(scalaTable, javaTable)
- }
-
- @Test
def testCrossJoin(): Unit = {
val util = batchTestUtil()
val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/AggregationsStringExpressionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/AggregationsStringExpressionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/AggregationsStringExpressionTest.scala
index eda958c..b085160 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/AggregationsStringExpressionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/AggregationsStringExpressionTest.scala
@@ -19,24 +19,19 @@
package org.apache.flink.table.api.scala.batch.table.stringexpr
import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.table.api.TableEnvironment
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.utils.TableTestBase
import org.apache.flink.table.api.java.utils.UserDefinedAggFunctions.WeightedAvgWithMergeAndReset
+import org.apache.flink.table.api.scala._
import org.apache.flink.table.api.scala.batch.utils.LogicalPlanFormatUtils
import org.apache.flink.table.functions.aggfunctions.CountAggFunction
+import org.apache.flink.table.utils.TableTestBase
import org.junit._
class AggregationsStringExpressionTest extends TableTestBase {
@Test
def testAggregationTypes(): Unit = {
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
-
- val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
+ val util = batchTestUtil()
+ val t = util.addTable[(Int, Long, String)]("Table3")
val t1 = t.select('_1.sum, '_1.sum0, '_1.min, '_1.max, '_1.count, '_1.avg)
val t2 = t.select("_1.sum, _1.sum0, _1.min, _1.max, _1.count, _1.avg")
@@ -46,13 +41,8 @@ class AggregationsStringExpressionTest extends TableTestBase {
@Test
def testWorkingAggregationDataTypes(): Unit = {
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
-
- val t = env.fromElements(
- (1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d, "Hello"),
- (2: Byte, 2: Short, 2, 2L, 2.0f, 2.0d, "Ciao")).toTable(tEnv)
+ val util = batchTestUtil()
+ val t = util.addTable[(Byte, Short, Int, Long, Float, Double, String)]("Table7")
val t1 = t.select('_1.avg, '_2.avg, '_3.avg, '_4.avg, '_5.avg, '_6.avg, '_7.count)
val t2 = t.select("_1.avg, _2.avg, _3.avg, _4.avg, _5.avg, _6.avg, _7.count")
@@ -62,13 +52,8 @@ class AggregationsStringExpressionTest extends TableTestBase {
@Test
def testProjection(): Unit = {
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
-
- val t = env.fromElements(
- (1: Byte, 1: Short),
- (2: Byte, 2: Short)).toTable(tEnv)
+ val util = batchTestUtil()
+ val t = util.addTable[(Byte, Short)]("Table2")
val t1 = t.select('_1.avg, '_1.sum, '_1.count, '_2.avg, '_2.sum)
val t2 = t.select("_1.avg, _1.sum, _1.count, _2.avg, _2.sum")
@@ -78,11 +63,8 @@ class AggregationsStringExpressionTest extends TableTestBase {
@Test
def testAggregationWithArithmetic(): Unit = {
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
-
- val t = env.fromElements((1f, "Hello"), (2f, "Ciao")).toTable(tEnv)
+ val util = batchTestUtil()
+ val t = util.addTable[(Long, String)]("Table2")
val t1 = t.select(('_1 + 2).avg + 2, '_2.count + 5)
val t2 = t.select("(_1 + 2).avg + 2, _2.count + 5")
@@ -92,11 +74,8 @@ class AggregationsStringExpressionTest extends TableTestBase {
@Test
def testAggregationWithTwoCount(): Unit = {
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
-
- val t = env.fromElements((1f, "Hello"), (2f, "Ciao")).toTable(tEnv)
+ val util = batchTestUtil()
+ val t = util.addTable[(Long, String)]("Table2")
val t1 = t.select('_1.count, '_2.count)
val t2 = t.select("_1.count, _2.count")
@@ -106,13 +85,8 @@ class AggregationsStringExpressionTest extends TableTestBase {
@Test
def testAggregationAfterProjection(): Unit = {
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
-
- val t = env.fromElements(
- (1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d, "Hello"),
- (2: Byte, 2: Short, 2, 2L, 2.0f, 2.0d, "Ciao")).toTable(tEnv)
+ val util = batchTestUtil()
+ val t = util.addTable[(Byte, Short, Int, Long, Float, Double, String)]("Table7")
val t1 = t.select('_1, '_2, '_3)
.select('_1.avg, '_2.sum, '_3.count)
@@ -125,10 +99,8 @@ class AggregationsStringExpressionTest extends TableTestBase {
@Test
def testDistinct(): Unit = {
- val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
-
- val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+ val util = batchTestUtil()
+ val ds = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
val distinct = ds.select('b).distinct()
val distinct2 = ds.select("b").distinct()
@@ -138,10 +110,8 @@ class AggregationsStringExpressionTest extends TableTestBase {
@Test
def testDistinctAfterAggregate(): Unit = {
- val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
-
- val ds = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e)
+ val util = batchTestUtil()
+ val ds = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'a, 'b, 'c, 'd, 'e)
val distinct = ds.groupBy('a, 'e).select('e).distinct()
val distinct2 = ds.groupBy("a, e").select("e").distinct()
@@ -154,11 +124,8 @@ class AggregationsStringExpressionTest extends TableTestBase {
@Test
def testGroupedAggregate(): Unit = {
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
-
- val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+ val util = batchTestUtil()
+ val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
val t1 = t.groupBy('b).select('b, 'a.sum)
val t2 = t.groupBy("b").select("b, a.sum")
@@ -168,11 +135,8 @@ class AggregationsStringExpressionTest extends TableTestBase {
@Test
def testGroupingKeyForwardIfNotUsed(): Unit = {
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
-
- val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+ val util = batchTestUtil()
+ val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
val t1 = t.groupBy('b).select('a.sum)
val t2 = t.groupBy("b").select("a.sum")
@@ -182,11 +146,8 @@ class AggregationsStringExpressionTest extends TableTestBase {
@Test
def testGroupNoAggregation(): Unit = {
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
-
- val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+ val util = batchTestUtil()
+ val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
val t1 = t
.groupBy('b)
@@ -205,11 +166,8 @@ class AggregationsStringExpressionTest extends TableTestBase {
@Test
def testGroupedAggregateWithConstant1(): Unit = {
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
-
- val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+ val util = batchTestUtil()
+ val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
val t1 = t.select('a, 4 as 'four, 'b)
.groupBy('four, 'a)
@@ -224,11 +182,8 @@ class AggregationsStringExpressionTest extends TableTestBase {
@Test
def testGroupedAggregateWithConstant2(): Unit = {
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
-
- val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+ val util = batchTestUtil()
+ val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
val t1 = t.select('b, 4 as 'four, 'a)
.groupBy('b, 'four)
@@ -242,11 +197,8 @@ class AggregationsStringExpressionTest extends TableTestBase {
@Test
def testGroupedAggregateWithExpression(): Unit = {
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
-
- val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e)
+ val util = batchTestUtil()
+ val t = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'a, 'b, 'c, 'd, 'e)
val t1 = t.groupBy('e, 'b % 3)
.select('c.min, 'e, 'a.avg, 'd.count)
@@ -258,11 +210,8 @@ class AggregationsStringExpressionTest extends TableTestBase {
@Test
def testGroupedAggregateWithFilter(): Unit = {
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
-
- val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+ val util = batchTestUtil()
+ val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
val t1 = t.groupBy('b)
.select('b, 'a.sum)
@@ -295,16 +244,13 @@ class AggregationsStringExpressionTest extends TableTestBase {
@Test
def testAggregateWithUDAGG(): Unit = {
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
-
- val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+ val util = batchTestUtil()
+ val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
val myCnt = new CountAggFunction
- tEnv.registerFunction("myCnt", myCnt)
+ util.tableEnv.registerFunction("myCnt", myCnt)
val myWeightedAvg = new WeightedAvgWithMergeAndReset
- tEnv.registerFunction("myWeightedAvg", myWeightedAvg)
+ util.tableEnv.registerFunction("myWeightedAvg", myWeightedAvg)
val t1 = t.select(myCnt('a) as 'aCnt, myWeightedAvg('b, 'a) as 'wAvg)
val t2 = t.select("myCnt(a) as aCnt, myWeightedAvg(b, a) as wAvg")
@@ -323,16 +269,14 @@ class AggregationsStringExpressionTest extends TableTestBase {
@Test
def testGroupedAggregateWithUDAGG(): Unit = {
+ val util = batchTestUtil()
+ val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
-
- val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
val myCnt = new CountAggFunction
- tEnv.registerFunction("myCnt", myCnt)
+ util.tableEnv.registerFunction("myCnt", myCnt)
val myWeightedAvg = new WeightedAvgWithMergeAndReset
- tEnv.registerFunction("myWeightedAvg", myWeightedAvg)
+ util.tableEnv.registerFunction("myWeightedAvg", myWeightedAvg)
val t1 = t.groupBy('b)
.select('b, myCnt('a) + 9 as 'aCnt, myWeightedAvg('b, 'a) * 2 as 'wAvg, myWeightedAvg('a, 'a))
http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/CalcStringExpressionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/CalcStringExpressionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/CalcStringExpressionTest.scala
index 381b8c8..9736ec1 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/CalcStringExpressionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/CalcStringExpressionTest.scala
@@ -21,21 +21,20 @@ package org.apache.flink.table.api.scala.batch.table.stringexpr
import java.sql.{Date, Time, Timestamp}
import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.api.scala.util.CollectionDataSets.CustomType
+import org.apache.flink.table.api.Types
import org.apache.flink.table.api.scala._
import org.apache.flink.table.api.scala.batch.utils.LogicalPlanFormatUtils
-import org.apache.flink.table.api.{TableEnvironment, Types}
import org.apache.flink.table.expressions.Literal
+import org.apache.flink.table.utils.TableTestBase
import org.junit._
-class CalcStringExpressionTest {
+class CalcStringExpressionTest extends TableTestBase {
@Test
def testSimpleSelectAllWithAs(): Unit = {
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
-
- val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+ val util = batchTestUtil()
+ val t = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
val t1 = t.select('a, 'b, 'c)
val t2 = t.select("a, b, c")
@@ -48,10 +47,8 @@ class CalcStringExpressionTest {
@Test
def testSimpleSelectWithNaming(): Unit = {
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
-
- val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
+ val util = batchTestUtil()
+ val t = util.addTable[(Int, Long, String)]("Table3")
val t1 = t
.select('_1 as 'a, '_2 as 'b, '_1 as 'c)
@@ -69,10 +66,8 @@ class CalcStringExpressionTest {
@Test
def testSimpleSelectRenameAll(): Unit = {
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
-
- val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
+ val util = batchTestUtil()
+ val t = util.addTable[(Int, Long, String)]("Table3")
val t1 = t
.select('_1 as 'a, '_2 as 'b, '_3 as 'c)
@@ -90,10 +85,8 @@ class CalcStringExpressionTest {
@Test
def testSelectStar(): Unit = {
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
-
- val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+ val util = batchTestUtil()
+ val t = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
val t1 = t.select('*)
val t2 = t.select("*")
@@ -106,10 +99,8 @@ class CalcStringExpressionTest {
@Test
def testAllRejectingFilter(): Unit = {
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
-
- val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+ val util = batchTestUtil()
+ val ds = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
val t1 = ds.filter( Literal(false) )
val t2 = ds.filter("faLsE")
@@ -122,10 +113,8 @@ class CalcStringExpressionTest {
@Test
def testAllPassingFilter(): Unit = {
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
-
- val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+ val util = batchTestUtil()
+ val ds = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
val t1 = ds.filter( Literal(true) )
val t2 = ds.filter("trUe")
@@ -138,10 +127,8 @@ class CalcStringExpressionTest {
@Test
def testFilterOnStringTupleField(): Unit = {
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
-
- val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+ val util = batchTestUtil()
+ val ds = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
val t1 = ds.filter( 'c.like("%world%") )
val t2 = ds.filter("c.like('%world%')")
@@ -154,10 +141,8 @@ class CalcStringExpressionTest {
@Test
def testFilterOnIntegerTupleField(): Unit = {
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
-
- val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+ val util = batchTestUtil()
+ val ds = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
val t1 = ds.filter( 'a % 2 === 0 )
val t2 = ds.filter( "a % 2 = 0 ")
@@ -170,10 +155,8 @@ class CalcStringExpressionTest {
@Test
def testNotEquals(): Unit = {
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
-
- val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+ val util = batchTestUtil()
+ val ds = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
val t1 = ds.filter( 'a % 2 !== 0 )
val t2 = ds.filter("a % 2 <> 0")
@@ -186,10 +169,8 @@ class CalcStringExpressionTest {
@Test
def testDisjunctivePredicate(): Unit = {
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
-
- val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+ val util = batchTestUtil()
+ val ds = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
val t1 = ds.filter( 'a < 2 || 'a > 20)
val t2 = ds.filter("a < 2 || a > 20")
@@ -202,10 +183,8 @@ class CalcStringExpressionTest {
@Test
def testConsecutiveFilters(): Unit = {
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
-
- val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+ val util = batchTestUtil()
+ val ds = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
val t1 = ds.filter('a % 2 !== 0).filter('b % 2 === 0)
val t2 = ds.filter("a % 2 != 0").filter("b % 2 = 0")
@@ -218,10 +197,8 @@ class CalcStringExpressionTest {
@Test
def testFilterBasicType(): Unit = {
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
-
- val ds = CollectionDataSets.getStringDataSet(env).toTable(tEnv, 'a)
+ val util = batchTestUtil()
+ val ds = util.addTable[String]("Table3",'a)
val t1 = ds.filter( 'a.like("H%") )
val t2 = ds.filter( "a.like('H%')" )
@@ -234,11 +211,8 @@ class CalcStringExpressionTest {
@Test
def testFilterOnCustomType(): Unit = {
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
-
- val ds = CollectionDataSets.getCustomTypeDataSet(env)
- val t = ds.toTable(tEnv, 'myInt as 'i, 'myLong as 'l, 'myString as 's)
+ val util = batchTestUtil()
+ val t = util.addTable[CustomType]("Table3",'myInt as 'i, 'myLong as 'l, 'myString as 's)
val t1 = t.filter( 's.like("%a%") )
val t2 = t.filter("s.like('%a%')")
@@ -251,10 +225,8 @@ class CalcStringExpressionTest {
@Test
def testSimpleCalc(): Unit = {
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
-
- val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
+ val util = batchTestUtil()
+ val t = util.addTable[(Int, Long, String)]("Table3")
val t1 = t.select('_1, '_2, '_3)
.where('_1 < 7)
@@ -272,10 +244,8 @@ class CalcStringExpressionTest {
@Test
def testCalcWithTwoFilters(): Unit = {
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
-
- val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
+ val util = batchTestUtil()
+ val t = util.addTable[(Int, Long, String)]("Table3")
val t1 = t.select('_1, '_2, '_3)
.where('_1 < 7 && '_2 === 3)
@@ -297,10 +267,8 @@ class CalcStringExpressionTest {
@Test
def testCalcWithAggregation(): Unit = {
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
-
- val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
+ val util = batchTestUtil()
+ val t = util.addTable[(Int, Long, String)]("Table3")
val t1 = t.select('_1, '_2, '_3)
.where('_1 < 15)
@@ -325,11 +293,9 @@ class CalcStringExpressionTest {
@Test
def testCalcJoin(): Unit = {
- val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
-
- val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
- val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
+ val util = batchTestUtil()
+ val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
+ val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
val t1 = ds1.select('a, 'b).join(ds2).where('b === 'e).select('a, 'b, 'd, 'e, 'f)
.where('b > 1).select('a, 'd).where('d === 2)
@@ -344,17 +310,9 @@ class CalcStringExpressionTest {
@Test
def testAdvancedDataTypes(): Unit = {
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
-
- val t = env
- .fromElements((
- BigDecimal("78.454654654654654").bigDecimal,
- BigDecimal("4E+9999").bigDecimal,
- Date.valueOf("1984-07-12"),
- Time.valueOf("14:34:24"),
- Timestamp.valueOf("1984-07-12 14:34:24")))
- .toTable(tEnv, 'a, 'b, 'c, 'd, 'e)
+ val util = batchTestUtil()
+ val t = util
+ .addTable[(BigDecimal, BigDecimal, Date, Time, Timestamp)]("Table5", 'a, 'b, 'c, 'd, 'e)
val t1 = t.select('a, 'b, 'c, 'd, 'e, BigDecimal("11.2"), BigDecimal("11.2").bigDecimal,
"1984-07-12".cast(Types.SQL_DATE), "14:34:24".cast(Types.SQL_TIME),
@@ -371,9 +329,8 @@ class CalcStringExpressionTest {
@Test
def testIntegerBiggerThan128(): Unit = {
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tableEnv = TableEnvironment.getTableEnvironment(env)
- val t = env.fromElements((300, 1L, "Hello")).toTable(tableEnv, 'a, 'b, 'c)
+ val util = batchTestUtil()
+ val t = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
val t1 = t.filter('a === 300)
val t2 = t.filter("a = 300")
http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/CastingStringExpressionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/CastingStringExpressionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/CastingStringExpressionTest.scala
index 8a2db41..8ca5745 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/CastingStringExpressionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/CastingStringExpressionTest.scala
@@ -27,7 +27,7 @@ import org.junit._
class CastingStringExpressionTest {
@Test
- def testNumericAutocastInArithmetic() {
+ def testNumericAutoCastInArithmetic() {
val env = ExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
@@ -47,7 +47,7 @@ class CastingStringExpressionTest {
@Test
@throws[Exception]
- def testNumericAutocastInComparison() {
+ def testNumericAutoCastInComparison() {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/JoinStringExpressionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/JoinStringExpressionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/JoinStringExpressionTest.scala
index b2f683c..067441d 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/JoinStringExpressionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/JoinStringExpressionTest.scala
@@ -19,22 +19,19 @@
package org.apache.flink.table.api.scala.batch.table.stringexpr
import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.table.api.scala.batch.utils.LogicalPlanFormatUtils
import org.apache.flink.table.expressions.Literal
+import org.apache.flink.table.utils.TableTestBase
import org.junit._
-class JoinStringExpressionTest {
+class JoinStringExpressionTest extends TableTestBase {
@Test
def testJoin(): Unit = {
- val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
-
- val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
- val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
+ val util = batchTestUtil()
+ val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
+ val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
val t1Scala = ds1.join(ds2).where('b === 'e).select('c, 'g)
val t1Java = ds1.join(ds2).where("b === e").select("c, g")
@@ -47,12 +44,9 @@ class JoinStringExpressionTest {
@Test
def testJoinWithFilter(): Unit = {
-
- val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
-
- val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
- val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('d, 'e, 'f, 'g, 'h)
+ val util = batchTestUtil()
+ val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
+ val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
val t1Scala = ds1.join(ds2).where('b === 'e && 'b < 2).select('c, 'g)
val t1Java = ds1.join(ds2).where("b === e && b < 2").select("c, g")
@@ -65,11 +59,9 @@ class JoinStringExpressionTest {
@Test
def testJoinWithJoinFilter(): Unit = {
- val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
-
- val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
- val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
+ val util = batchTestUtil()
+ val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
+ val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
val t1Scala = ds1.join(ds2).where('b === 'e && 'a < 6 && 'h < 'b).select('c, 'g)
val t1Java = ds1.join(ds2).where("b === e && a < 6 && h < b").select("c, g")
@@ -82,11 +74,9 @@ class JoinStringExpressionTest {
@Test
def testJoinWithMultipleKeys(): Unit = {
- val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
-
- val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
- val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
+ val util = batchTestUtil()
+ val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
+ val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
val t1Scala = ds1.join(ds2).filter('a === 'd && 'b === 'h).select('c, 'g)
val t1Java = ds1.join(ds2).filter("a === d && b === h").select("c, g")
@@ -99,11 +89,9 @@ class JoinStringExpressionTest {
@Test
def testJoinWithAggregation(): Unit = {
- val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
-
- val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
- val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
+ val util = batchTestUtil()
+ val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
+ val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
val t1Scala = ds1.join(ds2).where('a === 'd).select('g.count)
val t1Java = ds1.join(ds2).where("a === d").select("g.count")
@@ -118,11 +106,9 @@ class JoinStringExpressionTest {
@Test
def testJoinWithGroupedAggregation(): Unit = {
- val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
-
- val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
- val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
+ val util = batchTestUtil()
+ val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
+ val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
val t1 = ds1.join(ds2)
.where('a === 'd)
@@ -143,12 +129,10 @@ class JoinStringExpressionTest {
@Test
def testJoinPushThroughJoin(): Unit = {
- val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
-
- val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
- val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
- val ds3 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'j, 'k, 'l)
+ val util = batchTestUtil()
+ val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
+ val ds3 = util.addTable[(Int, Long, String)]("Table4",'j, 'k, 'l)
+ val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
val t1 = ds1.join(ds2)
.where(Literal(true))
@@ -169,11 +153,9 @@ class JoinStringExpressionTest {
@Test
def testJoinWithDisjunctivePred(): Unit = {
- val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
-
- val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
- val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
+ val util = batchTestUtil()
+ val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
+ val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
val t1 = ds1.join(ds2).filter('a === 'd && ('b === 'e || 'b === 'e - 10)).select('c, 'g)
val t2 = ds1.join(ds2).filter("a = d && (b = e || b = e - 10)").select("c, g")
@@ -186,11 +168,9 @@ class JoinStringExpressionTest {
@Test
def testJoinWithExpressionPreds(): Unit = {
- val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
-
- val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
- val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
+ val util = batchTestUtil()
+ val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
+ val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
val t1 = ds1.join(ds2).filter('b === 'h + 1 && 'a - 1 === 'd + 2).select('c, 'g)
val t2 = ds1.join(ds2).filter("b = h + 1 && a - 1 = d + 2").select("c, g")
@@ -203,12 +183,9 @@ class JoinStringExpressionTest {
@Test
def testLeftJoinWithMultipleKeys(): Unit = {
- val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
- tEnv.getConfig.setNullCheck(true)
-
- val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
- val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
+ val util = batchTestUtil()
+ val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
+ val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
val t1 = ds1.leftOuterJoin(ds2, 'a === 'd && 'b === 'h).select('c, 'g)
val t2 = ds1.leftOuterJoin(ds2, "a = d && b = h").select("c, g")
@@ -221,12 +198,9 @@ class JoinStringExpressionTest {
@Test
def testRightJoinWithMultipleKeys(): Unit = {
- val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
- tEnv.getConfig.setNullCheck(true)
-
- val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
- val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
+ val util = batchTestUtil()
+ val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
+ val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
val t1 = ds1.rightOuterJoin(ds2, 'a === 'd && 'b === 'h).select('c, 'g)
val t2 = ds1.rightOuterJoin(ds2, "a = d && b = h").select("c, g")
@@ -239,12 +213,9 @@ class JoinStringExpressionTest {
@Test
def testFullOuterJoinWithMultipleKeys(): Unit = {
- val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
- tEnv.getConfig.setNullCheck(true)
-
- val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
- val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
+ val util = batchTestUtil()
+ val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
+ val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
val t1 = ds1.fullOuterJoin(ds2, 'a === 'd && 'b === 'h).select('c, 'g)
val t2 = ds1.fullOuterJoin(ds2, "a = d && b = h").select("c, g")
http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/UserDefinedTableFunctionStringExpressionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/UserDefinedTableFunctionStringExpressionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/UserDefinedTableFunctionStringExpressionTest.scala
new file mode 100644
index 0000000..8cf9979
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/UserDefinedTableFunctionStringExpressionTest.scala
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.api.scala.batch.table.stringexpr
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.types.Row
+import org.apache.flink.table.utils.{PojoTableFunc, TableFunc2, _}
+import org.apache.flink.table.api.{Table, Types}
+import org.junit.Test
+
+class UserDefinedTableFunctionStringExpressionTest extends TableTestBase {
+
+ @Test
+ def testJoin(): Unit = {
+ val util = batchTestUtil()
+
+ val typeInfo = new RowTypeInfo(Seq(Types.INT, Types.LONG, Types.STRING): _*)
+ val sTab = util.addTable[(Int, Long, String)]("Table1", 'a, 'b, 'c)
+ val jTab = util.addJavaTable[Row](typeInfo, "Table2", "a, b, c")
+
+ // test cross join
+ val func1 = new TableFunc1
+ util.javaTableEnv.registerFunction("func1", func1)
+ var scalaTable = sTab.join(func1('c) as 's).select('c, 's)
+ var javaTable = jTab.join(new Table(util.javaTableEnv, "func1(c).as(s)")).select("c, s")
+ verifyTableEquals(scalaTable, javaTable)
+
+ // test left outer join
+ scalaTable = sTab.leftOuterJoin(func1('c) as 's).select('c, 's)
+ javaTable = jTab.leftOuterJoin(new Table(util.javaTableEnv, "as(func1(c), s)")).select("c, s")
+ verifyTableEquals(scalaTable, javaTable)
+
+ // test overloading
+ scalaTable = sTab.join(func1('c, "$") as 's).select('c, 's)
+ javaTable = jTab.join(new Table(util.javaTableEnv, "func1(c, '$') as (s)")).select("c, s")
+ verifyTableEquals(scalaTable, javaTable)
+
+ // test custom result type
+ val func2 = new TableFunc2
+ util.javaTableEnv.registerFunction("func2", func2)
+ scalaTable = sTab.join(func2('c) as('name, 'len)).select('c, 'name, 'len)
+ javaTable = jTab.join(
+ new Table(util.javaTableEnv, "func2(c).as(name, len)")).select("c, name, len")
+ verifyTableEquals(scalaTable, javaTable)
+
+ // test hierarchy generic type
+ val hierarchy = new HierarchyTableFunction
+ util.javaTableEnv.registerFunction("hierarchy", hierarchy)
+ scalaTable = sTab.join(hierarchy('c) as('name, 'adult, 'len)).select('c, 'name, 'len, 'adult)
+ javaTable = jTab.join(new Table(util.javaTableEnv, "AS(hierarchy(c), name, adult, len)"))
+ .select("c, name, len, adult")
+ verifyTableEquals(scalaTable, javaTable)
+
+ // test pojo type
+ val pojo = new PojoTableFunc
+ util.javaTableEnv.registerFunction("pojo", pojo)
+ scalaTable = sTab.join(pojo('c)).select('c, 'name, 'age)
+ javaTable = jTab.join(new Table(util.javaTableEnv, "pojo(c)")).select("c, name, age")
+ verifyTableEquals(scalaTable, javaTable)
+
+ // test with filter
+ scalaTable = sTab.join(func2('c) as('name, 'len)).select('c, 'name, 'len).filter('len > 2)
+ javaTable = jTab.join(new Table(util.javaTableEnv, "func2(c) as (name, len)"))
+ .select("c, name, len").filter("len > 2")
+ verifyTableEquals(scalaTable, javaTable)
+
+ // test with scalar function
+ scalaTable = sTab.join(func1('c.substring(2)) as 's).select('a, 'c, 's)
+ javaTable = jTab.join(
+ new Table(util.javaTableEnv, "func1(substring(c, 2)) as (s)")).select("a, c, s")
+ verifyTableEquals(scalaTable, javaTable)
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/AggregationsValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/AggregationsValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/AggregationsValidationTest.scala
index 8e90fa8..25ecd96 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/AggregationsValidationTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/AggregationsValidationTest.scala
@@ -19,24 +19,24 @@
package org.apache.flink.table.api.scala.batch.table.validation
import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.util.CollectionDataSets
import org.apache.flink.table.api.java.utils.UserDefinedAggFunctions.{OverAgg0, WeightedAvgWithMergeAndReset}
import org.apache.flink.table.api.scala._
import org.apache.flink.table.api.{TableEnvironment, ValidationException}
+import org.apache.flink.table.utils.TableTestBase
import org.junit._
-class AggregationsValidationTest {
+class AggregationsValidationTest extends TableTestBase {
/**
* OVER clause is necessary for [[OverAgg0]] window function.
*/
@Test(expected = classOf[ValidationException])
def testOverAggregation(): Unit = {
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tableEnv = TableEnvironment.getTableEnvironment(env)
- val input = CollectionDataSets.get3TupleDataSet(env).toTable(tableEnv, 'a, 'b, 'c)
+ val util = batchTestUtil()
+ val t = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
+
val overAgg = new OverAgg0
- input.select('c.count, overAgg('b, 'a))
+ t.select('c.count, overAgg('b, 'a))
}
@Test(expected = classOf[ValidationException])
@@ -52,199 +52,176 @@ class AggregationsValidationTest {
@Test(expected = classOf[ValidationException])
def testNoNestedAggregations(): Unit = {
+ val util = batchTestUtil()
+ val t = util.addTable[(String, Int)]("Table2")
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
-
- env.fromElements(("Hello", 1)).toTable(tEnv)
- // Must fail. Sum aggregation can not be chained.
- .select('_2.sum.sum)
+ // Must fail. Sum aggregation can not be chained.
+ t.select('_2.sum.sum)
}
@Test(expected = classOf[ValidationException])
def testGroupingOnNonExistentField(): Unit = {
+ val util = batchTestUtil()
+ val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
-
- CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
- // must fail. '_foo not a valid field
- .groupBy('_foo)
- .select('a.avg)
+ // must fail. '_foo not a valid field
+ t.groupBy('_foo)
+ .select('a.avg)
}
@Test(expected = classOf[ValidationException])
def testGroupingInvalidSelection(): Unit = {
+ val util = batchTestUtil()
+ val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
-
- CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
- .groupBy('a, 'b)
- // must fail. 'c is not a grouping key or aggregation
- .select('c)
+ t.groupBy('a, 'b)
+ // must fail. 'c is not a grouping key or aggregation
+ .select('c)
}
@Test(expected = classOf[ValidationException])
def testAggregationOnNonExistingField(): Unit = {
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
+ val util = batchTestUtil()
+ val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
- val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
- // Must fail. Field 'foo does not exist.
- .select('foo.avg)
+ // Must fail. Field 'foo does not exist.
+ t.select('foo.avg)
}
@Test(expected = classOf[ValidationException])
@throws[Exception]
def testInvalidUdAggArgs() {
- val env= ExecutionEnvironment.getExecutionEnvironment
- val tableEnv = TableEnvironment.getTableEnvironment(env)
+ val util = batchTestUtil()
+ val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
val myWeightedAvg = new WeightedAvgWithMergeAndReset
- val input = CollectionDataSets.get3TupleDataSet(env).toTable(tableEnv, 'a, 'b, 'c)
- input
- // must fail. UDAGG does not accept String type
- .select(myWeightedAvg('c, 'a))
+ // must fail. UDAGG does not accept String type
+ t.select(myWeightedAvg('c, 'a))
}
@Test(expected = classOf[ValidationException])
@throws[Exception]
def testGroupingInvalidUdAggArgs() {
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tableEnv = TableEnvironment.getTableEnvironment(env)
+ val util = batchTestUtil()
+ val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
val myWeightedAvg = new WeightedAvgWithMergeAndReset
- val input = CollectionDataSets.get3TupleDataSet(env).toTable(tableEnv, 'a, 'b, 'c)
- input
- .groupBy('b)
- // must fail. UDAGG does not accept String type
- .select(myWeightedAvg('c, 'a))
+ t.groupBy('b)
+ // must fail. UDAGG does not accept String type
+ .select(myWeightedAvg('c, 'a))
}
@Test(expected = classOf[ValidationException])
@throws[Exception]
def testGroupingNestedUdAgg() {
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tableEnv = TableEnvironment.getTableEnvironment(env)
+ val util = batchTestUtil()
+ val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
val myWeightedAvg = new WeightedAvgWithMergeAndReset
- val input = CollectionDataSets.get3TupleDataSet(env).toTable(tableEnv, 'a, 'b, 'c)
- input
- .groupBy('c)
- // must fail. UDAGG does not accept String type
- .select(myWeightedAvg(myWeightedAvg('b, 'a), 'a))
+ t.groupBy('c)
+ // must fail. UDAGG does not accept String type
+ .select(myWeightedAvg(myWeightedAvg('b, 'a), 'a))
}
@Test(expected = classOf[ValidationException])
@throws[Exception]
def testAggregationOnNonExistingFieldJava() {
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tableEnv = TableEnvironment.getTableEnvironment(env)
- val table = CollectionDataSets.get3TupleDataSet(env).toTable(tableEnv)
- table.select("foo.avg")
+ val util = batchTestUtil()
+ val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
+
+ t.select("foo.avg")
}
@Test(expected = classOf[ValidationException])
@throws[Exception]
def testNonWorkingAggregationDataTypesJava() {
- val env= ExecutionEnvironment.getExecutionEnvironment
- val tableEnv = TableEnvironment.getTableEnvironment(env)
- val table = env.fromElements((1f, "Hello")).toTable(tableEnv)
+ val util = batchTestUtil()
+ val t = util.addTable[(Long, String)]("Table2",'b, 'c)
// Must fail. Cannot compute SUM aggregate on String field.
- table.select("f1.sum")
+ t.select("c.sum")
}
@Test(expected = classOf[ValidationException])
@throws[Exception]
def testNoNestedAggregationsJava() {
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tableEnv = TableEnvironment.getTableEnvironment(env)
- val table = env.fromElements((1f, "Hello")).toTable(tableEnv)
+ val util = batchTestUtil()
+ val t = util.addTable[(Long, String)]("Table2",'b, 'c)
// Must fail. Aggregation on aggregation not allowed.
- table.select("f0.sum.sum")
+ t.select("b.sum.sum")
}
@Test(expected = classOf[ValidationException])
@throws[Exception]
def testGroupingOnNonExistentFieldJava() {
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tableEnv = TableEnvironment.getTableEnvironment(env)
- val input = CollectionDataSets.get3TupleDataSet(env).toTable(tableEnv, 'a, 'b, 'c)
- input
- // must fail. Field foo is not in input
- .groupBy("foo")
- .select("a.avg")
+ val util = batchTestUtil()
+ val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
+
+ // must fail. Field foo is not in input
+ t.groupBy("foo")
+ .select("a.avg")
}
@Test(expected = classOf[ValidationException])
@throws[Exception]
def testGroupingInvalidSelectionJava() {
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tableEnv = TableEnvironment.getTableEnvironment(env)
- val input = CollectionDataSets.get3TupleDataSet(env).toTable(tableEnv, 'a, 'b, 'c)
- input
- .groupBy("a, b")
- // must fail. Field c is not a grouping key or aggregation
- .select("c")
+ val util = batchTestUtil()
+ val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
+
+ t.groupBy("a, b")
+ // must fail. Field c is not a grouping key or aggregation
+ .select("c")
}
@Test(expected = classOf[ValidationException])
@throws[Exception]
def testUnknownUdAggJava() {
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tableEnv = TableEnvironment.getTableEnvironment(env)
- val input = CollectionDataSets.get3TupleDataSet(env).toTable(tableEnv, 'a, 'b, 'c)
- input
- // must fail. unknown is not known
- .select("unknown(c)")
+ val util = batchTestUtil()
+ val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
+
+ // must fail. unknown is not known
+ t.select("unknown(c)")
}
@Test(expected = classOf[ValidationException])
@throws[Exception]
def testGroupingUnknownUdAggJava() {
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tableEnv = TableEnvironment.getTableEnvironment(env)
- val input = CollectionDataSets.get3TupleDataSet(env).toTable(tableEnv, 'a, 'b, 'c)
- input
- .groupBy("a, b")
- // must fail. unknown is not known
- .select("unknown(c)")
+ val util = batchTestUtil()
+ val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
+
+ t.groupBy("a, b")
+ // must fail. unknown is not known
+ .select("unknown(c)")
}
@Test(expected = classOf[ValidationException])
@throws[Exception]
def testInvalidUdAggArgsJava() {
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tableEnv = TableEnvironment.getTableEnvironment(env)
+ val util = batchTestUtil()
+ val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
val myWeightedAvg = new WeightedAvgWithMergeAndReset
- tableEnv.registerFunction("myWeightedAvg", myWeightedAvg)
+ util.tableEnv.registerFunction("myWeightedAvg", myWeightedAvg)
- val input = CollectionDataSets.get3TupleDataSet(env).toTable(tableEnv, 'a, 'b, 'c)
- input
- // must fail. UDAGG does not accept String type
- .select("myWeightedAvg(c, a)")
+ // must fail. UDAGG does not accept String type
+ t.select("myWeightedAvg(c, a)")
}
@Test(expected = classOf[ValidationException])
@throws[Exception]
def testGroupingInvalidUdAggArgsJava() {
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tableEnv = TableEnvironment.getTableEnvironment(env)
+ val util = batchTestUtil()
+ val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
val myWeightedAvg = new WeightedAvgWithMergeAndReset
- tableEnv.registerFunction("myWeightedAvg", myWeightedAvg)
+ util.tableEnv.registerFunction("myWeightedAvg", myWeightedAvg)
- val input = CollectionDataSets.get3TupleDataSet(env).toTable(tableEnv, 'a, 'b, 'c)
- input
- .groupBy("b")
- // must fail. UDAGG does not accept String type
- .select("myWeightedAvg(c, a)")
+ t.groupBy("b")
+ // must fail. UDAGG does not accept String type
+ .select("myWeightedAvg(c, a)")
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/CalcValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/CalcValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/CalcValidationTest.scala
index 846585b..e2a5dac 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/CalcValidationTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/CalcValidationTest.scala
@@ -19,117 +19,105 @@
package org.apache.flink.table.api.scala.batch.table.validation
import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.table.api.{TableEnvironment, TableException, ValidationException}
import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.{TableException, ValidationException}
+import org.apache.flink.table.utils.TableTestBase
import org.apache.flink.types.Row
import org.junit.Assert._
import org.junit._
-class CalcValidationTest {
+class CalcValidationTest extends TableTestBase {
@Test(expected = classOf[ValidationException])
def testSelectInvalidFieldFields(): Unit = {
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
-
- CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+ val util = batchTestUtil()
+ val t = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
// must fail. Field 'foo does not exist
.select('a, 'foo)
}
@Test(expected = classOf[ValidationException])
def testSelectAmbiguousRenaming(): Unit = {
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
-
- CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+ val util = batchTestUtil()
+ val t = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
// must fail. 'a and 'b are both renamed to 'foo
.select('a + 1 as 'foo, 'b + 2 as 'foo).toDataSet[Row].print()
}
@Test(expected = classOf[ValidationException])
def testSelectAmbiguousRenaming2(): Unit = {
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
-
- CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+ val util = batchTestUtil()
+ val t = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
// must fail. 'a and 'b are both renamed to 'a
.select('a, 'b as 'a).toDataSet[Row].print()
}
@Test(expected = classOf[ValidationException])
def testFilterInvalidFieldName(): Unit = {
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
-
- val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+ val util = batchTestUtil()
+ val t = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
// must fail. Field 'foo does not exist
- ds.filter( 'foo === 2 )
+ t.filter( 'foo === 2 )
}
@Test(expected = classOf[ValidationException])
@throws[Exception]
def testSelectInvalidField() {
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tableEnv = TableEnvironment.getTableEnvironment(env)
- val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tableEnv, 'a, 'b, 'c)
+ val util = batchTestUtil()
+ val t = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
// Must fail. Field foo does not exist
- ds.select("a + 1, foo + 2")
+ t.select("a + 1, foo + 2")
}
@Test(expected = classOf[ValidationException])
@throws[Exception]
def testSelectAmbiguousFieldNames() {
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tableEnv = TableEnvironment.getTableEnvironment(env)
- val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tableEnv, 'a, 'b, 'c)
+ val util = batchTestUtil()
+ val t = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
// Must fail. Field foo does not exist
- ds.select("a + 1 as foo, b + 2 as foo")
+ t.select("a + 1 as foo, b + 2 as foo")
}
@Test(expected = classOf[ValidationException])
@throws[Exception]
def testFilterInvalidField() {
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tableEnv = TableEnvironment.getTableEnvironment(env)
- val table = CollectionDataSets.get3TupleDataSet(env).toTable(tableEnv, 'a, 'b, 'c)
+ val util = batchTestUtil()
+ val t = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
// Must fail. Field foo does not exist.
- table.filter("foo = 17")
+ t.filter("foo = 17")
}
@Test
def testAliasStarException(): Unit = {
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
+ val util = batchTestUtil()
try {
- CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, '*, 'b, 'c)
+ util.addTable[(Int, Long, String)]("Table1", '*, 'b, 'c)
fail("TableException expected")
} catch {
case _: TableException => //ignore
}
try {
- CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
- .select('_1 as '*, '_2 as 'b, '_1 as 'c)
+ util.addTable[(Int, Long, String)]("Table2")
+ .select('_1 as '*, '_2 as 'b, '_1 as 'c)
fail("ValidationException expected")
} catch {
case _: ValidationException => //ignore
}
try {
- CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('*, 'b, 'c)
+ util.addTable[(Int, Long, String)]("Table3").as('*, 'b, 'c)
fail("ValidationException expected")
} catch {
case _: ValidationException => //ignore
}
-try {
- CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c).select('*, 'b)
+ try {
+ util.addTable[(Int, Long, String)]("Table4", 'a, 'b, 'c).select('*, 'b)
fail("ValidationException expected")
} catch {
case _: ValidationException => //ignore
http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/CompositeFlatteningValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/CompositeFlatteningValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/CompositeFlatteningValidationTest.scala
new file mode 100644
index 0000000..0ac205a
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/CompositeFlatteningValidationTest.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.api.scala.batch.table.validation
+
+import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.utils.TableTestBase
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.junit.Test
+
+class CompositeFlatteningValidationTest extends TableTestBase {
+
+ @Test(expected = classOf[ValidationException])
+ def testDuplicateFlattening(): Unit = {
+ val util = batchTestUtil()
+ val table = util.addTable[((Int, Long), (String, Boolean), String)]("MyTable", 'a, 'b, 'c)
+
+ table.select('a.flatten(), 'a.flatten())
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/GroupWindowValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/GroupWindowValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/GroupWindowValidationTest.scala
new file mode 100644
index 0000000..b8e2b97
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/GroupWindowValidationTest.scala
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.api.scala.batch.table.validation
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.api.java.utils.UserDefinedAggFunctions.{OverAgg0, WeightedAvgWithMerge}
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.utils.TableTestBase
+import org.junit.Test
+
+class GroupWindowValidationTest extends TableTestBase {
+
+ //===============================================================================================
+ // Common test
+ //===============================================================================================
+
+ /**
+ * OVER clause is necessary for [[OverAgg0]] window function.
+ */
+ @Test(expected = classOf[ValidationException])
+ def testOverAggregation(): Unit = {
+ val util = batchTestUtil()
+ val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+ val overAgg = new OverAgg0
+ table
+ .window(Tumble over 5.milli on 'long as 'w)
+ .groupBy('string,'w)
+ .select(overAgg('long, 'int))
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testGroupByWithoutWindowAlias(): Unit = {
+ val util = batchTestUtil()
+ val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+ table
+ .window(Tumble over 5.milli on 'long as 'w)
+ .groupBy('string)
+ .select('string, 'int.count)
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testInvalidRowTimeRef(): Unit = {
+ val util = batchTestUtil()
+ val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+ table
+ .window(Tumble over 5.milli on 'long as 'w)
+ .groupBy('w, 'string)
+ .select('string, 'int.count)
+ .window(Slide over 5.milli every 1.milli on 'int as 'w2) // 'Int does not exist in input.
+ .groupBy('w2)
+ .select('string)
+ }
+
+ //===============================================================================================
+ // Tumbling Windows
+ //===============================================================================================
+
+ @Test(expected = classOf[ValidationException])
+ def testInvalidProcessingTimeDefinition(): Unit = {
+ val util = batchTestUtil()
+ // proctime is not allowed
+ util.addTable[(Long, Int, String)]('long.proctime, 'int, 'string)
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testInvalidProcessingTimeDefinition2(): Unit = {
+ val util = batchTestUtil()
+ // proctime is not allowed
+ util.addTable[(Long, Int, String)]('long, 'int, 'string, 'proctime.proctime)
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testInvalidEventTimeDefinition(): Unit = {
+ val util = batchTestUtil()
+ // definition must not extend schema
+ util.addTable[(Long, Int, String)]('long, 'int, 'string, 'rowtime.rowtime)
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testTumblingGroupWindowWithInvalidUdAggArgs(): Unit = {
+ val util = batchTestUtil()
+ val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+ val myWeightedAvg = new WeightedAvgWithMerge
+
+ table
+ .window(Tumble over 2.minutes on 'rowtime as 'w)
+ .groupBy('w, 'long)
+ // invalid function arguments
+ .select(myWeightedAvg('int, 'string))
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testAllTumblingGroupWindowWithInvalidUdAggArgs(): Unit = {
+ val util = batchTestUtil()
+ val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+ val myWeightedAvg = new WeightedAvgWithMerge
+
+ table
+ .window(Tumble over 2.minutes on 'rowtime as 'w)
+ .groupBy('w)
+ // invalid function arguments
+ .select(myWeightedAvg('int, 'string))
+ }
+
+ //===============================================================================================
+ // Sliding Windows
+ //===============================================================================================
+
+ @Test(expected = classOf[ValidationException])
+ def testSlidingGroupWindowWithInvalidUdAggArgs(): Unit = {
+ val util = batchTestUtil()
+ val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+ val myWeightedAvg = new WeightedAvgWithMerge
+
+ table
+ .window(Slide over 2.minutes every 1.minute on 'rowtime as 'w)
+ .groupBy('w, 'long)
+ // invalid function arguments
+ .select(myWeightedAvg('int, 'string))
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testAllSlidingGroupWindowWithInvalidUdAggArgs(): Unit = {
+ val util = batchTestUtil()
+ val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+ val myWeightedAvg = new WeightedAvgWithMerge
+
+ table
+ .window(Slide over 2.minutes every 1.minute on 'long as 'w)
+ .groupBy('w)
+ // invalid function arguments
+ .select(myWeightedAvg('int, 'string))
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testSessionGroupWindowWithInvalidUdAggArgs(): Unit = {
+ val util = batchTestUtil()
+ val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+ val myWeightedAvg = new WeightedAvgWithMerge
+
+ table
+ .window(Session withGap 2.minutes on 'rowtime as 'w)
+ .groupBy('w, 'long)
+ // invalid function arguments
+ .select(myWeightedAvg('int, 'string))
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testAllSessionGroupWindowWithInvalidUdAggArgs(): Unit = {
+ val util = batchTestUtil()
+ val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+ val myWeightedAvg = new WeightedAvgWithMerge
+
+ table
+ .window(Session withGap 2.minutes on 'rowtime as 'w)
+ .groupBy('w)
+ // invalid function arguments
+ .select(myWeightedAvg('int, 'string))
+ }
+
+}