You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/16 15:46:37 UTC
[08/47] flink git commit: [FLINK-4704] [table] Refactor package
structure of flink-table.
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/SqlExpressionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/SqlExpressionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/SqlExpressionTest.scala
deleted file mode 100644
index 56f40ea..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/SqlExpressionTest.scala
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.expressions
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.types.Row
-import org.apache.flink.api.table.expressions.utils.ExpressionTestBase
-import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.junit.{Ignore, Test}
-
-/**
- * Tests all SQL expressions that are currently supported according to the documentation.
- * These tests should be kept in sync with the documentation to reduce confusion due to the
- * large amount of SQL functions.
- *
- * The tests do not test every parameter combination of a function.
- * Rather, they serve as function existence tests and simple functional tests.
- *
- * The tests are split up and ordered like the sections in the documentation.
- */
-class SqlExpressionTest extends ExpressionTestBase {
-
- @Test
- def testComparisonFunctions(): Unit = {
- testSqlApi("1 = 1", "true")
- testSqlApi("1 <> 1", "false")
- testSqlApi("5 > 2", "true")
- testSqlApi("2 >= 2", "true")
- testSqlApi("5 < 2", "false")
- testSqlApi("2 <= 2", "true")
- testSqlApi("1 IS NULL", "false")
- testSqlApi("1 IS NOT NULL", "true")
- testSqlApi("NULLIF(1,1) IS DISTINCT FROM NULLIF(1,1)", "false")
- testSqlApi("NULLIF(1,1) IS NOT DISTINCT FROM NULLIF(1,1)", "true")
- testSqlApi("NULLIF(1,1) IS NOT DISTINCT FROM NULLIF(1,1)", "true")
- testSqlApi("12 BETWEEN 11 AND 13", "true")
- testSqlApi("12 BETWEEN ASYMMETRIC 13 AND 11", "false")
- testSqlApi("12 BETWEEN SYMMETRIC 13 AND 11", "true")
- testSqlApi("12 NOT BETWEEN 11 AND 13", "false")
- testSqlApi("12 NOT BETWEEN ASYMMETRIC 13 AND 11", "true")
- testSqlApi("12 NOT BETWEEN SYMMETRIC 13 AND 11", "false")
- testSqlApi("'TEST' LIKE '%EST'", "true")
- //testSqlApi("'%EST' LIKE '.%EST' ESCAPE '.'", "true") // TODO
- testSqlApi("'TEST' NOT LIKE '%EST'", "false")
- //testSqlApi("'%EST' NOT LIKE '.%EST' ESCAPE '.'", "false") // TODO
- testSqlApi("'TEST' SIMILAR TO '.EST'", "true")
- //testSqlApi("'TEST' SIMILAR TO ':.EST' ESCAPE ':'", "true") // TODO
- testSqlApi("'TEST' NOT SIMILAR TO '.EST'", "false")
- //testSqlApi("'TEST' NOT SIMILAR TO ':.EST' ESCAPE ':'", "false") // TODO
- testSqlApi("'TEST' IN ('west', 'TEST', 'rest')", "true")
- testSqlApi("'TEST' IN ('west', 'rest')", "false")
- testSqlApi("'TEST' NOT IN ('west', 'TEST', 'rest')", "false")
- testSqlApi("'TEST' NOT IN ('west', 'rest')", "true")
-
- // sub-query functions are not listed here
- }
-
- @Test
- def testLogicalFunctions(): Unit = {
- testSqlApi("TRUE OR FALSE", "true")
- testSqlApi("TRUE AND FALSE", "false")
- testSqlApi("NOT TRUE", "false")
- testSqlApi("TRUE IS FALSE", "false")
- testSqlApi("TRUE IS NOT FALSE", "true")
- testSqlApi("TRUE IS TRUE", "true")
- testSqlApi("TRUE IS NOT TRUE", "false")
- testSqlApi("NULLIF(TRUE,TRUE) IS UNKNOWN", "true")
- testSqlApi("NULLIF(TRUE,TRUE) IS NOT UNKNOWN", "false")
- }
-
- @Test
- def testArithmeticFunctions(): Unit = {
- testSqlApi("+5", "5")
- testSqlApi("-5", "-5")
- testSqlApi("5+5", "10")
- testSqlApi("5-5", "0")
- testSqlApi("5*5", "25")
- testSqlApi("5/5", "1")
- testSqlApi("POWER(5, 5)", "3125.0")
- testSqlApi("ABS(-5)", "5")
- testSqlApi("MOD(-26, 5)", "-1")
- testSqlApi("SQRT(4)", "2.0")
- testSqlApi("LN(1)", "0.0")
- testSqlApi("LOG10(1)", "0.0")
- testSqlApi("EXP(0)", "1.0")
- testSqlApi("CEIL(2.5)", "3")
- testSqlApi("FLOOR(2.5)", "2")
- }
-
- @Test
- def testStringFunctions(): Unit = {
- testSqlApi("'test' || 'string'", "teststring")
- testSqlApi("CHAR_LENGTH('string')", "6")
- testSqlApi("CHARACTER_LENGTH('string')", "6")
- testSqlApi("UPPER('string')", "STRING")
- testSqlApi("LOWER('STRING')", "string")
- testSqlApi("POSITION('STR' IN 'STRING')", "1")
- testSqlApi("TRIM(BOTH ' STRING ')", "STRING")
- testSqlApi("TRIM(LEADING 'x' FROM 'xxxxSTRINGxxxx')", "STRINGxxxx")
- testSqlApi("TRIM(TRAILING 'x' FROM 'xxxxSTRINGxxxx')", "xxxxSTRING")
- testSqlApi(
- "OVERLAY('This is a old string' PLACING 'new' FROM 11 FOR 3)",
- "This is a new string")
- testSqlApi("SUBSTRING('hello world', 2)", "ello world")
- testSqlApi("SUBSTRING('hello world', 2, 3)", "ell")
- testSqlApi("INITCAP('hello world')", "Hello World")
- }
-
- @Test
- def testConditionalFunctions(): Unit = {
- testSqlApi("CASE 2 WHEN 1, 2 THEN 2 ELSE 3 END", "2")
- testSqlApi("CASE WHEN 1 = 2 THEN 2 WHEN 1 = 1 THEN 3 ELSE 3 END", "3")
- testSqlApi("NULLIF(1, 1)", "null")
- testSqlApi("COALESCE(NULL, 5)", "5")
- }
-
- @Test
- def testTypeConversionFunctions(): Unit = {
- testSqlApi("CAST(2 AS DOUBLE)", "2.0")
- }
-
- @Test
- def testValueConstructorFunctions(): Unit = {
- // TODO we need a special code path that flattens ROW types
- // testSqlApi("ROW('hello world', 12)", "hello world") // test base only returns field 0
- // testSqlApi("('hello world', 12)", "hello world") // test base only returns field 0
- testSqlApi("ARRAY[TRUE, FALSE][2]", "false")
- testSqlApi("ARRAY[TRUE, TRUE]", "[true, true]")
- }
-
- @Test
- def testDateTimeFunctions(): Unit = {
- testSqlApi("DATE '1990-10-14'", "1990-10-14")
- testSqlApi("TIME '12:12:12'", "12:12:12")
- testSqlApi("TIMESTAMP '1990-10-14 12:12:12.123'", "1990-10-14 12:12:12.123")
- testSqlApi("INTERVAL '10 00:00:00.004' DAY TO SECOND", "+10 00:00:00.004")
- testSqlApi("INTERVAL '10 00:12' DAY TO MINUTE", "+10 00:12:00.000")
- testSqlApi("INTERVAL '2-10' YEAR TO MONTH", "+2-10")
- testSqlApi("EXTRACT(DAY FROM DATE '1990-12-01')", "1")
- testSqlApi("EXTRACT(DAY FROM INTERVAL '19 12:10:10.123' DAY TO SECOND(3))", "19")
- testSqlApi("QUARTER(DATE '2016-04-12')", "2")
- }
-
- @Test
- def testArrayFunctions(): Unit = {
- testSqlApi("CARDINALITY(ARRAY[TRUE, TRUE, FALSE])", "3")
- testSqlApi("ELEMENT(ARRAY['HELLO WORLD'])", "HELLO WORLD")
- }
-
- override def testData: Any = new Row(0)
-
- override def typeInfo: TypeInformation[Any] =
- new RowTypeInfo().asInstanceOf[TypeInformation[Any]]
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/TemporalTypesTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/TemporalTypesTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/TemporalTypesTest.scala
deleted file mode 100644
index bd771ba..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/TemporalTypesTest.scala
+++ /dev/null
@@ -1,573 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.expressions
-
-import java.sql.{Date, Time, Timestamp}
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.table.expressions.utils.ExpressionTestBase
-import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.api.table.Types
-import org.apache.flink.types.Row
-import org.junit.Test
-
-class TemporalTypesTest extends ExpressionTestBase {
-
- @Test
- def testTimePointLiterals(): Unit = {
- testAllApis(
- "1990-10-14".toDate,
- "'1990-10-14'.toDate",
- "DATE '1990-10-14'",
- "1990-10-14")
-
- testTableApi(
- Date.valueOf("2040-09-11"),
- "'2040-09-11'.toDate",
- "2040-09-11")
-
- testAllApis(
- "1500-04-30".cast(Types.DATE),
- "'1500-04-30'.cast(DATE)",
- "CAST('1500-04-30' AS DATE)",
- "1500-04-30")
-
- testAllApis(
- "15:45:59".toTime,
- "'15:45:59'.toTime",
- "TIME '15:45:59'",
- "15:45:59")
-
- testTableApi(
- Time.valueOf("00:00:00"),
- "'00:00:00'.toTime",
- "00:00:00")
-
- testAllApis(
- "1:30:00".cast(Types.TIME),
- "'1:30:00'.cast(TIME)",
- "CAST('1:30:00' AS TIME)",
- "01:30:00")
-
- testAllApis(
- "1990-10-14 23:00:00.123".toTimestamp,
- "'1990-10-14 23:00:00.123'.toTimestamp",
- "TIMESTAMP '1990-10-14 23:00:00.123'",
- "1990-10-14 23:00:00.123")
-
- testTableApi(
- Timestamp.valueOf("2040-09-11 00:00:00.000"),
- "'2040-09-11 00:00:00.000'.toTimestamp",
- "2040-09-11 00:00:00.0")
-
- testAllApis(
- "1500-04-30 12:00:00".cast(Types.TIMESTAMP),
- "'1500-04-30 12:00:00'.cast(TIMESTAMP)",
- "CAST('1500-04-30 12:00:00' AS TIMESTAMP)",
- "1500-04-30 12:00:00.0")
- }
-
- @Test
- def testTimeIntervalLiterals(): Unit = {
- testAllApis(
- 1.year,
- "1.year",
- "INTERVAL '1' YEAR",
- "+1-00")
-
- testAllApis(
- 1.month,
- "1.month",
- "INTERVAL '1' MONTH",
- "+0-01")
-
- testAllApis(
- 12.days,
- "12.days",
- "INTERVAL '12' DAY",
- "+12 00:00:00.000")
-
- testAllApis(
- 1.hour,
- "1.hour",
- "INTERVAL '1' HOUR",
- "+0 01:00:00.000")
-
- testAllApis(
- 3.minutes,
- "3.minutes",
- "INTERVAL '3' MINUTE",
- "+0 00:03:00.000")
-
- testAllApis(
- 3.seconds,
- "3.seconds",
- "INTERVAL '3' SECOND",
- "+0 00:00:03.000")
-
- testAllApis(
- 3.millis,
- "3.millis",
- "INTERVAL '0.003' SECOND",
- "+0 00:00:00.003")
- }
-
- @Test
- def testTimePointInput(): Unit = {
- testAllApis(
- 'f0,
- "f0",
- "f0",
- "1990-10-14")
-
- testAllApis(
- 'f1,
- "f1",
- "f1",
- "10:20:45")
-
- testAllApis(
- 'f2,
- "f2",
- "f2",
- "1990-10-14 10:20:45.123")
- }
-
- @Test
- def testTimeIntervalInput(): Unit = {
- testAllApis(
- 'f9,
- "f9",
- "f9",
- "+2-00")
-
- testAllApis(
- 'f10,
- "f10",
- "f10",
- "+0 00:00:12.000")
- }
-
- @Test
- def testTimePointCasting(): Unit = {
- testAllApis(
- 'f0.cast(Types.TIMESTAMP),
- "f0.cast(TIMESTAMP)",
- "CAST(f0 AS TIMESTAMP)",
- "1990-10-14 00:00:00.0")
-
- testAllApis(
- 'f1.cast(Types.TIMESTAMP),
- "f1.cast(TIMESTAMP)",
- "CAST(f1 AS TIMESTAMP)",
- "1970-01-01 10:20:45.0")
-
- testAllApis(
- 'f2.cast(Types.DATE),
- "f2.cast(DATE)",
- "CAST(f2 AS DATE)",
- "1990-10-14")
-
- testAllApis(
- 'f2.cast(Types.TIME),
- "f2.cast(TIME)",
- "CAST(f2 AS TIME)",
- "10:20:45")
-
- testAllApis(
- 'f2.cast(Types.TIME),
- "f2.cast(TIME)",
- "CAST(f2 AS TIME)",
- "10:20:45")
-
- testTableApi(
- 'f7.cast(Types.DATE),
- "f7.cast(DATE)",
- "2002-11-09")
-
- testTableApi(
- 'f7.cast(Types.DATE).cast(Types.INT),
- "f7.cast(DATE).cast(INT)",
- "12000")
-
- testTableApi(
- 'f7.cast(Types.TIME),
- "f7.cast(TIME)",
- "00:00:12")
-
- testTableApi(
- 'f7.cast(Types.TIME).cast(Types.INT),
- "f7.cast(TIME).cast(INT)",
- "12000")
-
- testTableApi(
- 'f8.cast(Types.TIMESTAMP),
- "f8.cast(TIMESTAMP)",
- "2016-06-27 07:23:33.0")
-
- testTableApi(
- 'f8.cast(Types.TIMESTAMP).cast(Types.LONG),
- "f8.cast(TIMESTAMP).cast(LONG)",
- "1467012213000")
- }
-
- @Test
- def testTimeIntervalCasting(): Unit = {
- testTableApi(
- 'f7.cast(Types.INTERVAL_MONTHS),
- "f7.cast(INTERVAL_MONTHS)",
- "+1000-00")
-
- testTableApi(
- 'f8.cast(Types.INTERVAL_MILLIS),
- "f8.cast(INTERVAL_MILLIS)",
- "+16979 07:23:33.000")
- }
-
- @Test
- def testTimePointComparison(): Unit = {
- testAllApis(
- 'f0 < 'f3,
- "f0 < f3",
- "f0 < f3",
- "false")
-
- testAllApis(
- 'f0 < 'f4,
- "f0 < f4",
- "f0 < f4",
- "true")
-
- testAllApis(
- 'f1 < 'f5,
- "f1 < f5",
- "f1 < f5",
- "false")
-
- testAllApis(
- 'f0.cast(Types.TIMESTAMP) !== 'f2,
- "f0.cast(TIMESTAMP) !== f2",
- "CAST(f0 AS TIMESTAMP) <> f2",
- "true")
-
- testAllApis(
- 'f0.cast(Types.TIMESTAMP) === 'f6,
- "f0.cast(TIMESTAMP) === f6",
- "CAST(f0 AS TIMESTAMP) = f6",
- "true")
- }
-
- @Test
- def testTimeIntervalArithmetic(): Unit = {
-
- // interval months comparison
-
- testAllApis(
- 12.months < 24.months,
- "12.months < 24.months",
- "INTERVAL '12' MONTH < INTERVAL '24' MONTH",
- "true")
-
- testAllApis(
- 8.years === 8.years,
- "8.years === 8.years",
- "INTERVAL '8' YEAR = INTERVAL '8' YEAR",
- "true")
-
- // interval millis comparison
-
- testAllApis(
- 8.millis > 10.millis,
- "8.millis > 10.millis",
- "INTERVAL '0.008' SECOND > INTERVAL '0.010' SECOND",
- "false")
-
- testAllApis(
- 8.millis === 8.millis,
- "8.millis === 8.millis",
- "INTERVAL '0.008' SECOND = INTERVAL '0.008' SECOND",
- "true")
-
- // interval months addition/subtraction
-
- testAllApis(
- 8.years + 10.months,
- "8.years + 10.months",
- "INTERVAL '8' YEAR + INTERVAL '10' MONTH",
- "+8-10")
-
- testAllApis(
- 2.years - 12.months,
- "2.years - 12.months",
- "INTERVAL '2' YEAR - INTERVAL '12' MONTH",
- "+1-00")
-
- testAllApis(
- -2.years,
- "-2.years",
- "-INTERVAL '2' YEAR",
- "-2-00")
-
- // interval millis addition/subtraction
-
- testAllApis(
- 8.hours + 10.minutes + 12.seconds + 5.millis,
- "8.hours + 10.minutes + 12.seconds + 5.millis",
- "INTERVAL '8' HOUR + INTERVAL '10' MINUTE + INTERVAL '12.005' SECOND",
- "+0 08:10:12.005")
-
- testAllApis(
- 1.minute - 10.seconds,
- "1.minute - 10.seconds",
- "INTERVAL '1' MINUTE - INTERVAL '10' SECOND",
- "+0 00:00:50.000")
-
- testAllApis(
- -10.seconds,
- "-10.seconds",
- "-INTERVAL '10' SECOND",
- "-0 00:00:10.000")
-
- // addition to date
-
- // interval millis
- testAllApis(
- 'f0 + 2.days,
- "f0 + 2.days",
- "f0 + INTERVAL '2' DAY",
- "1990-10-16")
-
- // interval millis
- testAllApis(
- 30.days + 'f0,
- "30.days + f0",
- "INTERVAL '30' DAY + f0",
- "1990-11-13")
-
- // interval months
- testAllApis(
- 'f0 + 2.months,
- "f0 + 2.months",
- "f0 + INTERVAL '2' MONTH",
- "1990-12-14")
-
- // interval months
- testAllApis(
- 2.months + 'f0,
- "2.months + f0",
- "INTERVAL '2' MONTH + f0",
- "1990-12-14")
-
- // addition to time
-
- // interval millis
- testAllApis(
- 'f1 + 12.hours,
- "f1 + 12.hours",
- "f1 + INTERVAL '12' HOUR",
- "22:20:45")
-
- // interval millis
- testAllApis(
- 12.hours + 'f1,
- "12.hours + f1",
- "INTERVAL '12' HOUR + f1",
- "22:20:45")
-
- // addition to timestamp
-
- // interval millis
- testAllApis(
- 'f2 + 10.days + 4.millis,
- "f2 + 10.days + 4.millis",
- "f2 + INTERVAL '10 00:00:00.004' DAY TO SECOND",
- "1990-10-24 10:20:45.127")
-
- // interval millis
- testAllApis(
- 10.days + 'f2 + 4.millis,
- "10.days + f2 + 4.millis",
- "INTERVAL '10 00:00:00.004' DAY TO SECOND + f2",
- "1990-10-24 10:20:45.127")
-
- // interval months
- testAllApis(
- 'f2 + 10.years,
- "f2 + 10.years",
- "f2 + INTERVAL '10' YEAR",
- "2000-10-14 10:20:45.123")
-
- // interval months
- testAllApis(
- 10.years + 'f2,
- "10.years + f2",
- "INTERVAL '10' YEAR + f2",
- "2000-10-14 10:20:45.123")
-
- // subtraction from date
-
- // interval millis
- testAllApis(
- 'f0 - 2.days,
- "f0 - 2.days",
- "f0 - INTERVAL '2' DAY",
- "1990-10-12")
-
- // interval millis
- testAllApis(
- -30.days + 'f0,
- "-30.days + f0",
- "INTERVAL '-30' DAY + f0",
- "1990-09-14")
-
- // interval months
- testAllApis(
- 'f0 - 2.months,
- "f0 - 2.months",
- "f0 - INTERVAL '2' MONTH",
- "1990-08-14")
-
- // interval months
- testAllApis(
- -2.months + 'f0,
- "-2.months + f0",
- "-INTERVAL '2' MONTH + f0",
- "1990-08-14")
-
- // subtraction from time
-
- // interval millis
- testAllApis(
- 'f1 - 12.hours,
- "f1 - 12.hours",
- "f1 - INTERVAL '12' HOUR",
- "22:20:45")
-
- // interval millis
- testAllApis(
- -12.hours + 'f1,
- "-12.hours + f1",
- "INTERVAL '-12' HOUR + f1",
- "22:20:45")
-
- // subtraction from timestamp
-
- // interval millis
- testAllApis(
- 'f2 - 10.days - 4.millis,
- "f2 - 10.days - 4.millis",
- "f2 - INTERVAL '10 00:00:00.004' DAY TO SECOND",
- "1990-10-04 10:20:45.119")
-
- // interval millis
- testAllApis(
- -10.days + 'f2 - 4.millis,
- "-10.days + f2 - 4.millis",
- "INTERVAL '-10 00:00:00.004' DAY TO SECOND + f2",
- "1990-10-04 10:20:45.119")
-
- // interval months
- testAllApis(
- 'f2 - 10.years,
- "f2 - 10.years",
- "f2 - INTERVAL '10' YEAR",
- "1980-10-14 10:20:45.123")
-
- // interval months
- testAllApis(
- -10.years + 'f2,
- "-10.years + f2",
- "INTERVAL '-10' YEAR + f2",
- "1980-10-14 10:20:45.123")
-
- // casting
-
- testAllApis(
- -'f9.cast(Types.INTERVAL_MONTHS),
- "-f9.cast(INTERVAL_MONTHS)",
- "-CAST(f9 AS INTERVAL YEAR)",
- "-2-00")
-
- testAllApis(
- -'f10.cast(Types.INTERVAL_MILLIS),
- "-f10.cast(INTERVAL_MILLIS)",
- "-CAST(f10 AS INTERVAL SECOND)",
- "-0 00:00:12.000")
-
- // addition/subtraction of interval millis and interval months
-
- testAllApis(
- 'f0 + 2.days + 1.month,
- "f0 + 2.days + 1.month",
- "f0 + INTERVAL '2' DAY + INTERVAL '1' MONTH",
- "1990-11-16")
-
- testAllApis(
- 'f0 - 2.days - 1.month,
- "f0 - 2.days - 1.month",
- "f0 - INTERVAL '2' DAY - INTERVAL '1' MONTH",
- "1990-09-12")
-
- testAllApis(
- 'f2 + 2.days + 1.month,
- "f2 + 2.days + 1.month",
- "f2 + INTERVAL '2' DAY + INTERVAL '1' MONTH",
- "1990-11-16 10:20:45.123")
-
- testAllApis(
- 'f2 - 2.days - 1.month,
- "f2 - 2.days - 1.month",
- "f2 - INTERVAL '2' DAY - INTERVAL '1' MONTH",
- "1990-09-12 10:20:45.123")
- }
-
- // ----------------------------------------------------------------------------------------------
-
- def testData = {
- val testData = new Row(11)
- testData.setField(0, Date.valueOf("1990-10-14"))
- testData.setField(1, Time.valueOf("10:20:45"))
- testData.setField(2, Timestamp.valueOf("1990-10-14 10:20:45.123"))
- testData.setField(3, Date.valueOf("1990-10-13"))
- testData.setField(4, Date.valueOf("1990-10-15"))
- testData.setField(5, Time.valueOf("00:00:00"))
- testData.setField(6, Timestamp.valueOf("1990-10-14 00:00:00.0"))
- testData.setField(7, 12000)
- testData.setField(8, 1467012213000L)
- testData.setField(9, 24)
- testData.setField(10, 12000L)
- testData
- }
-
- def typeInfo = {
- new RowTypeInfo(
- Types.DATE,
- Types.TIME,
- Types.TIMESTAMP,
- Types.DATE,
- Types.DATE,
- Types.TIME,
- Types.TIMESTAMP,
- Types.INT,
- Types.LONG,
- Types.INTERVAL_MONTHS,
- Types.INTERVAL_MILLIS).asInstanceOf[TypeInformation[Any]]
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/UserDefinedScalarFunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/UserDefinedScalarFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/UserDefinedScalarFunctionTest.scala
deleted file mode 100644
index 567cca1..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/UserDefinedScalarFunctionTest.scala
+++ /dev/null
@@ -1,228 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.expressions
-
-import java.sql.{Date, Time, Timestamp}
-
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.table.expressions.utils._
-import org.apache.flink.api.table.functions.ScalarFunction
-import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.api.table.Types
-import org.apache.flink.types.Row
-import org.junit.Test
-
-class UserDefinedScalarFunctionTest extends ExpressionTestBase {
-
- @Test
- def testParameters(): Unit = {
- testAllApis(
- Func0('f0),
- "Func0(f0)",
- "Func0(f0)",
- "42")
-
- testAllApis(
- Func1('f0),
- "Func1(f0)",
- "Func1(f0)",
- "43")
-
- testAllApis(
- Func2('f0, 'f1, 'f3),
- "Func2(f0, f1, f3)",
- "Func2(f0, f1, f3)",
- "42 and Test and SimplePojo(Bob,36)")
-
- testAllApis(
- Func0(123),
- "Func0(123)",
- "Func0(123)",
- "123")
-
- testAllApis(
- Func6('f4, 'f5, 'f6),
- "Func6(f4, f5, f6)",
- "Func6(f4, f5, f6)",
- "(1990-10-14,12:10:10,1990-10-14 12:10:10.0)")
- }
-
- @Test
- def testNullableParameters(): Unit = {
- testAllApis(
- Func3(Null(INT_TYPE_INFO), Null(STRING_TYPE_INFO)),
- "Func3(Null(INT), Null(STRING))",
- "Func3(NULL, NULL)",
- "null and null")
-
- testAllApis(
- Func3(Null(INT_TYPE_INFO), "Test"),
- "Func3(Null(INT), 'Test')",
- "Func3(NULL, 'Test')",
- "null and Test")
-
- testAllApis(
- Func3(42, Null(STRING_TYPE_INFO)),
- "Func3(42, Null(STRING))",
- "Func3(42, NULL)",
- "42 and null")
-
- testAllApis(
- Func0(Null(INT_TYPE_INFO)),
- "Func0(Null(INT))",
- "Func0(NULL)",
- "-1")
- }
-
- @Test
- def testResults(): Unit = {
- testAllApis(
- Func4(),
- "Func4()",
- "Func4()",
- "null")
-
- testAllApis(
- Func5(),
- "Func5()",
- "Func5()",
- "-1")
- }
-
- @Test
- def testNesting(): Unit = {
- testAllApis(
- Func0(Func0('f0)),
- "Func0(Func0(f0))",
- "Func0(Func0(f0))",
- "42")
-
- testAllApis(
- Func0(Func0('f0)),
- "Func0(Func0(f0))",
- "Func0(Func0(f0))",
- "42")
-
- testAllApis(
- Func7(Func7(Func7(1, 1), Func7(1, 1)), Func7(Func7(1, 1), Func7(1, 1))),
- "Func7(Func7(Func7(1, 1), Func7(1, 1)), Func7(Func7(1, 1), Func7(1, 1)))",
- "Func7(Func7(Func7(1, 1), Func7(1, 1)), Func7(Func7(1, 1), Func7(1, 1)))",
- "8")
- }
-
- @Test
- def testOverloadedParameters(): Unit = {
- testAllApis(
- Func8(1),
- "Func8(1)",
- "Func8(1)",
- "a")
-
- testAllApis(
- Func8(1, 1),
- "Func8(1, 1)",
- "Func8(1, 1)",
- "b")
-
- testAllApis(
- Func8("a", "a"),
- "Func8('a', 'a')",
- "Func8('a', 'a')",
- "c")
- }
-
- @Test
- def testTimePointsOnPrimitives(): Unit = {
- testAllApis(
- Func9('f4, 'f5, 'f6),
- "Func9(f4, f5, f6)",
- "Func9(f4, f5, f6)",
- "7591 and 43810000 and 655906210000")
-
- testAllApis(
- Func10('f6),
- "Func10(f6)",
- "Func10(f6)",
- "1990-10-14 12:10:10.0")
- }
-
- @Test
- def testTimeIntervalsOnPrimitives(): Unit = {
- testAllApis(
- Func11('f7, 'f8),
- "Func11(f7, f8)",
- "Func11(f7, f8)",
- "12 and 1000")
-
- testAllApis(
- Func12('f8),
- "Func12(f8)",
- "Func12(f8)",
- "+0 00:00:01.000")
- }
-
- // ----------------------------------------------------------------------------------------------
-
- override def testData: Any = {
- val testData = new Row(9)
- testData.setField(0, 42)
- testData.setField(1, "Test")
- testData.setField(2, null)
- testData.setField(3, SimplePojo("Bob", 36))
- testData.setField(4, Date.valueOf("1990-10-14"))
- testData.setField(5, Time.valueOf("12:10:10"))
- testData.setField(6, Timestamp.valueOf("1990-10-14 12:10:10"))
- testData.setField(7, 12)
- testData.setField(8, 1000L)
- testData
- }
-
- override def typeInfo: TypeInformation[Any] = {
- new RowTypeInfo(
- Types.INT,
- Types.STRING,
- Types.BOOLEAN,
- TypeInformation.of(classOf[SimplePojo]),
- Types.DATE,
- Types.TIME,
- Types.TIMESTAMP,
- Types.INTERVAL_MONTHS,
- Types.INTERVAL_MILLIS
- ).asInstanceOf[TypeInformation[Any]]
- }
-
- override def functions: Map[String, ScalarFunction] = Map(
- "Func0" -> Func0,
- "Func1" -> Func1,
- "Func2" -> Func2,
- "Func3" -> Func3,
- "Func4" -> Func4,
- "Func5" -> Func5,
- "Func6" -> Func6,
- "Func7" -> Func7,
- "Func8" -> Func8,
- "Func9" -> Func9,
- "Func10" -> Func10,
- "Func11" -> Func11,
- "Func12" -> Func12
- )
-}
-
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/utils/ExpressionTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/utils/ExpressionTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/utils/ExpressionTestBase.scala
deleted file mode 100644
index 3156ba8..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/utils/ExpressionTestBase.scala
+++ /dev/null
@@ -1,217 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.expressions.utils
-
-import org.apache.calcite.rex.RexNode
-import org.apache.calcite.sql.`type`.SqlTypeName._
-import org.apache.calcite.sql2rel.RelDecorrelator
-import org.apache.calcite.tools.{Programs, RelBuilder}
-import org.apache.flink.api.common.functions.{Function, MapFunction}
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.{DataSet => JDataSet}
-import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
-import org.apache.flink.api.table._
-import org.apache.flink.api.table.codegen.{CodeGenerator, Compiler, GeneratedFunction}
-import org.apache.flink.api.table.expressions.{Expression, ExpressionParser}
-import org.apache.flink.api.table.functions.ScalarFunction
-import org.apache.flink.api.table.plan.nodes.dataset.{DataSetCalc, DataSetConvention}
-import org.apache.flink.api.table.plan.rules.FlinkRuleSets
-import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.types.Row
-import org.junit.Assert._
-import org.junit.{After, Before}
-import org.mockito.Mockito._
-
-import scala.collection.mutable
-
-/**
- * Base test class for expression tests.
- */
-abstract class ExpressionTestBase {
-
- private val testExprs = mutable.ArrayBuffer[(RexNode, String)]()
-
- // setup test utils
- private val tableName = "testTable"
- private val context = prepareContext(typeInfo)
- private val planner = new FlinkPlannerImpl(
- context._2.getFrameworkConfig,
- context._2.getPlanner,
- context._2.getTypeFactory)
- private val optProgram = Programs.ofRules(FlinkRuleSets.DATASET_OPT_RULES)
-
- private def prepareContext(typeInfo: TypeInformation[Any]): (RelBuilder, TableEnvironment) = {
- // create DataSetTable
- val dataSetMock = mock(classOf[DataSet[Any]])
- val jDataSetMock = mock(classOf[JDataSet[Any]])
- when(dataSetMock.javaSet).thenReturn(jDataSetMock)
- when(jDataSetMock.getType).thenReturn(typeInfo)
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
- tEnv.registerDataSet(tableName, dataSetMock)
- functions.foreach(f => tEnv.registerFunction(f._1, f._2))
-
- // prepare RelBuilder
- val relBuilder = tEnv.getRelBuilder
- relBuilder.scan(tableName)
-
- (relBuilder, tEnv)
- }
-
- def testData: Any
-
- def typeInfo: TypeInformation[Any]
-
- def functions: Map[String, ScalarFunction] = Map()
-
- @Before
- def resetTestExprs() = {
- testExprs.clear()
- }
-
- @After
- def evaluateExprs() = {
- val relBuilder = context._1
- val config = new TableConfig()
- val generator = new CodeGenerator(config, false, typeInfo)
-
- // cast expressions to String
- val stringTestExprs = testExprs.map(expr => relBuilder.cast(expr._1, VARCHAR)).toSeq
-
- // generate code
- val resultType = new RowTypeInfo(Seq.fill(testExprs.size)(STRING_TYPE_INFO): _*)
- val genExpr = generator.generateResultExpression(
- resultType,
- resultType.getFieldNames,
- stringTestExprs)
-
- val bodyCode =
- s"""
- |${genExpr.code}
- |return ${genExpr.resultTerm};
- |""".stripMargin
-
- val genFunc = generator.generateFunction[MapFunction[Any, String]](
- "TestFunction",
- classOf[MapFunction[Any, String]],
- bodyCode,
- resultType.asInstanceOf[TypeInformation[Any]])
-
- // compile and evaluate
- val clazz = new TestCompiler[MapFunction[Any, String]]().compile(genFunc)
- val mapper = clazz.newInstance()
- val result = mapper.map(testData).asInstanceOf[Row]
-
- // compare
- testExprs
- .zipWithIndex
- .foreach {
- case ((expr, expected), index) =>
- val actual = result.getField(index)
- assertEquals(
- s"Wrong result for: $expr",
- expected,
- if (actual == null) "null" else actual)
- }
- }
-
- private def addSqlTestExpr(sqlExpr: String, expected: String): Unit = {
- // create RelNode from SQL expression
- val parsed = planner.parse(s"SELECT $sqlExpr FROM $tableName")
- val validated = planner.validate(parsed)
- val converted = planner.rel(validated).rel
-
- // create DataSetCalc
- val decorPlan = RelDecorrelator.decorrelateQuery(converted)
- val flinkOutputProps = converted.getTraitSet.replace(DataSetConvention.INSTANCE).simplify()
- val dataSetCalc = optProgram.run(context._2.getPlanner, decorPlan, flinkOutputProps)
-
- // extract RexNode
- val calcProgram = dataSetCalc
- .asInstanceOf[DataSetCalc]
- .calcProgram
- val expanded = calcProgram.expandLocalRef(calcProgram.getProjectList.get(0))
-
- testExprs += ((expanded, expected))
- }
-
- private def addTableApiTestExpr(tableApiExpr: Expression, expected: String): Unit = {
- // create RelNode from Table API expression
- val env = context._2
- val converted = env
- .asInstanceOf[BatchTableEnvironment]
- .scan(tableName)
- .select(tableApiExpr)
- .getRelNode
-
- // create DataSetCalc
- val decorPlan = RelDecorrelator.decorrelateQuery(converted)
- val flinkOutputProps = converted.getTraitSet.replace(DataSetConvention.INSTANCE).simplify()
- val dataSetCalc = optProgram.run(context._2.getPlanner, decorPlan, flinkOutputProps)
-
- // extract RexNode
- val calcProgram = dataSetCalc
- .asInstanceOf[DataSetCalc]
- .calcProgram
- val expanded = calcProgram.expandLocalRef(calcProgram.getProjectList.get(0))
-
- testExprs += ((expanded, expected))
- }
-
- private def addTableApiTestExpr(tableApiString: String, expected: String): Unit = {
- addTableApiTestExpr(ExpressionParser.parseExpression(tableApiString), expected)
- }
-
- def testAllApis(
- expr: Expression,
- exprString: String,
- sqlExpr: String,
- expected: String)
- : Unit = {
- addTableApiTestExpr(expr, expected)
- addTableApiTestExpr(exprString, expected)
- addSqlTestExpr(sqlExpr, expected)
- }
-
- def testTableApi(
- expr: Expression,
- exprString: String,
- expected: String)
- : Unit = {
- addTableApiTestExpr(expr, expected)
- addTableApiTestExpr(exprString, expected)
- }
-
- def testSqlApi(
- sqlExpr: String,
- expected: String)
- : Unit = {
- addSqlTestExpr(sqlExpr, expected)
- }
-
- // ----------------------------------------------------------------------------------------------
-
- // TestCompiler that uses current class loader
- class TestCompiler[T <: Function] extends Compiler[T] {
- def compile(genFunc: GeneratedFunction[T]): Class[T] =
- compile(getClass.getClassLoader, genFunc.name, genFunc.code)
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/utils/UserDefinedScalarFunctions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/utils/UserDefinedScalarFunctions.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/utils/UserDefinedScalarFunctions.scala
deleted file mode 100644
index d4772cf..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/utils/UserDefinedScalarFunctions.scala
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.expressions.utils
-
-import java.sql.{Date, Time, Timestamp}
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.table.Types
-import org.apache.flink.api.table.functions.ScalarFunction
-
-case class SimplePojo(name: String, age: Int)
-
-object Func0 extends ScalarFunction {
- def eval(index: Int): Int = {
- index
- }
-}
-
-object Func1 extends ScalarFunction {
- def eval(index: Integer): Integer = {
- index + 1
- }
-}
-
-object Func2 extends ScalarFunction {
- def eval(index: Integer, str: String, pojo: SimplePojo): String = {
- s"$index and $str and $pojo"
- }
-}
-
-object Func3 extends ScalarFunction {
- def eval(index: Integer, str: String): String = {
- s"$index and $str"
- }
-}
-
-object Func4 extends ScalarFunction {
- def eval(): Integer = {
- null
- }
-}
-
-object Func5 extends ScalarFunction {
- def eval(): Int = {
- -1
- }
-}
-
-object Func6 extends ScalarFunction {
- def eval(date: Date, time: Time, timestamp: Timestamp): (Date, Time, Timestamp) = {
- (date, time, timestamp)
- }
-}
-
-object Func7 extends ScalarFunction {
- def eval(a: Integer, b: Integer): Integer = {
- a + b
- }
-}
-
-object Func8 extends ScalarFunction {
- def eval(a: Int): String = {
- "a"
- }
-
- def eval(a: Int, b: Int): String = {
- "b"
- }
-
- def eval(a: String, b: String): String = {
- "c"
- }
-}
-
-object Func9 extends ScalarFunction {
- def eval(a: Int, b: Int, c: Long): String = {
- s"$a and $b and $c"
- }
-}
-
-object Func10 extends ScalarFunction {
- def eval(c: Long): Long = {
- c
- }
-
- override def getResultType(signature: Array[Class[_]]): TypeInformation[_] = {
- Types.TIMESTAMP
- }
-}
-
-object Func11 extends ScalarFunction {
- def eval(a: Int, b: Long): String = {
- s"$a and $b"
- }
-}
-
-object Func12 extends ScalarFunction {
- def eval(a: Long): Long = {
- a
- }
-
- override def getResultType(signature: Array[Class[_]]): TypeInformation[_] = {
- Types.INTERVAL_MILLIS
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/plan/FieldProjectionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/plan/FieldProjectionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/plan/FieldProjectionTest.scala
deleted file mode 100644
index 1cefb8a..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/plan/FieldProjectionTest.scala
+++ /dev/null
@@ -1,317 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.plan
-
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.table.ValidationException
-import org.apache.flink.api.table.expressions.{RowtimeAttribute, Upper, WindowReference}
-import org.apache.flink.api.table.functions.ScalarFunction
-import org.apache.flink.api.table.plan.FieldProjectionTest._
-import org.apache.flink.api.table.plan.logical.EventTimeTumblingGroupWindow
-import org.apache.flink.api.table.utils.TableTestBase
-import org.apache.flink.api.table.utils.TableTestUtil._
-import org.junit.Test
-
-/**
- * Tests for all the situations when we can do fields projection. Like selecting few fields
- * from a large field count source.
- */
-class FieldProjectionTest extends TableTestBase {
-
- val util = batchTestUtil()
-
- val streamUtil = streamTestUtil()
-
- @Test
- def testSimpleSelect(): Unit = {
- val sourceTable = util.addTable[(Int, Long, String, Double)]("MyTable", 'a, 'b, 'c, 'd)
- val resultTable = sourceTable.select('a, 'b)
-
- val expected = unaryNode(
- "DataSetCalc",
- batchTableNode(0),
- term("select", "a", "b")
- )
-
- util.verifyTable(resultTable, expected)
- }
-
- @Test
- def testSelectAllFields(): Unit = {
- val sourceTable = util.addTable[(Int, Long, String, Double)]("MyTable", 'a, 'b, 'c, 'd)
- val resultTable1 = sourceTable.select('*)
- val resultTable2 = sourceTable.select('a, 'b, 'c, 'd)
-
- val expected = batchTableNode(0)
-
- util.verifyTable(resultTable1, expected)
- util.verifyTable(resultTable2, expected)
- }
-
- @Test
- def testSelectAggregation(): Unit = {
- val sourceTable = util.addTable[(Int, Long, String, Double)]("MyTable", 'a, 'b, 'c, 'd)
- val resultTable = sourceTable.select('a.sum, 'b.max)
-
- val expected = unaryNode(
- "DataSetAggregate",
- binaryNode(
- "DataSetUnion",
- values(
- "DataSetValues",
- tuples(List(null, null)),
- term("values", "a", "b")
- ),
- unaryNode(
- "DataSetCalc",
- batchTableNode(0),
- term("select", "a", "b")
- ),
- term("union", "a", "b")
- ),
- term("select", "SUM(a) AS TMP_0", "MAX(b) AS TMP_1")
- )
-
- util.verifyTable(resultTable, expected)
- }
-
- @Test
- def testSelectFunction(): Unit = {
- val sourceTable = util.addTable[(Int, Long, String, Double)]("MyTable", 'a, 'b, 'c, 'd)
-
- util.tEnv.registerFunction("hashCode", MyHashCode)
-
- val resultTable = sourceTable.select("hashCode(c), b")
-
- val expected = unaryNode(
- "DataSetCalc",
- batchTableNode(0),
- term("select", s"${MyHashCode.getClass.getCanonicalName}(c) AS _c0", "b")
- )
-
- util.verifyTable(resultTable, expected)
- }
-
- @Test
- def testSelectFromGroupedTable(): Unit = {
- val sourceTable = util.addTable[(Int, Long, String, Double)]("MyTable", 'a, 'b, 'c, 'd)
- val resultTable = sourceTable.groupBy('a, 'c).select('a)
-
- val expected = unaryNode(
- "DataSetCalc",
- unaryNode(
- "DataSetAggregate",
- unaryNode(
- "DataSetCalc",
- batchTableNode(0),
- term("select", "a", "c")
- ),
- term("groupBy", "a", "c"),
- term("select", "a", "c")
- ),
- term("select", "a")
- )
-
- util.verifyTable(resultTable, expected)
- }
-
- @Test
- def testSelectAllFieldsFromGroupedTable(): Unit = {
- val sourceTable = util.addTable[(Int, Long, String, Double)]("MyTable", 'a, 'b, 'c, 'd)
- val resultTable = sourceTable.groupBy('a, 'c).select('a, 'c)
-
- val expected = unaryNode(
- "DataSetAggregate",
- unaryNode(
- "DataSetCalc",
- batchTableNode(0),
- term("select", "a", "c")
- ),
- term("groupBy", "a", "c"),
- term("select", "a", "c")
- )
-
- util.verifyTable(resultTable, expected)
- }
-
- @Test
- def testSelectAggregationFromGroupedTable(): Unit = {
- val sourceTable = util.addTable[(Int, Long, String, Double)]("MyTable", 'a, 'b, 'c, 'd)
- val resultTable = sourceTable.groupBy('c).select('a.sum)
-
- val expected =
- unaryNode(
- "DataSetCalc",
- unaryNode(
- "DataSetAggregate",
- unaryNode(
- "DataSetCalc",
- batchTableNode(0),
- term("select", "a", "c")
- ),
- term("groupBy", "c"),
- term("select", "c", "SUM(a) AS TMP_0")
- ),
- term("select", "TMP_0 AS TMP_1")
- )
-
- util.verifyTable(resultTable, expected)
- }
-
- @Test
- def testSelectFromGroupedTableWithNonTrivialKey(): Unit = {
- val sourceTable = util.addTable[(Int, Long, String, Double)]("MyTable", 'a, 'b, 'c, 'd)
- val resultTable = sourceTable.groupBy(Upper('c) as 'k).select('a.sum)
-
- val expected =
- unaryNode(
- "DataSetCalc",
- unaryNode(
- "DataSetAggregate",
- unaryNode(
- "DataSetCalc",
- batchTableNode(0),
- term("select", "a", "c", "UPPER(c) AS k")
- ),
- term("groupBy", "k"),
- term("select", "k", "SUM(a) AS TMP_0")
- ),
- term("select", "TMP_0 AS TMP_1")
- )
-
- util.verifyTable(resultTable, expected)
- }
-
- @Test
- def testSelectFromGroupedTableWithFunctionKey(): Unit = {
- val sourceTable = util.addTable[(Int, Long, String, Double)]("MyTable", 'a, 'b, 'c, 'd)
- val resultTable = sourceTable.groupBy(MyHashCode('c) as 'k).select('a.sum)
-
- val expected =
- unaryNode(
- "DataSetCalc",
- unaryNode(
- "DataSetAggregate",
- unaryNode(
- "DataSetCalc",
- batchTableNode(0),
- term("select", "a", "c", s"${MyHashCode.getClass.getCanonicalName}(c) AS k")
- ),
- term("groupBy", "k"),
- term("select", "k", "SUM(a) AS TMP_0")
- ),
- term("select", "TMP_0 AS TMP_1")
- )
-
- util.verifyTable(resultTable, expected)
- }
-
- @Test
- def testSelectFromStreamingWindow(): Unit = {
- val sourceTable = streamUtil.addTable[(Int, Long, String, Double)]("MyTable", 'a, 'b, 'c, 'd)
- val resultTable = sourceTable
- .window(Tumble over 5.millis on 'rowtime as 'w)
- .select(Upper('c).count, 'a.sum)
-
- val expected =
- unaryNode(
- "DataStreamAggregate",
- unaryNode(
- "DataStreamCalc",
- streamTableNode(0),
- term("select", "c", "a", "UPPER(c) AS $f2")
- ),
- term("window",
- EventTimeTumblingGroupWindow(
- Some(WindowReference("w")),
- RowtimeAttribute(),
- 5.millis)),
- term("select", "COUNT($f2) AS TMP_0", "SUM(a) AS TMP_1")
- )
-
- streamUtil.verifyTable(resultTable, expected)
- }
-
- @Test
- def testSelectFromStreamingGroupedWindow(): Unit = {
- val sourceTable = streamUtil.addTable[(Int, Long, String, Double)]("MyTable", 'a, 'b, 'c, 'd)
- val resultTable = sourceTable
- .groupBy('b)
- .window(Tumble over 5.millis on 'rowtime as 'w)
- .select(Upper('c).count, 'a.sum, 'b)
-
- val expected = unaryNode(
- "DataStreamCalc",
- unaryNode(
- "DataStreamAggregate",
- unaryNode(
- "DataStreamCalc",
- streamTableNode(0),
- term("select", "c", "a", "b", "UPPER(c) AS $f3")
- ),
- term("groupBy", "b"),
- term("window",
- EventTimeTumblingGroupWindow(
- Some(WindowReference("w")),
- RowtimeAttribute(),
- 5.millis)),
- term("select", "b", "COUNT($f3) AS TMP_0", "SUM(a) AS TMP_1")
- ),
- term("select", "TMP_0 AS TMP_2", "TMP_1 AS TMP_3", "b")
- )
-
- streamUtil.verifyTable(resultTable, expected)
- }
-
- @Test(expected = classOf[ValidationException])
- def testSelectFromBatchWindow1(): Unit = {
- val sourceTable = util.addTable[(Int, Long, String, Double)]("MyTable", 'a, 'b, 'c, 'd)
-
- // time field is selected
- val resultTable = sourceTable
- .window(Tumble over 5.millis on 'a as 'w)
- .select('a.sum, 'c.count)
-
- val expected = "TODO"
-
- util.verifyTable(resultTable, expected)
- }
-
- @Test(expected = classOf[ValidationException])
- def testSelectFromBatchWindow2(): Unit = {
- val sourceTable = util.addTable[(Int, Long, String, Double)]("MyTable", 'a, 'b, 'c, 'd)
-
- // time field is not selected
- val resultTable = sourceTable
- .window(Tumble over 5.millis on 'a as 'w)
- .select('c.count)
-
- val expected = "TODO"
-
- util.verifyTable(resultTable, expected)
- }
-}
-
-object FieldProjectionTest {
-
- object MyHashCode extends ScalarFunction {
- def eval(s: String): Int = s.hashCode()
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/plan/rules/util/RexProgramProjectExtractorTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/plan/rules/util/RexProgramProjectExtractorTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/plan/rules/util/RexProgramProjectExtractorTest.scala
deleted file mode 100644
index 156f281..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/plan/rules/util/RexProgramProjectExtractorTest.scala
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.rules.util
-
-import java.math.BigDecimal
-
-import org.apache.calcite.adapter.java.JavaTypeFactory
-import org.apache.calcite.jdbc.JavaTypeFactoryImpl
-import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeSystem}
-import org.apache.calcite.sql.`type`.SqlTypeName.{BIGINT, DOUBLE, INTEGER, VARCHAR}
-import org.apache.calcite.rex.{RexBuilder, RexProgram, RexProgramBuilder}
-import org.apache.calcite.sql.fun.SqlStdOperatorTable
-
-import scala.collection.JavaConverters._
-import org.apache.flink.api.table.plan.rules.util.RexProgramProjectExtractor._
-import org.junit.{Assert, Before, Test}
-
-/**
- * This class is responsible for testing RexProgramProjectExtractor
- */
-class RexProgramProjectExtractorTest {
- private var typeFactory: JavaTypeFactory = null
- private var rexBuilder: RexBuilder = null
- private var allFieldTypes: Seq[RelDataType] = null
- private val allFieldNames = List("name", "id", "amount", "price")
-
- @Before
- def setUp: Unit = {
- typeFactory = new JavaTypeFactoryImpl(RelDataTypeSystem.DEFAULT)
- rexBuilder = new RexBuilder(typeFactory)
- allFieldTypes = List(VARCHAR, BIGINT, INTEGER, DOUBLE).map(typeFactory.createSqlType(_))
- }
-
- @Test
- def testExtractRefInputFields: Unit = {
- val usedFields = extractRefInputFields(buildRexProgram)
- Assert.assertArrayEquals(usedFields, Array(2, 3, 1))
- }
-
- @Test
- def testRewriteRexProgram: Unit = {
- val originRexProgram = buildRexProgram
- Assert.assertTrue(extractExprStrList(originRexProgram).sameElements(Array(
- "$0",
- "$1",
- "$2",
- "$3",
- "*($t2, $t3)",
- "100",
- "<($t4, $t5)",
- "6",
- ">($t1, $t7)",
- "AND($t6, $t8)")))
- // use amount, id, price fields to create a new RexProgram
- val usedFields = Array(2, 3, 1)
- val types = usedFields.map(allFieldTypes(_)).toList.asJava
- val names = usedFields.map(allFieldNames(_)).toList.asJava
- val inputRowType = typeFactory.createStructType(types, names)
- val newRexProgram = rewriteRexProgram(originRexProgram, inputRowType, usedFields, rexBuilder)
- Assert.assertTrue(extractExprStrList(newRexProgram).sameElements(Array(
- "$0",
- "$1",
- "$2",
- "*($t0, $t1)",
- "100",
- "<($t3, $t4)",
- "6",
- ">($t2, $t6)",
- "AND($t5, $t7)")))
- }
-
- private def buildRexProgram: RexProgram = {
- val types = allFieldTypes.asJava
- val names = allFieldNames.asJava
- val inputRowType = typeFactory.createStructType(types, names)
- val builder = new RexProgramBuilder(inputRowType, rexBuilder)
- val t0 = rexBuilder.makeInputRef(types.get(2), 2)
- val t1 = rexBuilder.makeInputRef(types.get(1), 1)
- val t2 = rexBuilder.makeInputRef(types.get(3), 3)
- val t3 = builder.addExpr(rexBuilder.makeCall(SqlStdOperatorTable.MULTIPLY, t0, t2))
- val t4 = rexBuilder.makeExactLiteral(BigDecimal.valueOf(100L))
- val t5 = rexBuilder.makeExactLiteral(BigDecimal.valueOf(6L))
- // project: amount, amount * price
- builder.addProject(t0, "amount")
- builder.addProject(t3, "total")
- // condition: amount * price < 100 and id > 6
- val t6 = builder.addExpr(rexBuilder.makeCall(SqlStdOperatorTable.LESS_THAN, t3, t4))
- val t7 = builder.addExpr(rexBuilder.makeCall(SqlStdOperatorTable.GREATER_THAN, t1, t5))
- val t8 = builder.addExpr(rexBuilder.makeCall(SqlStdOperatorTable.AND, List(t6, t7).asJava))
- builder.addCondition(t8)
- builder.getProgram
- }
-
- /**
- * extract all expression string list from input RexProgram expression lists
- *
- * @param rexProgram input RexProgram instance to analyze
- * @return all expression string list of input RexProgram expression lists
- */
- private def extractExprStrList(rexProgram: RexProgram) = {
- rexProgram.getExprList.asScala.map(_.toString)
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/AggregateTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/AggregateTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/AggregateTestBase.scala
deleted file mode 100644
index 4e33a61..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/AggregateTestBase.scala
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.runtime.aggregate
-
-import java.math.BigDecimal
-import org.apache.flink.types.Row
-import org.junit.Test
-import org.junit.Assert.assertEquals
-
-abstract class AggregateTestBase[T] {
-
- private val offset = 2
- private val rowArity: Int = offset + aggregator.intermediateDataType.length
-
- def inputValueSets: Seq[Seq[_]]
-
- def expectedResults: Seq[T]
-
- def aggregator: Aggregate[T]
-
- private def createAggregator(): Aggregate[T] = {
- val agg = aggregator
- agg.setAggOffsetInRow(offset)
- agg
- }
-
- private def createRow(): Row = {
- new Row(rowArity)
- }
-
- @Test
- def testAggregate(): Unit = {
-
- // iterate over input sets
- for((vals, expected) <- inputValueSets.zip(expectedResults)) {
-
- // prepare mapper
- val rows: Seq[Row] = prepare(vals)
-
- val result = if (aggregator.supportPartial) {
- // test with combiner
- val (firstVals, secondVals) = rows.splitAt(rows.length / 2)
- val combined = partialAgg(firstVals) :: partialAgg(secondVals) :: Nil
- finalAgg(combined)
-
- } else {
- // test without combiner
- finalAgg(rows)
- }
-
- (expected, result) match {
- case (e: BigDecimal, r: BigDecimal) =>
- // BigDecimal.equals() value and scale but we are only interested in value.
- assert(e.compareTo(r) == 0)
- case _ =>
- assertEquals(expected, result)
- }
- }
- }
-
- private def prepare(vals: Seq[_]): Seq[Row] = {
-
- val agg = createAggregator()
-
- vals.map { v =>
- val row = createRow()
- agg.prepare(v, row)
- row
- }
- }
-
- private def partialAgg(rows: Seq[Row]): Row = {
-
- val agg = createAggregator()
- val aggBuf = createRow()
-
- agg.initiate(aggBuf)
- rows.foreach(v => agg.merge(v, aggBuf))
-
- aggBuf
- }
-
- private def finalAgg(rows: Seq[Row]): T = {
-
- val agg = createAggregator()
- val aggBuf = createRow()
-
- agg.initiate(aggBuf)
- rows.foreach(v => agg.merge(v, aggBuf))
-
- agg.evaluate(partialAgg(rows))
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/AvgAggregateTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/AvgAggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/AvgAggregateTest.scala
deleted file mode 100644
index 23b3054..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/AvgAggregateTest.scala
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.runtime.aggregate
-
-import java.math.BigDecimal
-
-abstract class AvgAggregateTestBase[T: Numeric] extends AggregateTestBase[T] {
-
- private val numeric: Numeric[T] = implicitly[Numeric[T]]
-
- def minVal: T
- def maxVal: T
-
- override def inputValueSets: Seq[Seq[T]] = Seq(
- Seq(
- minVal,
- minVal,
- null.asInstanceOf[T],
- minVal,
- minVal,
- null.asInstanceOf[T],
- minVal,
- minVal,
- minVal
- ),
- Seq(
- maxVal,
- maxVal,
- null.asInstanceOf[T],
- maxVal,
- maxVal,
- null.asInstanceOf[T],
- maxVal,
- maxVal,
- maxVal
- ),
- Seq(
- minVal,
- maxVal,
- null.asInstanceOf[T],
- numeric.fromInt(0),
- numeric.negate(maxVal),
- numeric.negate(minVal),
- null.asInstanceOf[T]
- ),
- Seq(
- null.asInstanceOf[T],
- null.asInstanceOf[T],
- null.asInstanceOf[T],
- null.asInstanceOf[T],
- null.asInstanceOf[T],
- null.asInstanceOf[T]
- )
- )
-
- override def expectedResults: Seq[T] = Seq(
- minVal,
- maxVal,
- numeric.fromInt(0),
- null.asInstanceOf[T]
- )
-}
-
-class ByteAvgAggregateTest extends AvgAggregateTestBase[Byte] {
-
- override def minVal = (Byte.MinValue + 1).toByte
- override def maxVal = (Byte.MaxValue - 1).toByte
-
- override def aggregator = new ByteAvgAggregate()
-}
-
-class ShortAvgAggregateTest extends AvgAggregateTestBase[Short] {
-
- override def minVal = (Short.MinValue + 1).toShort
- override def maxVal = (Short.MaxValue - 1).toShort
-
- override def aggregator = new ShortAvgAggregate()
-}
-
-class IntAvgAggregateTest extends AvgAggregateTestBase[Int] {
-
- override def minVal = Int.MinValue + 1
- override def maxVal = Int.MaxValue - 1
-
- override def aggregator = new IntAvgAggregate()
-}
-
-class LongAvgAggregateTest extends AvgAggregateTestBase[Long] {
-
- override def minVal = Long.MinValue + 1
- override def maxVal = Long.MaxValue - 1
-
- override def aggregator = new LongAvgAggregate()
-}
-
-class FloatAvgAggregateTest extends AvgAggregateTestBase[Float] {
-
- override def minVal = Float.MinValue
- override def maxVal = Float.MaxValue
-
- override def aggregator = new FloatAvgAggregate()
-}
-
-class DoubleAvgAggregateTest extends AvgAggregateTestBase[Double] {
-
- override def minVal = Float.MinValue
- override def maxVal = Float.MaxValue
-
- override def aggregator = new DoubleAvgAggregate()
-}
-
-class DecimalAvgAggregateTest extends AggregateTestBase[BigDecimal] {
-
- override def inputValueSets: Seq[Seq[_]] = Seq(
- Seq(
- new BigDecimal("987654321000000"),
- new BigDecimal("-0.000000000012345"),
- null,
- new BigDecimal("0.000000000012345"),
- new BigDecimal("-987654321000000"),
- null,
- new BigDecimal("0")
- ),
- Seq(
- null,
- null,
- null,
- null
- )
- )
-
- override def expectedResults: Seq[BigDecimal] = Seq(
- BigDecimal.ZERO,
- null
- )
-
- override def aggregator: Aggregate[BigDecimal] = new DecimalAvgAggregate()
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/CountAggregateTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/CountAggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/CountAggregateTest.scala
deleted file mode 100644
index 4389a3a..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/CountAggregateTest.scala
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.runtime.aggregate
-
-class CountAggregateTest extends AggregateTestBase[Long] {
-
- override def inputValueSets: Seq[Seq[_]] = Seq(
- Seq("a", "b", null, "c", null, "d", "e", null, "f"),
- Seq(null, null, null, null, null, null)
- )
-
- override def expectedResults: Seq[Long] = Seq(6L, 0L)
-
- override def aggregator: Aggregate[Long] = new CountAggregate()
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/MaxAggregateTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/MaxAggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/MaxAggregateTest.scala
deleted file mode 100644
index aea3318..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/MaxAggregateTest.scala
+++ /dev/null
@@ -1,177 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.runtime.aggregate
-
-import java.math.BigDecimal
-
-abstract class MaxAggregateTestBase[T: Numeric] extends AggregateTestBase[T] {
-
- private val numeric: Numeric[T] = implicitly[Numeric[T]]
-
- def minVal: T
- def maxVal: T
-
- override def inputValueSets: Seq[Seq[T]] = Seq(
- Seq(
- numeric.fromInt(1),
- null.asInstanceOf[T],
- maxVal,
- numeric.fromInt(-99),
- numeric.fromInt(3),
- numeric.fromInt(56),
- numeric.fromInt(0),
- minVal,
- numeric.fromInt(-20),
- numeric.fromInt(17),
- null.asInstanceOf[T]
- ),
- Seq(
- null.asInstanceOf[T],
- null.asInstanceOf[T],
- null.asInstanceOf[T],
- null.asInstanceOf[T],
- null.asInstanceOf[T],
- null.asInstanceOf[T]
- )
- )
-
- override def expectedResults: Seq[T] = Seq(
- maxVal,
- null.asInstanceOf[T]
- )
-}
-
-class ByteMaxAggregateTest extends MaxAggregateTestBase[Byte] {
-
- override def minVal = (Byte.MinValue + 1).toByte
- override def maxVal = (Byte.MaxValue - 1).toByte
-
- override def aggregator: Aggregate[Byte] = new ByteMaxAggregate()
-}
-
-class ShortMaxAggregateTest extends MaxAggregateTestBase[Short] {
-
- override def minVal = (Short.MinValue + 1).toShort
- override def maxVal = (Short.MaxValue - 1).toShort
-
- override def aggregator: Aggregate[Short] = new ShortMaxAggregate()
-}
-
-class IntMaxAggregateTest extends MaxAggregateTestBase[Int] {
-
- override def minVal = Int.MinValue + 1
- override def maxVal = Int.MaxValue - 1
-
- override def aggregator: Aggregate[Int] = new IntMaxAggregate()
-}
-
-class LongMaxAggregateTest extends MaxAggregateTestBase[Long] {
-
- override def minVal = Long.MinValue + 1
- override def maxVal = Long.MaxValue - 1
-
- override def aggregator: Aggregate[Long] = new LongMaxAggregate()
-}
-
-class FloatMaxAggregateTest extends MaxAggregateTestBase[Float] {
-
- override def minVal = Float.MinValue / 2
- override def maxVal = Float.MaxValue / 2
-
- override def aggregator: Aggregate[Float] = new FloatMaxAggregate()
-}
-
-class DoubleMaxAggregateTest extends MaxAggregateTestBase[Double] {
-
- override def minVal = Double.MinValue / 2
- override def maxVal = Double.MaxValue / 2
-
- override def aggregator: Aggregate[Double] = new DoubleMaxAggregate()
-}
-
-class BooleanMaxAggregateTest extends AggregateTestBase[Boolean] {
-
- override def inputValueSets: Seq[Seq[Boolean]] = Seq(
- Seq(
- false,
- false,
- false
- ),
- Seq(
- true,
- true,
- true
- ),
- Seq(
- true,
- false,
- null.asInstanceOf[Boolean],
- true,
- false,
- true,
- null.asInstanceOf[Boolean]
- ),
- Seq(
- null.asInstanceOf[Boolean],
- null.asInstanceOf[Boolean],
- null.asInstanceOf[Boolean]
- )
- )
-
- override def expectedResults: Seq[Boolean] = Seq(
- false,
- true,
- true,
- null.asInstanceOf[Boolean]
- )
-
- override def aggregator: Aggregate[Boolean] = new BooleanMaxAggregate()
-}
-
-class DecimalMaxAggregateTest extends AggregateTestBase[BigDecimal] {
-
- override def inputValueSets: Seq[Seq[_]] = Seq(
- Seq(
- new BigDecimal("1"),
- new BigDecimal("1000.000001"),
- new BigDecimal("-1"),
- new BigDecimal("-999.998999"),
- null,
- new BigDecimal("0"),
- new BigDecimal("-999.999"),
- null,
- new BigDecimal("999.999")
- ),
- Seq(
- null,
- null,
- null,
- null,
- null
- )
- )
-
- override def expectedResults: Seq[BigDecimal] = Seq(
- new BigDecimal("1000.000001"),
- null
- )
-
- override def aggregator: Aggregate[BigDecimal] = new DecimalMaxAggregate()
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/MinAggregateTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/MinAggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/MinAggregateTest.scala
deleted file mode 100644
index f007d02..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/MinAggregateTest.scala
+++ /dev/null
@@ -1,177 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.runtime.aggregate
-
-import java.math.BigDecimal
-
-abstract class MinAggregateTestBase[T: Numeric] extends AggregateTestBase[T] {
-
- private val numeric: Numeric[T] = implicitly[Numeric[T]]
-
- def minVal: T
- def maxVal: T
-
- override def inputValueSets: Seq[Seq[T]] = Seq(
- Seq(
- numeric.fromInt(1),
- null.asInstanceOf[T],
- maxVal,
- numeric.fromInt(-99),
- numeric.fromInt(3),
- numeric.fromInt(56),
- numeric.fromInt(0),
- minVal,
- numeric.fromInt(-20),
- numeric.fromInt(17),
- null.asInstanceOf[T]
- ),
- Seq(
- null.asInstanceOf[T],
- null.asInstanceOf[T],
- null.asInstanceOf[T],
- null.asInstanceOf[T],
- null.asInstanceOf[T],
- null.asInstanceOf[T]
- )
- )
-
- override def expectedResults: Seq[T] = Seq(
- minVal,
- null.asInstanceOf[T]
- )
-}
-
-class ByteMinAggregateTest extends MinAggregateTestBase[Byte] {
-
- override def minVal = (Byte.MinValue + 1).toByte
- override def maxVal = (Byte.MaxValue - 1).toByte
-
- override def aggregator: Aggregate[Byte] = new ByteMinAggregate()
-}
-
-class ShortMinAggregateTest extends MinAggregateTestBase[Short] {
-
- override def minVal = (Short.MinValue + 1).toShort
- override def maxVal = (Short.MaxValue - 1).toShort
-
- override def aggregator: Aggregate[Short] = new ShortMinAggregate()
-}
-
-class IntMinAggregateTest extends MinAggregateTestBase[Int] {
-
- override def minVal = Int.MinValue + 1
- override def maxVal = Int.MaxValue - 1
-
- override def aggregator: Aggregate[Int] = new IntMinAggregate()
-}
-
-class LongMinAggregateTest extends MinAggregateTestBase[Long] {
-
- override def minVal = Long.MinValue + 1
- override def maxVal = Long.MaxValue - 1
-
- override def aggregator: Aggregate[Long] = new LongMinAggregate()
-}
-
-class FloatMinAggregateTest extends MinAggregateTestBase[Float] {
-
- override def minVal = Float.MinValue / 2
- override def maxVal = Float.MaxValue / 2
-
- override def aggregator: Aggregate[Float] = new FloatMinAggregate()
-}
-
-class DoubleMinAggregateTest extends MinAggregateTestBase[Double] {
-
- override def minVal = Double.MinValue / 2
- override def maxVal = Double.MaxValue / 2
-
- override def aggregator: Aggregate[Double] = new DoubleMinAggregate()
-}
-
-class BooleanMinAggregateTest extends AggregateTestBase[Boolean] {
-
- override def inputValueSets: Seq[Seq[Boolean]] = Seq(
- Seq(
- false,
- false,
- false
- ),
- Seq(
- true,
- true,
- true
- ),
- Seq(
- true,
- false,
- null.asInstanceOf[Boolean],
- true,
- false,
- true,
- null.asInstanceOf[Boolean]
- ),
- Seq(
- null.asInstanceOf[Boolean],
- null.asInstanceOf[Boolean],
- null.asInstanceOf[Boolean]
- )
- )
-
- override def expectedResults: Seq[Boolean] = Seq(
- false,
- true,
- false,
- null.asInstanceOf[Boolean]
- )
-
- override def aggregator: Aggregate[Boolean] = new BooleanMinAggregate()
-}
-
-class DecimalMinAggregateTest extends AggregateTestBase[BigDecimal] {
-
- override def inputValueSets: Seq[Seq[_]] = Seq(
- Seq(
- new BigDecimal("1"),
- new BigDecimal("1000"),
- new BigDecimal("-1"),
- new BigDecimal("-999.998999"),
- null,
- new BigDecimal("0"),
- new BigDecimal("-999.999"),
- null,
- new BigDecimal("999.999")
- ),
- Seq(
- null,
- null,
- null,
- null,
- null
- )
- )
-
- override def expectedResults: Seq[BigDecimal] = Seq(
- new BigDecimal("-999.999"),
- null
- )
-
- override def aggregator: Aggregate[BigDecimal] = new DecimalMinAggregate()
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/SumAggregateTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/SumAggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/SumAggregateTest.scala
deleted file mode 100644
index 7e4e47b..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/SumAggregateTest.scala
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.runtime.aggregate
-
-import java.math.BigDecimal
-
-abstract class SumAggregateTestBase[T: Numeric] extends AggregateTestBase[T] {
-
- private val numeric: Numeric[T] = implicitly[Numeric[T]]
-
- def maxVal: T
- private val minVal = numeric.negate(maxVal)
-
- override def inputValueSets: Seq[Seq[T]] = Seq(
- Seq(
- minVal,
- numeric.fromInt(1),
- null.asInstanceOf[T],
- numeric.fromInt(2),
- numeric.fromInt(3),
- numeric.fromInt(4),
- numeric.fromInt(5),
- numeric.fromInt(-10),
- numeric.fromInt(-20),
- numeric.fromInt(17),
- null.asInstanceOf[T],
- maxVal
- ),
- Seq(
- null.asInstanceOf[T],
- null.asInstanceOf[T],
- null.asInstanceOf[T],
- null.asInstanceOf[T],
- null.asInstanceOf[T],
- null.asInstanceOf[T]
- )
- )
-
- override def expectedResults: Seq[T] = Seq(
- numeric.fromInt(2),
- null.asInstanceOf[T]
- )
-}
-
-class ByteSumAggregateTest extends SumAggregateTestBase[Byte] {
-
- override def maxVal = (Byte.MaxValue / 2).toByte
-
- override def aggregator: Aggregate[Byte] = new ByteSumAggregate
-}
-
-class ShortSumAggregateTest extends SumAggregateTestBase[Short] {
-
- override def maxVal = (Short.MaxValue / 2).toShort
-
- override def aggregator: Aggregate[Short] = new ShortSumAggregate
-}
-
-class IntSumAggregateTest extends SumAggregateTestBase[Int] {
-
- override def maxVal = Int.MaxValue / 2
-
- override def aggregator: Aggregate[Int] = new IntSumAggregate
-}
-
-class LongSumAggregateTest extends SumAggregateTestBase[Long] {
-
- override def maxVal = Long.MaxValue / 2
-
- override def aggregator: Aggregate[Long] = new LongSumAggregate
-}
-
-class FloatSumAggregateTest extends SumAggregateTestBase[Float] {
-
- override def maxVal = 12345.6789f
-
- override def aggregator: Aggregate[Float] = new FloatSumAggregate
-}
-
-class DoubleSumAggregateTest extends SumAggregateTestBase[Double] {
-
- override def maxVal = 12345.6789d
-
- override def aggregator: Aggregate[Double] = new DoubleSumAggregate
-}
-
-class DecimalSumAggregateTest extends AggregateTestBase[BigDecimal] {
-
- override def inputValueSets: Seq[Seq[_]] = Seq(
- Seq(
- new BigDecimal("1"),
- new BigDecimal("2"),
- new BigDecimal("3"),
- null,
- new BigDecimal("0"),
- new BigDecimal("-1000"),
- new BigDecimal("0.000000000002"),
- new BigDecimal("1000"),
- new BigDecimal("-0.000000000001"),
- new BigDecimal("999.999"),
- null,
- new BigDecimal("4"),
- new BigDecimal("-999.999"),
- null
- ),
- Seq(
- null,
- null,
- null,
- null,
- null
- )
- )
-
- override def expectedResults: Seq[BigDecimal] = Seq(
- new BigDecimal("10.000000000001"),
- null
- )
-
- override def aggregator: Aggregate[BigDecimal] = new DecimalSumAggregate()
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/dataset/DataSetCorrelateITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/dataset/DataSetCorrelateITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/dataset/DataSetCorrelateITCase.scala
deleted file mode 100644
index 993347f..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/dataset/DataSetCorrelateITCase.scala
+++ /dev/null
@@ -1,178 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.runtime.dataset
-
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase
-import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.table.expressions.utils._
-import org.apache.flink.api.table.utils._
-import org.apache.flink.api.table.TableEnvironment
-import org.apache.flink.types.Row
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.apache.flink.test.util.TestBaseUtils
-import org.junit.Test
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable
-
-@RunWith(classOf[Parameterized])
-class DataSetCorrelateITCase(
- mode: TestExecutionMode,
- configMode: TableConfigMode)
- extends TableProgramsTestBase(mode, configMode) {
-
- @Test
- def testCrossJoin(): Unit = {
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tableEnv = TableEnvironment.getTableEnvironment(env, config)
- val in = testData(env).toTable(tableEnv).as('a, 'b, 'c)
-
- val func1 = new TableFunc1
- val result = in.join(func1('c) as 's).select('c, 's).toDataSet[Row]
- val results = result.collect()
- val expected = "Jack#22,Jack\n" + "Jack#22,22\n" + "John#19,John\n" + "John#19,19\n" +
- "Anna#44,Anna\n" + "Anna#44,44\n"
- TestBaseUtils.compareResultAsText(results.asJava, expected)
-
- // with overloading
- val result2 = in.join(func1('c, "$") as 's).select('c, 's).toDataSet[Row]
- val results2 = result2.collect()
- val expected2 = "Jack#22,$Jack\n" + "Jack#22,$22\n" + "John#19,$John\n" +
- "John#19,$19\n" + "Anna#44,$Anna\n" + "Anna#44,$44\n"
- TestBaseUtils.compareResultAsText(results2.asJava, expected2)
- }
-
- @Test
- def testLeftOuterJoin(): Unit = {
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tableEnv = TableEnvironment.getTableEnvironment(env, config)
- val in = testData(env).toTable(tableEnv).as('a, 'b, 'c)
-
- val func2 = new TableFunc2
- val result = in.leftOuterJoin(func2('c) as ('s, 'l)).select('c, 's, 'l).toDataSet[Row]
- val results = result.collect()
- val expected = "Jack#22,Jack,4\n" + "Jack#22,22,2\n" + "John#19,John,4\n" +
- "John#19,19,2\n" + "Anna#44,Anna,4\n" + "Anna#44,44,2\n" + "nosharp,null,null"
- TestBaseUtils.compareResultAsText(results.asJava, expected)
- }
-
- @Test
- def testWithFilter(): Unit = {
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tableEnv = TableEnvironment.getTableEnvironment(env, config)
- val in = testData(env).toTable(tableEnv).as('a, 'b, 'c)
- val func0 = new TableFunc0
-
- val result = in
- .join(func0('c) as ('name, 'age))
- .select('c, 'name, 'age)
- .filter('age > 20)
- .toDataSet[Row]
-
- val results = result.collect()
- val expected = "Jack#22,Jack,22\n" + "Anna#44,Anna,44\n"
- TestBaseUtils.compareResultAsText(results.asJava, expected)
- }
-
- @Test
- def testCustomReturnType(): Unit = {
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tableEnv = TableEnvironment.getTableEnvironment(env, config)
- val in = testData(env).toTable(tableEnv).as('a, 'b, 'c)
- val func2 = new TableFunc2
-
- val result = in
- .join(func2('c) as ('name, 'len))
- .select('c, 'name, 'len)
- .toDataSet[Row]
-
- val results = result.collect()
- val expected = "Jack#22,Jack,4\n" + "Jack#22,22,2\n" + "John#19,John,4\n" +
- "John#19,19,2\n" + "Anna#44,Anna,4\n" + "Anna#44,44,2\n"
- TestBaseUtils.compareResultAsText(results.asJava, expected)
- }
-
- @Test
- def testHierarchyType(): Unit = {
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tableEnv = TableEnvironment.getTableEnvironment(env, config)
- val in = testData(env).toTable(tableEnv).as('a, 'b, 'c)
-
- val hierarchy = new HierarchyTableFunction
- val result = in
- .join(hierarchy('c) as ('name, 'adult, 'len))
- .select('c, 'name, 'adult, 'len)
- .toDataSet[Row]
-
- val results = result.collect()
- val expected = "Jack#22,Jack,true,22\n" + "John#19,John,false,19\n" +
- "Anna#44,Anna,true,44\n"
- TestBaseUtils.compareResultAsText(results.asJava, expected)
- }
-
- @Test
- def testPojoType(): Unit = {
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tableEnv = TableEnvironment.getTableEnvironment(env, config)
- val in = testData(env).toTable(tableEnv).as('a, 'b, 'c)
-
- val pojo = new PojoTableFunc()
- val result = in
- .join(pojo('c))
- .select('c, 'name, 'age)
- .toDataSet[Row]
-
- val results = result.collect()
- val expected = "Jack#22,Jack,22\n" + "John#19,John,19\n" + "Anna#44,Anna,44\n"
- TestBaseUtils.compareResultAsText(results.asJava, expected)
- }
-
- @Test
- def testUDTFWithScalarFunction(): Unit = {
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tableEnv = TableEnvironment.getTableEnvironment(env, config)
- val in = testData(env).toTable(tableEnv).as('a, 'b, 'c)
- val func1 = new TableFunc1
-
- val result = in
- .join(func1('c.substring(2)) as 's)
- .select('c, 's)
- .toDataSet[Row]
-
- val results = result.collect()
- val expected = "Jack#22,ack\n" + "Jack#22,22\n" + "John#19,ohn\n" + "John#19,19\n" +
- "Anna#44,nna\n" + "Anna#44,44\n"
- TestBaseUtils.compareResultAsText(results.asJava, expected)
- }
-
- private def testData(
- env: ExecutionEnvironment)
- : DataSet[(Int, Long, String)] = {
-
- val data = new mutable.MutableList[(Int, Long, String)]
- data.+=((1, 1L, "Jack#22"))
- data.+=((2, 2L, "John#19"))
- data.+=((3, 2L, "Anna#44"))
- data.+=((4, 3L, "nosharp"))
- env.fromCollection(data)
- }
-}