You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/10/30 13:20:48 UTC
[2/4] flink git commit: [FLINK-7548] [table] Refactor TableSource interface.
http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableSourceTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableSourceTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableSourceTest.scala
index 6321e09..4b88bc3 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableSourceTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableSourceTest.scala
@@ -131,7 +131,9 @@ class TableSourceTest extends TableTestBase {
val expected = unaryNode(
"DataSetCalc",
- batchSourceTableNode(tableName, Array("first")),
+ s"BatchTableSourceScan(table=[[$tableName]], " +
+ s"fields=[], " +
+ s"source=[CsvTableSource(read fields: first)])",
term("select", "1 AS _c0")
)
@@ -153,9 +155,7 @@ class TableSourceTest extends TableTestBase {
val expected = unaryNode(
"DataSetCalc",
- batchSourceTableNode(
- tableName,
- Array("name", "id", "amount", "price")),
+ "BatchTableSourceScan(table=[[filterableTable]], fields=[price, id, amount])",
term("select", "price", "id", "amount"),
term("where", "<(*(price, 2), 32)")
)
@@ -180,7 +180,7 @@ class TableSourceTest extends TableTestBase {
"DataSetCalc",
batchFilterableSourceTableNode(
tableName,
- Array("name", "id", "amount", "price"),
+ Array("price", "name", "amount"),
"'amount > 2"),
term("select", "price", "LOWER(name) AS _c1", "amount"),
term("where", "<(*(price, 2), 32)")
@@ -201,14 +201,10 @@ class TableSourceTest extends TableTestBase {
.select('price, 'id, 'amount)
.where("amount > 2 && amount < 32")
- val expected = unaryNode(
- "DataSetCalc",
- batchFilterableSourceTableNode(
- tableName,
- Array("name", "id", "amount", "price"),
- "'amount > 2 && 'amount < 32"),
- term("select", "price", "id", "amount")
- )
+ val expected = batchFilterableSourceTableNode(
+ tableName,
+ Array("price", "id", "amount"),
+ "'amount > 2 && 'amount < 32")
util.verifyTable(result, expected)
}
@@ -229,7 +225,7 @@ class TableSourceTest extends TableTestBase {
"DataSetCalc",
batchFilterableSourceTableNode(
tableName,
- Array("name", "id", "amount", "price"),
+ Array("price", "id", "amount"),
"'amount > 2"),
term("select", "price", "id", "amount"),
term("where", "OR(<(amount, 32), >(CAST(amount), 10))")
@@ -256,7 +252,7 @@ class TableSourceTest extends TableTestBase {
"DataSetCalc",
batchFilterableSourceTableNode(
tableName,
- Array("name", "id", "amount", "price"),
+ Array("price", "id", "amount"),
"'amount > 2"),
term("select", "price", "id", "amount"),
term("where", s"<(${Func0.getClass.getSimpleName}(amount), 32)")
@@ -339,7 +335,7 @@ class TableSourceTest extends TableTestBase {
"DataStreamCalc",
streamFilterableSourceTableNode(
tableName,
- Array("name", "id", "amount", "price"),
+ Array("price", "id", "amount"),
"'amount > 2"),
term("select", "price", "id", "amount"),
term("where", "<(*(price, 2), 32)")
@@ -392,11 +388,15 @@ class TableSourceTest extends TableTestBase {
}
def batchSourceTableNode(sourceName: String, fields: Array[String]): String = {
- s"BatchTableSourceScan(table=[[$sourceName]], fields=[${fields.mkString(", ")}])"
+ s"BatchTableSourceScan(table=[[$sourceName]], " +
+ s"fields=[${fields.mkString(", ")}], " +
+ s"source=[CsvTableSource(read fields: ${fields.mkString(", ")})])"
}
def streamSourceTableNode(sourceName: String, fields: Array[String] ): String = {
- s"StreamTableSourceScan(table=[[$sourceName]], fields=[${fields.mkString(", ")}])"
+ s"StreamTableSourceScan(table=[[$sourceName]], " +
+ s"fields=[${fields.mkString(", ")}], " +
+ s"source=[CsvTableSource(read fields: ${fields.mkString(", ")})])"
}
def batchFilterableSourceTableNode(
http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/TableSourceTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/TableSourceTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/TableSourceTest.scala
index be073bd..c53f5ac 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/TableSourceTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/TableSourceTest.scala
@@ -20,78 +20,74 @@ package org.apache.flink.table.api.stream.table
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.streaming.api.datastream.DataStream
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
-import org.apache.flink.table.api.Types
+import org.apache.flink.table.api.{TableSchema, Types}
import org.apache.flink.table.api.scala._
-import org.apache.flink.table.sources._
import org.apache.flink.table.utils.TableTestUtil._
-import org.apache.flink.table.utils.TableTestBase
+import org.apache.flink.table.utils.{TableTestBase, TestNestedProjectableTableSource, TestProjectableTableSource, TestTableSourceWithTime}
import org.apache.flink.types.Row
-import org.junit.{Assert, Test}
+import org.junit.Test
class TableSourceTest extends TableTestBase {
@Test
def testTableSourceWithLongRowTimeField(): Unit = {
- val tableSource = new TestRowtimeSource(
+ val tableSchema = new TableSchema(
Array("id", "rowtime", "val", "name"),
+ Array(Types.INT, Types.SQL_TIMESTAMP, Types.LONG, Types.STRING))
+ val returnType = new RowTypeInfo(
Array(Types.INT, Types.LONG, Types.LONG, Types.STRING)
.asInstanceOf[Array[TypeInformation[_]]],
- "rowtime"
- )
+ Array("id", "rowtime", "val", "name"))
val util = streamTestUtil()
- util.tableEnv.registerTableSource("rowTimeT", tableSource)
+ util.tableEnv.registerTableSource(
+ "rowTimeT",
+ new TestTableSourceWithTime[Row](tableSchema, returnType, Seq(), rowtime = "rowtime"))
val t = util.tableEnv.scan("rowTimeT").select("rowtime, id, name, val")
- val expected =
- unaryNode(
- "DataStreamCalc",
- "StreamTableSourceScan(table=[[rowTimeT]], fields=[id, rowtime, val, name])",
- term("select", "rowtime", "id", "name", "val")
- )
+ val expected = "StreamTableSourceScan(table=[[rowTimeT]], fields=[rowtime, id, name, val])"
util.verifyTable(t, expected)
}
@Test
def testTableSourceWithTimestampRowTimeField(): Unit = {
- val tableSource = new TestRowtimeSource(
+ val tableSchema = new TableSchema(
Array("id", "rowtime", "val", "name"),
+ Array(Types.INT, Types.SQL_TIMESTAMP, Types.LONG, Types.STRING))
+ val returnType = new RowTypeInfo(
Array(Types.INT, Types.SQL_TIMESTAMP, Types.LONG, Types.STRING)
.asInstanceOf[Array[TypeInformation[_]]],
- "rowtime"
- )
+ Array("id", "rowtime", "val", "name"))
val util = streamTestUtil()
- util.tableEnv.registerTableSource("rowTimeT", tableSource)
+ util.tableEnv.registerTableSource(
+ "rowTimeT",
+ new TestTableSourceWithTime[Row](tableSchema, returnType, Seq(), rowtime = "rowtime"))
val t = util.tableEnv.scan("rowTimeT").select("rowtime, id, name, val")
- val expected =
- unaryNode(
- "DataStreamCalc",
- "StreamTableSourceScan(table=[[rowTimeT]], fields=[id, rowtime, val, name])",
- term("select", "rowtime", "id", "name", "val")
- )
+ val expected = "StreamTableSourceScan(table=[[rowTimeT]], fields=[rowtime, id, name, val])"
util.verifyTable(t, expected)
}
@Test
def testRowTimeTableSourceGroupWindow(): Unit = {
- val tableSource = new TestRowtimeSource(
+ val tableSchema = new TableSchema(
Array("id", "rowtime", "val", "name"),
- Array(Types.INT, Types.LONG, Types.LONG, Types.STRING)
+ Array(Types.INT, Types.SQL_TIMESTAMP, Types.LONG, Types.STRING))
+ val returnType = new RowTypeInfo(
+ Array(Types.INT, Types.SQL_TIMESTAMP, Types.LONG, Types.STRING)
.asInstanceOf[Array[TypeInformation[_]]],
- "rowtime"
- )
+ Array("id", "rowtime", "val", "name"))
val util = streamTestUtil()
- util.tableEnv.registerTableSource("rowTimeT", tableSource)
+ util.tableEnv.registerTableSource(
+ "rowTimeT",
+ new TestTableSourceWithTime[Row](tableSchema, returnType, Seq(), rowtime = "rowtime"))
val t = util.tableEnv.scan("rowTimeT")
.filter("val > 100")
@@ -106,8 +102,8 @@ class TableSourceTest extends TableTestBase {
"DataStreamGroupWindowAggregate",
unaryNode(
"DataStreamCalc",
- "StreamTableSourceScan(table=[[rowTimeT]], fields=[id, rowtime, val, name])",
- term("select", "name", "val", "rowtime"),
+ "StreamTableSourceScan(table=[[rowTimeT]], fields=[rowtime, val, name])",
+ term("select", "rowtime", "val", "name"),
term("where", ">(val, 100)")
),
term("groupBy", "name"),
@@ -121,27 +117,47 @@ class TableSourceTest extends TableTestBase {
@Test
def testProcTimeTableSourceSimple(): Unit = {
+
+ val tableSchema = new TableSchema(
+ Array("id", "proctime", "val", "name"),
+ Array(Types.INT, Types.SQL_TIMESTAMP, Types.LONG, Types.STRING))
+ val returnType = new RowTypeInfo(
+ Array(Types.INT, Types.LONG, Types.STRING).asInstanceOf[Array[TypeInformation[_]]],
+ Array("id", "val", "name"))
+
val util = streamTestUtil()
- util.tableEnv.registerTableSource("procTimeT", new TestProctimeSource("pTime"))
+ util.tableEnv.registerTableSource(
+ "procTimeT",
+ new TestTableSourceWithTime[Row](tableSchema, returnType, Seq(), proctime = "proctime"))
- val t = util.tableEnv.scan("procTimeT").select("pTime, id, name, val")
+ val t = util.tableEnv.scan("procTimeT").select("proctime, id, name, val")
val expected =
unaryNode(
"DataStreamCalc",
- "StreamTableSourceScan(table=[[procTimeT]], fields=[id, val, name, pTime])",
- term("select", "PROCTIME(pTime) AS pTime", "id", "name", "val")
+ "StreamTableSourceScan(table=[[procTimeT]], fields=[id, proctime, val, name])",
+ term("select", "PROCTIME(proctime) AS proctime", "id", "name", "val")
)
util.verifyTable(t, expected)
}
@Test
def testProcTimeTableSourceOverWindow(): Unit = {
+
+ val tableSchema = new TableSchema(
+ Array("id", "proctime", "val", "name"),
+ Array(Types.INT, Types.SQL_TIMESTAMP, Types.LONG, Types.STRING))
+ val returnType = new RowTypeInfo(
+ Array(Types.INT, Types.LONG, Types.STRING).asInstanceOf[Array[TypeInformation[_]]],
+ Array("id", "val", "name"))
+
val util = streamTestUtil()
- util.tableEnv.registerTableSource("procTimeT", new TestProctimeSource("pTime"))
+ util.tableEnv.registerTableSource(
+ "procTimeT",
+ new TestTableSourceWithTime[Row](tableSchema, returnType, Seq(), proctime = "proctime"))
val t = util.tableEnv.scan("procTimeT")
- .window(Over partitionBy 'id orderBy 'pTime preceding 2.hours as 'w)
+ .window(Over partitionBy 'id orderBy 'proctime preceding 2.hours as 'w)
.select('id, 'name, 'val.sum over 'w as 'valSum)
.filter('valSum > 100)
@@ -150,11 +166,11 @@ class TableSourceTest extends TableTestBase {
"DataStreamCalc",
unaryNode(
"DataStreamOverAggregate",
- "StreamTableSourceScan(table=[[procTimeT]], fields=[id, val, name, pTime])",
+ "StreamTableSourceScan(table=[[procTimeT]], fields=[id, proctime, val, name])",
term("partitionBy", "id"),
- term("orderBy", "pTime"),
+ term("orderBy", "proctime"),
term("range", "BETWEEN 7200000 PRECEDING AND CURRENT ROW"),
- term("select", "id", "val", "name", "pTime", "SUM(val) AS w0$o0")
+ term("select", "id", "proctime", "val", "name", "SUM(val) AS w0$o0")
),
term("select", "id", "name", "w0$o0 AS valSum"),
term("where", ">(w0$o0, 100)")
@@ -163,94 +179,195 @@ class TableSourceTest extends TableTestBase {
}
@Test
- def testProjectableProcTimeTableSource(): Unit = {
- // ensures that projection is not pushed into table source with proctime indicators
- val util = streamTestUtil()
+ def testProjectWithRowtimeProctime(): Unit = {
+ val tableSchema = new TableSchema(
+ Array("id", "rtime", "val", "ptime", "name"),
+ Array(Types.INT, Types.SQL_TIMESTAMP, Types.LONG, Types.SQL_TIMESTAMP, Types.STRING))
+ val returnType = new RowTypeInfo(
+ Array(Types.INT, Types.STRING, Types.LONG, Types.LONG)
+ .asInstanceOf[Array[TypeInformation[_]]],
+ Array("id", "name", "val", "rtime"))
- val projectableTableSource = new TestProctimeSource("pTime") with ProjectableTableSource[Row] {
- override def projectFields(fields: Array[Int]): TableSource[Row] = {
- // ensure this method is not called!
- Assert.fail()
- null.asInstanceOf[TableSource[Row]]
- }
- }
- util.tableEnv.registerTableSource("PTimeTable", projectableTableSource)
+ val util = streamTestUtil()
+ util.tableEnv.registerTableSource(
+ "T",
+ new TestProjectableTableSource(tableSchema, returnType, Seq(), "rtime", "ptime"))
- val t = util.tableEnv.scan("PTimeTable")
- .select('name, 'val)
- .where('val > 10)
+ val t = util.tableEnv.scan("T").select('name, 'val, 'id)
- val expected =
- unaryNode(
- "DataStreamCalc",
- "StreamTableSourceScan(table=[[PTimeTable]], fields=[id, val, name, pTime])",
- term("select", "name", "val"),
- term("where", ">(val, 10)")
- )
+ val expected = "StreamTableSourceScan(table=[[T]], " +
+ "fields=[name, val, id], " +
+ "source=[TestSource(physical fields: name, val, id)])"
util.verifyTable(t, expected)
}
@Test
- def testProjectableRowTimeTableSource(): Unit = {
- // ensures that projection is not pushed into table source with rowtime indicators
+ def testProjectWithoutRowtime(): Unit = {
+ val tableSchema = new TableSchema(
+ Array("id", "rtime", "val", "ptime", "name"),
+ Array(Types.INT, Types.SQL_TIMESTAMP, Types.LONG, Types.SQL_TIMESTAMP, Types.STRING))
+ val returnType = new RowTypeInfo(
+ Array(Types.INT, Types.STRING, Types.LONG, Types.LONG)
+ .asInstanceOf[Array[TypeInformation[_]]],
+ Array("id", "name", "val", "rtime"))
+
val util = streamTestUtil()
+ util.tableEnv.registerTableSource(
+ "T",
+ new TestProjectableTableSource(tableSchema, returnType, Seq(), "rtime", "ptime"))
+
+ val t = util.tableEnv.scan("T").select('ptime, 'name, 'val, 'id)
+
+ val expected = unaryNode(
+ "DataStreamCalc",
+ "StreamTableSourceScan(table=[[T]], " +
+ "fields=[ptime, name, val, id], " +
+ "source=[TestSource(physical fields: name, val, id)])",
+ term("select", "PROCTIME(ptime) AS ptime", "name", "val", "id")
+ )
+ util.verifyTable(t, expected)
+ }
- val projectableTableSource = new TestRowtimeSource(
- Array("id", "rowtime", "val", "name"),
- Array(Types.INT, Types.LONG, Types.LONG, Types.STRING)
- .asInstanceOf[Array[TypeInformation[_]]],
- "rowtime") with ProjectableTableSource[Row] {
+ def testProjectWithoutProctime(): Unit = {
+ val tableSchema = new TableSchema(
+ Array("id", "rtime", "val", "ptime", "name"),
+ Array(Types.INT, Types.SQL_TIMESTAMP, Types.LONG, Types.SQL_TIMESTAMP, Types.STRING))
+ val returnType = new RowTypeInfo(
+ Array(Types.INT, Types.LONG, Types.LONG, Types.STRING)
+ .asInstanceOf[Array[TypeInformation[_]]],
+ Array("id", "rtime", "val", "name"))
- override def projectFields(fields: Array[Int]): TableSource[Row] = {
- // ensure this method is not called!
- Assert.fail()
- null.asInstanceOf[TableSource[Row]]
- }
- }
- util.tableEnv.registerTableSource("RTimeTable", projectableTableSource)
+ val util = streamTestUtil()
+ util.tableEnv.registerTableSource(
+ "T",
+ new TestProjectableTableSource(tableSchema, returnType, Seq(), "rtime", "ptime"))
- val t = util.tableEnv.scan("RTimeTable")
- .select('name, 'val)
- .where('val > 10)
+ val t = util.tableEnv.scan("T").select('name, 'val, 'rtime, 'id)
- val expected =
- unaryNode(
- "DataStreamCalc",
- "StreamTableSourceScan(table=[[RTimeTable]], fields=[id, rowtime, val, name])",
- term("select", "name", "val"),
- term("where", ">(val, 10)")
- )
+ val expected = "StreamTableSourceScan(table=[[T]], " +
+ "fields=[name, val, rtime, id], " +
+ "source=[TestSource(physical fields: name, val, rtime, id)])"
util.verifyTable(t, expected)
}
-}
-class TestRowtimeSource(
- fieldNames: Array[String],
- fieldTypes: Array[TypeInformation[_]],
- rowtimeField: String)
- extends StreamTableSource[Row] with DefinedRowtimeAttribute {
+ def testProjectOnlyProctime(): Unit = {
+ val tableSchema = new TableSchema(
+ Array("id", "rtime", "val", "ptime", "name"),
+ Array(Types.INT, Types.SQL_TIMESTAMP, Types.LONG, Types.SQL_TIMESTAMP, Types.STRING))
+ val returnType = new RowTypeInfo(
+ Array(Types.INT, Types.LONG, Types.LONG, Types.STRING)
+ .asInstanceOf[Array[TypeInformation[_]]],
+ Array("id", "rtime", "val", "name"))
- override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = ???
+ val util = streamTestUtil()
+ util.tableEnv.registerTableSource(
+ "T",
+ new TestProjectableTableSource(tableSchema, returnType, Seq(), "rtime", "ptime"))
- override def getRowtimeAttribute: String = rowtimeField
+ val t = util.tableEnv.scan("T").select('ptime)
- override def getReturnType: TypeInformation[Row] = {
- new RowTypeInfo(fieldTypes, fieldNames)
+ val expected = "StreamTableSourceScan(table=[[T]], " +
+ "fields=[ptime], " +
+ "source=[TestSource(physical fields: )])"
+ util.verifyTable(t, expected)
}
-}
-class TestProctimeSource(timeField: String)
- extends StreamTableSource[Row] with DefinedProctimeAttribute {
+ def testProjectOnlyRowtime(): Unit = {
+ val tableSchema = new TableSchema(
+ Array("id", "rtime", "val", "ptime", "name"),
+ Array(Types.INT, Types.SQL_TIMESTAMP, Types.LONG, Types.SQL_TIMESTAMP, Types.STRING))
+ val returnType = new RowTypeInfo(
+ Array(Types.INT, Types.LONG, Types.LONG, Types.STRING)
+ .asInstanceOf[Array[TypeInformation[_]]],
+ Array("id", "rtime", "val", "name"))
- override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = ???
+ val util = streamTestUtil()
+ util.tableEnv.registerTableSource(
+ "T",
+ new TestProjectableTableSource(tableSchema, returnType, Seq(), "rtime", "ptime"))
- override def getProctimeAttribute: String = timeField
+ val t = util.tableEnv.scan("T").select('rtime)
- override def getReturnType: TypeInformation[Row] = {
- new RowTypeInfo(
- Array(Types.INT, Types.LONG, Types.STRING).asInstanceOf[Array[TypeInformation[_]]],
- Array("id", "val", "name"))
+ val expected = "StreamTableSourceScan(table=[[T]], " +
+ "fields=[rtime], " +
+ "source=[TestSource(physical fields: rtime)])"
+ util.verifyTable(t, expected)
}
-}
+ @Test
+ def testProjectWithMapping(): Unit = {
+ val tableSchema = new TableSchema(
+ Array("id", "rtime", "val", "ptime", "name"),
+ Array(Types.INT, Types.SQL_TIMESTAMP, Types.LONG, Types.SQL_TIMESTAMP, Types.STRING))
+ val returnType = new RowTypeInfo(
+ Array(Types.LONG, Types.INT, Types.STRING, Types.LONG)
+ .asInstanceOf[Array[TypeInformation[_]]],
+ Array("p-rtime", "p-id", "p-name", "p-val"))
+ val mapping = Map("rtime" -> "p-rtime", "id" -> "p-id", "val" -> "p-val", "name" -> "p-name")
+
+ val util = streamTestUtil()
+ util.tableEnv.registerTableSource(
+ "T",
+ new TestProjectableTableSource(tableSchema, returnType, Seq(), "rtime", "ptime", mapping))
+
+ val t = util.tableEnv.scan("T").select('name, 'rtime, 'val)
+ val expected = "StreamTableSourceScan(table=[[T]], " +
+ "fields=[name, rtime, val], " +
+ "source=[TestSource(physical fields: remapped-p-name, remapped-p-rtime, remapped-p-val)])"
+ util.verifyTable(t, expected)
+ }
+
+ @Test
+ def testNestedProject(): Unit = {
+
+ val nested1 = new RowTypeInfo(
+ Array(Types.STRING, Types.INT).asInstanceOf[Array[TypeInformation[_]]],
+ Array("name", "value")
+ )
+
+ val nested2 = new RowTypeInfo(
+ Array(Types.INT, Types.BOOLEAN).asInstanceOf[Array[TypeInformation[_]]],
+ Array("num", "flag")
+ )
+
+ val deepNested = new RowTypeInfo(
+ Array(nested1, nested2).asInstanceOf[Array[TypeInformation[_]]],
+ Array("nested1", "nested2")
+ )
+
+ val tableSchema = new TableSchema(
+ Array("id", "deepNested", "nested", "name"),
+ Array(Types.INT, deepNested, nested1, Types.STRING))
+
+ val returnType = new RowTypeInfo(
+ Array(Types.INT, deepNested, nested1, Types.STRING).asInstanceOf[Array[TypeInformation[_]]],
+ Array("id", "deepNested", "nested", "name"))
+
+ val util = streamTestUtil()
+ util.tableEnv.registerTableSource(
+ "T",
+ new TestNestedProjectableTableSource(tableSchema, returnType, Seq()))
+
+ val t = util.tableEnv
+ .scan("T")
+ .select('id,
+ 'deepNested.get("nested1").get("name") as 'nestedName,
+ 'nested.get("value") as 'nestedValue,
+ 'deepNested.get("nested2").get("flag") as 'nestedFlag,
+ 'deepNested.get("nested2").get("num") as 'nestedNum)
+
+ val expected = unaryNode(
+ "DataStreamCalc",
+ "StreamTableSourceScan(table=[[T]], " +
+ "fields=[id, deepNested, nested], " +
+ "source=[TestSource(read nested fields: " +
+ "id.*, deepNested.nested2.num, deepNested.nested2.flag, " +
+ "deepNested.nested1.name, nested.value)])",
+ term("select", "id", "deepNested.nested1.name AS nestedName", "nested.value AS nestedValue",
+ "deepNested.nested2.flag AS nestedFlag", "deepNested.nested2.num AS nestedNum")
+ )
+ util.verifyTable(t, expected)
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/TableSourceValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/TableSourceValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/TableSourceValidationTest.scala
deleted file mode 100644
index 80f1725..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/TableSourceValidationTest.scala
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.stream.table.validation
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.api.{TableException, Types}
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.stream.table.{TestProctimeSource, TestRowtimeSource}
-import org.apache.flink.table.utils.TableTestBase
-import org.junit.Test
-
-class TableSourceValidationTest extends TableTestBase {
-
- @Test(expected = classOf[TableException])
- def testRowtimeTableSourceWithEmptyName(): Unit = {
-
- val tableSource = new TestRowtimeSource(
- Array("id", "rowtime", "val", "name"),
- Array(Types.INT, Types.LONG, Types.LONG, Types.STRING)
- .asInstanceOf[Array[TypeInformation[_]]],
- "rowtime"
- )
-
- val util = streamTestUtil()
- util.tableEnv.registerTableSource("rowTime", tableSource)
-
- val t = util.tableEnv.scan("rowTimeT")
- .select('id)
-
- util.tableEnv.optimize(t.getRelNode, updatesAsRetraction = false)
- }
-
- @Test(expected = classOf[TableException])
- def testProctimeTableSourceWithEmptyName(): Unit = {
- val util = streamTestUtil()
- util.tableEnv.registerTableSource("procTimeT", new TestProctimeSource(" "))
-
- val t = util.tableEnv.scan("procTimeT")
- .select('id)
-
- util.tableEnv.optimize(t.getRelNode, updatesAsRetraction = false)
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/FlinkTableValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/FlinkTableValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/FlinkTableValidationTest.scala
deleted file mode 100644
index a845f5c..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/FlinkTableValidationTest.scala
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.api.validation
-
-import org.apache.flink.api.scala._
-import org.apache.flink.table.api.TableException
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.utils.TableTestBase
-import org.junit.Test
-
-class FlinkTableValidationTest extends TableTestBase {
-
- @Test
- def testFieldNamesDuplicate() {
-
- thrown.expect(classOf[TableException])
- thrown.expectMessage("Field names must be unique.\n" +
- "List of duplicate fields: [a].\n" +
- "List of all fields: [a, a, b].")
-
- val util = batchTestUtil()
- util.addTable[(Int, Int, String)]("MyTable", 'a, 'a, 'b)
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/InlineTableValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/InlineTableValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/InlineTableValidationTest.scala
new file mode 100644
index 0000000..92cae5c
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/InlineTableValidationTest.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.api.validation
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.utils.TableTestBase
+import org.junit.Test
+
+class InlineTableValidationTest extends TableTestBase {
+
+ @Test
+ def testFieldNamesDuplicate() {
+
+ thrown.expect(classOf[TableException])
+ thrown.expectMessage("Field names must be unique.\n" +
+ "List of duplicate fields: [a].\n" +
+ "List of all fields: [a, a, b].")
+
+ val util = batchTestUtil()
+ util.addTable[(Int, Int, String)]("MyTable", 'a, 'a, 'b)
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/TableSourceValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/TableSourceValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/TableSourceValidationTest.scala
index 09a9c55..4828e86 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/TableSourceValidationTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/TableSourceValidationTest.scala
@@ -18,124 +18,228 @@
package org.apache.flink.table.api.validation
+import java.util
+import java.util.Collections
+
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.table.api.{TableEnvironment, TableException, Types}
-import org.apache.flink.table.sources.CsvTableSource
+import org.apache.flink.table.api.{TableEnvironment, TableSchema, Types, ValidationException}
+import org.apache.flink.table.sources._
import org.apache.flink.table.utils.TestTableSourceWithTime
import org.apache.flink.types.Row
import org.junit.Test
class TableSourceValidationTest {
- @Test(expected = classOf[IllegalArgumentException])
- def testCsvTableSourceBuilderWithNullPath(): Unit = {
- CsvTableSource.builder()
- .field("myfield", Types.STRING)
- // should fail, path is not defined
- .build()
- }
+ @Test(expected = classOf[ValidationException])
+ def testUnresolvedSchemaField(): Unit = {
- @Test(expected = classOf[IllegalArgumentException])
- def testCsvTableSourceBuilderWithDuplicateFieldName(): Unit = {
- CsvTableSource.builder()
- .path("/path/to/csv")
- .field("myfield", Types.STRING)
- // should fail, field name must no be duplicate
- .field("myfield", Types.INT)
- }
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
- @Test(expected = classOf[IllegalArgumentException])
- def testCsvTableSourceBuilderWithEmptyField(): Unit = {
- CsvTableSource.builder()
- .path("/path/to/csv")
- // should fail, field can be empty
- .build()
+ val schema = new TableSchema(
+ Array("id", "name", "amount", "value"),
+ Array(Types.LONG, Types.STRING, Types.INT, Types.DOUBLE))
+ val rowType = new RowTypeInfo(
+ Array(Types.LONG, Types.STRING, Types.INT).asInstanceOf[Array[TypeInformation[_]]],
+ Array("id", "name", "amount"))
+ val ts = new TestTableSourceWithTime(schema, rowType, Seq[Row]())
+
+ // should fail because schema field "value" cannot be resolved in result type
+ tEnv.registerTableSource("testTable", ts)
}
- @Test(expected = classOf[TableException])
- def testNonExistingRowtimeField(): Unit = {
+ @Test(expected = classOf[ValidationException])
+ def testNonMatchingFieldTypes(): Unit = {
+
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ val schema = new TableSchema(
+ Array("id", "name", "amount"),
+ Array(Types.LONG, Types.INT, Types.INT))
val rowType = new RowTypeInfo(
Array(Types.LONG, Types.STRING, Types.INT).asInstanceOf[Array[TypeInformation[_]]],
- Array("id", "name", "amount")
- )
- val ts = new TestTableSourceWithTime(
- Seq[Row](),
- rowType,
- "rTime",
- null
- )
+ Array("id", "name", "amount"))
+ val ts = new TestTableSourceWithTime(schema, rowType, Seq[Row]())
+
+ // should fail because types of "name" fields are different
+ tEnv.registerTableSource("testTable", ts)
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testMappingToUnknownField(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val tEnv = TableEnvironment.getTableEnvironment(env)
- // should fail because configured rowtime field is not in schema
+ val schema = new TableSchema(
+ Array("id", "name", "amount"),
+ Array(Types.LONG, Types.STRING, Types.DOUBLE))
+ val rowType = new RowTypeInfo(Types.LONG, Types.STRING, Types.DOUBLE)
+ val mapping = Map("id" -> "f3", "name" -> "f1", "amount" -> "f2")
+ val ts = new TestTableSourceWithTime(schema, rowType, Seq[Row](), mapping = mapping)
+
+ // should fail because mapping maps field "id" to unknown field
tEnv.registerTableSource("testTable", ts)
}
- @Test(expected = classOf[TableException])
- def testInvalidTypeRowtimeField(): Unit = {
- val rowType = new RowTypeInfo(
- Array(Types.LONG, Types.STRING, Types.INT).asInstanceOf[Array[TypeInformation[_]]],
- Array("id", "name", "amount")
- )
- val ts = new TestTableSourceWithTime(
- Seq[Row](),
- rowType,
- "name",
- null
- )
+ @Test(expected = classOf[ValidationException])
+ def testMappingWithInvalidFieldType(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val tEnv = TableEnvironment.getTableEnvironment(env)
- // should fail because configured rowtime field is not of type Long or Timestamp
+ val schema = new TableSchema(
+ Array("id", "name", "amount"),
+ Array(Types.LONG, Types.STRING, Types.DOUBLE))
+ val rowType = new RowTypeInfo(Types.LONG, Types.STRING, Types.INT)
+ val mapping = Map("id" -> "f0", "name" -> "f1", "amount" -> "f2")
+ val ts = new TestTableSourceWithTime(schema, rowType, Seq[Row](), mapping = mapping)
+
+ // should fail because mapping maps fields with different types
tEnv.registerTableSource("testTable", ts)
}
- @Test(expected = classOf[TableException])
- def testEmptyRowtimeField(): Unit = {
+ @Test(expected = classOf[ValidationException])
+ def testNonTimestampProctimeField(): Unit = {
+
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ val schema = new TableSchema(
+ Array("id", "name", "amount", "ptime"),
+ Array(Types.LONG, Types.STRING, Types.INT, Types.LONG))
val rowType = new RowTypeInfo(
Array(Types.LONG, Types.STRING, Types.INT).asInstanceOf[Array[TypeInformation[_]]],
- Array("id", "name", "amount")
- )
- val ts = new TestTableSourceWithTime(
- Seq[Row](),
- rowType,
- "",
- null
- )
+ Array("id", "name", "amount"))
+ val ts = new TestTableSourceWithTime(schema, rowType, Seq[Row](), proctime = "ptime")
+
+ // should fail because processing time field has invalid type
+ tEnv.registerTableSource("testTable", ts)
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testNonTimestampRowtimeField(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val tEnv = TableEnvironment.getTableEnvironment(env)
- // should fail because configured rowtime field is empty
+ val schema = new TableSchema(
+ Array("id", "name", "amount", "rtime"),
+ Array(Types.LONG, Types.STRING, Types.INT, Types.LONG))
+ val rowType = new RowTypeInfo(
+ Array(Types.LONG, Types.STRING, Types.LONG, Types.INT)
+ .asInstanceOf[Array[TypeInformation[_]]],
+ Array("id", "name", "rtime", "amount"))
+ val ts = new TestTableSourceWithTime(schema, rowType, Seq[Row](), rowtime = "rtime")
+
+ // should fail because rowtime field has invalid type
tEnv.registerTableSource("testTable", ts)
}
- @Test(expected = classOf[TableException])
- def testEmptyProctimeField(): Unit = {
+ @Test(expected = classOf[ValidationException])
+ def testFieldRowtimeAndProctime(): Unit = {
+
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ val schema = new TableSchema(
+ Array("id", "name", "amount", "time"),
+ Array(Types.LONG, Types.STRING, Types.INT, Types.SQL_TIMESTAMP))
val rowType = new RowTypeInfo(
- Array(Types.LONG, Types.STRING, Types.INT).asInstanceOf[Array[TypeInformation[_]]],
- Array("id", "name", "amount")
- )
- val ts = new TestTableSourceWithTime(
- Seq[Row](),
- rowType,
- null,
- ""
- )
+ Array(Types.LONG, Types.STRING, Types.LONG, Types.INT)
+ .asInstanceOf[Array[TypeInformation[_]]],
+ Array("id", "name", "time", "amount"))
+ val ts =
+ new TestTableSourceWithTime(schema, rowType, Seq[Row](), rowtime = "time", proctime = "time")
+
+ // should fail because field "time" is declared as both rowtime and proctime attribute
+ tEnv.registerTableSource("testTable", ts)
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testUnknownTimestampExtractorArgField(): Unit = {
+
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ val schema = new TableSchema(
+ Array("id", "name", "amount", "rtime"),
+ Array(Types.LONG, Types.STRING, Types.INT, Types.SQL_TIMESTAMP))
+ val rowType = new RowTypeInfo(
+ Array(Types.LONG, Types.STRING, Types.LONG, Types.INT)
+ .asInstanceOf[Array[TypeInformation[_]]],
+ Array("id", "name", "rtime", "amount"))
+ val ts =
+ new TestTableSourceWithTime(schema, rowType, Seq[Row]()) {
+
+ override def getRowtimeAttributeDescriptors: util.List[RowtimeAttributeDescriptor] = {
+ Collections.singletonList(new RowtimeAttributeDescriptor(
+ "rtime",
+ new ExistingField("doesNotExist"),
+ new AscendingWatermarks))
+ }
+ }
+
+ // should fail because timestamp extractor argument field does not exist
+ tEnv.registerTableSource("testTable", ts)
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testFailingTimestampExtractorValidation(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val tEnv = TableEnvironment.getTableEnvironment(env)
- // should fail because configured proctime field is empty
+ val fieldNames = Array("id", "name", "amount")
+ val rowType = new RowTypeInfo(
+ Array(Types.LONG, Types.STRING, Types.INT).asInstanceOf[Array[TypeInformation[_]]],
+ fieldNames)
+ val schema = new TableSchema(
+ fieldNames,
+ Array(Types.LONG, Types.SQL_TIMESTAMP, Types.INT))
+ val ts = new TestTableSourceWithTime(schema, rowType, Seq[Row](), rowtime = "name")
+
+ // should fail because configured rowtime field is not of type Long or Timestamp
tEnv.registerTableSource("testTable", ts)
}
+
+ // CsvTableSource Tests
+
+ @Test(expected = classOf[IllegalArgumentException])
+ def testCsvTableSourceBuilderWithNullPath(): Unit = {
+ CsvTableSource.builder()
+ .field("myfield", Types.STRING)
+ // should fail, path is not defined
+ .build()
+ }
+
+ @Test(expected = classOf[IllegalArgumentException])
+ def testCsvTableSourceBuilderWithDuplicateFieldName(): Unit = {
+ CsvTableSource.builder()
+ .path("/path/to/csv")
+ .field("myfield", Types.STRING)
+ // should fail, field names must not be duplicated
+ .field("myfield", Types.INT)
+ }
+
+ @Test(expected = classOf[IllegalArgumentException])
+ def testCsvTableSourceBuilderWithEmptyField(): Unit = {
+ CsvTableSource.builder()
+ .path("/path/to/csv")
+ // should fail, fields must not be empty
+ .build()
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalTableSourceUtilTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalTableSourceUtilTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalTableSourceUtilTest.scala
index 82bfd8d..0744de9 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalTableSourceUtilTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalTableSourceUtilTest.scala
@@ -24,7 +24,7 @@ import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.table.api.{TableSchema, Types}
-import org.apache.flink.table.plan.schema.StreamTableSourceTable
+import org.apache.flink.table.plan.schema.{StreamTableSourceTable}
import org.apache.flink.table.sources.StreamTableSource
import org.apache.flink.types.Row
import org.junit.Assert.assertTrue
@@ -58,6 +58,9 @@ class MockTableSourceConverter extends TableSourceConverter[StreamTableSource[Ro
val schema = externalCatalogTable.schema
Types.ROW(schema.getColumnNames, schema.getTypes)
}
+
+ override def getTableSchema: TableSchema = externalCatalogTable.schema
+
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/InMemoryExternalCatalogTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/InMemoryExternalCatalogTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/InMemoryExternalCatalogTest.scala
index 6d1d66f..54d1510 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/InMemoryExternalCatalogTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/InMemoryExternalCatalogTest.scala
@@ -18,7 +18,7 @@
package org.apache.flink.table.catalog
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.table.api._
import org.junit.{Before, Test}
import org.junit.Assert._
@@ -68,7 +68,7 @@ class InMemoryExternalCatalogTest {
val table = createTableInstance()
catalog.createTable(tableName, table, ignoreIfExists = false)
assertEquals(catalog.getTable(tableName), table)
- val newTable = createTableInstance()
+ val newTable = createTableInstance(Array("number"), Array(Types.INT))
catalog.alterTable(tableName, newTable, ignoreIfNotExists = false)
val currentTable = catalog.getTable(tableName)
// validate the table is really replaced after alter table
@@ -142,4 +142,11 @@ class InMemoryExternalCatalogTest {
)
ExternalCatalogTable("csv", schema)
}
+
+ private def createTableInstance(
+ fieldNames: Array[String],
+ fieldTypes: Array[TypeInformation[_]]): ExternalCatalogTable = {
+ val schema = new TableSchema(fieldNames, fieldTypes)
+ ExternalCatalogTable("csv", schema)
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableSourceITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableSourceITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableSourceITCase.scala
index 5e214b1..2292e17 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableSourceITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableSourceITCase.scala
@@ -18,13 +18,22 @@
package org.apache.flink.table.runtime.batch.table
+import java.lang.{Boolean => JBool, Integer => JInt, Long => JLong}
+
+import org.apache.calcite.runtime.SqlFunctions.{internalToTimestamp => toTimestamp}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.java.{ExecutionEnvironment => JExecEnv}
+import org.apache.flink.api.java.typeutils.{GenericTypeInfo, RowTypeInfo}
import org.apache.flink.api.scala.ExecutionEnvironment
-import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.{TableEnvironment, TableException, TableSchema, Types}
import org.apache.flink.table.api.scala._
import org.apache.flink.table.runtime.utils.{CommonTestData, TableProgramsCollectionTestBase}
import org.apache.flink.table.runtime.utils.TableProgramsTestBase.TableConfigMode
-import org.apache.flink.table.utils.TestFilterableTableSource
+import org.apache.flink.table.sources.BatchTableSource
+import org.apache.flink.table.utils._
import org.apache.flink.test.util.TestBaseUtils
+import org.apache.flink.types.Row
import org.junit.Test
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
@@ -36,6 +45,33 @@ class TableSourceITCase(
configMode: TableConfigMode)
extends TableProgramsCollectionTestBase(configMode) {
+ @Test(expected = classOf[TableException])
+ def testInvalidDatastreamType(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ val tableSource = new BatchTableSource[Row]() {
+ private val fieldNames: Array[String] = Array("name", "id", "value")
+ private val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING, Types.LONG, Types.INT)
+ .asInstanceOf[Array[TypeInformation[_]]]
+
+ override def getDataSet(execEnv: JExecEnv): DataSet[Row] = {
+ val data = List(Row.of("Mary", new JLong(1L), new JInt(1))).asJava
+ // return DataSet[Row] with GenericTypeInfo
+ execEnv.fromCollection(data, new GenericTypeInfo[Row](classOf[Row]))
+ }
+ override def getReturnType: TypeInformation[Row] = new RowTypeInfo(fieldTypes, fieldNames)
+ override def getTableSchema: TableSchema = new TableSchema(fieldNames, fieldTypes)
+ }
+ tEnv.registerTableSource("T", tableSource)
+
+ tEnv.scan("T")
+ .select('value, 'name)
+ .collect()
+
+ // test should fail because type info of returned DataSet does not match the declared return type.
+ }
+
@Test
def testCsvTableSourceWithProjection(): Unit = {
val csvTable = CommonTestData.getCsvTableSource
@@ -76,4 +112,535 @@ class TableSourceITCase(
"5,Record_5", "6,Record_6", "7,Record_7", "8,Record_8").mkString("\n")
TestBaseUtils.compareResultAsText(results.asJava, expected)
}
+
+ @Test
+ def testRowtimeRowTableSource(): Unit = {
+ val tableName = "MyTable"
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ val data = Seq(
+ Row.of("Mary", new JLong(1L), new JInt(10)),
+ Row.of("Bob", new JLong(2L), new JInt(20)),
+ Row.of("Mary", new JLong(2L), new JInt(30)),
+ Row.of("Liz", new JLong(2001L), new JInt(40)))
+
+ val fieldNames = Array("name", "rtime", "amount")
+ val schema = new TableSchema(fieldNames, Array(Types.STRING, Types.SQL_TIMESTAMP, Types.INT))
+ val rowType = new RowTypeInfo(
+ Array(Types.STRING, Types.LONG, Types.INT).asInstanceOf[Array[TypeInformation[_]]],
+ fieldNames)
+
+ val tableSource = new TestTableSourceWithTime(schema, rowType, data, "rtime", null)
+ tEnv.registerTableSource(tableName, tableSource)
+
+ val results = tEnv.scan(tableName)
+ .window(Tumble over 1.second on 'rtime as 'w)
+ .groupBy('name, 'w)
+ .select('name, 'w.start, 'amount.sum)
+ .collect()
+
+ val expected = Seq(
+ "Mary,1970-01-01 00:00:00.0,40",
+ "Bob,1970-01-01 00:00:00.0,20",
+ "Liz,1970-01-01 00:00:02.0,40").mkString("\n")
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testProctimeRowTableSource(): Unit = {
+ val tableName = "MyTable"
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ val data = Seq(
+ Row.of("Mary", new JLong(1L), new JInt(10)),
+ Row.of("Bob", new JLong(2L), new JInt(20)),
+ Row.of("Mary", new JLong(2L), new JInt(30)),
+ Row.of("Liz", new JLong(2001L), new JInt(40)))
+
+ val fieldNames = Array("name", "rtime", "amount")
+ val schema = new TableSchema(
+ fieldNames :+ "ptime",
+ Array(Types.STRING, Types.LONG, Types.INT, Types.SQL_TIMESTAMP))
+ val rowType = new RowTypeInfo(
+ Array(Types.STRING, Types.LONG, Types.INT).asInstanceOf[Array[TypeInformation[_]]],
+ fieldNames)
+
+ val tableSource = new TestTableSourceWithTime(schema, rowType, data, null, "ptime")
+ tEnv.registerTableSource(tableName, tableSource)
+
+ val results = tEnv.scan(tableName)
+ .where('ptime.cast(Types.LONG) > 0L)
+ .select('name, 'amount)
+ .collect()
+
+ val expected = Seq(
+ "Mary,10",
+ "Bob,20",
+ "Mary,30",
+ "Liz,40").mkString("\n")
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testRowtimeProctimeRowTableSource(): Unit = {
+ val tableName = "MyTable"
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ val data = Seq(
+ Row.of("Mary", new JLong(1L), new JInt(10)),
+ Row.of("Bob", new JLong(2L), new JInt(20)),
+ Row.of("Mary", new JLong(2L), new JInt(30)),
+ Row.of("Liz", new JLong(2001L), new JInt(40)))
+
+ val fieldNames = Array("name", "rtime", "amount")
+ val schema = new TableSchema(
+ fieldNames :+ "ptime",
+ Array(Types.STRING, Types.SQL_TIMESTAMP, Types.INT, Types.SQL_TIMESTAMP))
+ val rowType = new RowTypeInfo(
+ Array(Types.STRING, Types.LONG, Types.INT).asInstanceOf[Array[TypeInformation[_]]],
+ fieldNames)
+
+ val tableSource = new TestTableSourceWithTime(schema, rowType, data, "rtime", "ptime")
+ tEnv.registerTableSource(tableName, tableSource)
+
+ val results = tEnv.scan(tableName)
+ .window(Tumble over 1.second on 'rtime as 'w)
+ .groupBy('name, 'w)
+ .select('name, 'w.start, 'amount.sum)
+ .collect()
+
+ val expected = Seq(
+ "Mary,1970-01-01 00:00:00.0,40",
+ "Bob,1970-01-01 00:00:00.0,20",
+ "Liz,1970-01-01 00:00:02.0,40").mkString("\n")
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testRowtimeAsTimestampRowTableSource(): Unit = {
+ val tableName = "MyTable"
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ val data = Seq(
+ Row.of("Mary", toTimestamp(1L), new JInt(10)),
+ Row.of("Bob", toTimestamp(2L), new JInt(20)),
+ Row.of("Mary", toTimestamp(2L), new JInt(30)),
+ Row.of("Liz", toTimestamp(2001L), new JInt(40)))
+
+ val fieldNames = Array("name", "rtime", "amount")
+ val schema = new TableSchema(fieldNames, Array(Types.STRING, Types.SQL_TIMESTAMP, Types.INT))
+ val rowType = new RowTypeInfo(
+ Array(Types.STRING, Types.SQL_TIMESTAMP, Types.INT).asInstanceOf[Array[TypeInformation[_]]],
+ fieldNames)
+
+ val tableSource = new TestTableSourceWithTime(schema, rowType, data, "rtime", null)
+ tEnv.registerTableSource(tableName, tableSource)
+
+ val results = tEnv.scan(tableName)
+ .window(Tumble over 1.second on 'rtime as 'w)
+ .groupBy('name, 'w)
+ .select('name, 'w.start, 'amount.sum)
+ .collect()
+
+ val expected = Seq(
+ "Mary,1970-01-01 00:00:00.0,40",
+ "Bob,1970-01-01 00:00:00.0,20",
+ "Liz,1970-01-01 00:00:02.0,40").mkString("\n")
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testRowtimeLongTableSource(): Unit = {
+ val tableName = "MyTable"
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ val data = Seq(new JLong(1L), new JLong(2L), new JLong(2L), new JLong(2001L), new JLong(4001L))
+
+ val schema = new TableSchema(Array("rtime"), Array(Types.SQL_TIMESTAMP))
+ val returnType = Types.LONG
+
+ val tableSource = new TestTableSourceWithTime(schema, returnType, data, "rtime", null)
+ tEnv.registerTableSource(tableName, tableSource)
+
+ val results = tEnv.scan(tableName)
+ .window(Tumble over 1.second on 'rtime as 'w)
+ .groupBy('w)
+ .select('w.start, 'rtime.count)
+ .collect()
+
+ val expected = Seq(
+ "1970-01-01 00:00:00.0,3",
+ "1970-01-01 00:00:02.0,1",
+ "1970-01-01 00:00:04.0,1").mkString("\n")
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testProctimeStringTableSource(): Unit = {
+ val tableName = "MyTable"
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ val data = Seq("Mary", "Peter", "Bob", "Liz")
+
+ val schema = new TableSchema(Array("name", "ptime"), Array(Types.STRING, Types.SQL_TIMESTAMP))
+ val returnType = Types.STRING
+
+ val tableSource = new TestTableSourceWithTime(schema, returnType, data, null, "ptime")
+ tEnv.registerTableSource(tableName, tableSource)
+
+ val results = tEnv.scan(tableName)
+ .where('ptime.cast(Types.LONG) > 1)
+ .select('name)
+ .collect()
+
+ val expected = Seq("Mary", "Peter", "Bob", "Liz").mkString("\n")
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testRowtimeProctimeLongTableSource(): Unit = {
+ val tableName = "MyTable"
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ val data = Seq(new JLong(1L), new JLong(2L), new JLong(2L), new JLong(2001L), new JLong(4001L))
+
+ val schema = new TableSchema(
+ Array("rtime", "ptime"),
+ Array(Types.SQL_TIMESTAMP, Types.SQL_TIMESTAMP))
+ val returnType = Types.LONG
+
+ val tableSource = new TestTableSourceWithTime(schema, returnType, data, "rtime", "ptime")
+ tEnv.registerTableSource(tableName, tableSource)
+
+ val results = tEnv.scan(tableName)
+ .where('ptime.cast(Types.LONG) > 1)
+ .window(Tumble over 1.second on 'rtime as 'w)
+ .groupBy('w)
+ .select('w.start, 'rtime.count)
+ .collect()
+
+ val expected = Seq(
+ "1970-01-01 00:00:00.0,3",
+ "1970-01-01 00:00:02.0,1",
+ "1970-01-01 00:00:04.0,1").mkString("\n")
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testFieldMappingTableSource(): Unit = {
+ val tableName = "MyTable"
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ val data = Seq(
+ Row.of("Mary", new JLong(1L), new JInt(10)),
+ Row.of("Bob", new JLong(2L), new JInt(20)),
+ Row.of("Mary", new JLong(2L), new JInt(30)),
+ Row.of("Liz", new JLong(2001L), new JInt(40)))
+
+ val schema = new TableSchema(
+ Array("ptime", "amount", "name", "rtime"),
+ Array(Types.SQL_TIMESTAMP, Types.INT, Types.STRING, Types.SQL_TIMESTAMP))
+ val returnType = new RowTypeInfo(Types.STRING, Types.LONG, Types.INT)
+ val mapping = Map("amount" -> "f2", "name" -> "f0", "rtime" -> "f1")
+
+ val source = new TestTableSourceWithTime(schema, returnType, data, "rtime", "ptime", mapping)
+ tEnv.registerTableSource(tableName, source)
+
+ val results = tEnv.scan(tableName)
+ .window(Tumble over 1.second on 'rtime as 'w)
+ .groupBy('name, 'w)
+ .select('name, 'w.start, 'amount.sum)
+ .collect()
+
+ val expected = Seq(
+ "Mary,1970-01-01 00:00:00.0,40",
+ "Bob,1970-01-01 00:00:00.0,20",
+ "Liz,1970-01-01 00:00:02.0,40").mkString("\n")
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testProjectWithoutRowtimeProctime(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ val data = Seq(
+ Row.of(new JInt(1), "Mary", new JLong(10L), new JLong(1)),
+ Row.of(new JInt(2), "Bob", new JLong(20L), new JLong(2)),
+ Row.of(new JInt(3), "Mike", new JLong(30L), new JLong(2)),
+ Row.of(new JInt(4), "Liz", new JLong(40L), new JLong(2001)))
+
+ val tableSchema = new TableSchema(
+ Array("id", "rtime", "val", "ptime", "name"),
+ Array(Types.INT, Types.SQL_TIMESTAMP, Types.LONG, Types.SQL_TIMESTAMP, Types.STRING))
+ val returnType = new RowTypeInfo(
+ Array(Types.INT, Types.STRING, Types.LONG, Types.LONG)
+ .asInstanceOf[Array[TypeInformation[_]]],
+ Array("id", "name", "val", "rtime"))
+
+ tEnv.registerTableSource(
+ "T",
+ new TestProjectableTableSource(tableSchema, returnType, data, "rtime", "ptime"))
+
+ val results = tEnv.scan("T")
+ .select('name, 'val, 'id)
+ .collect()
+
+ val expected = Seq(
+ "Mary,10,1",
+ "Bob,20,2",
+ "Mike,30,3",
+ "Liz,40,4").mkString("\n")
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testProjectWithoutProctime(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ val data = Seq(
+ Row.of(new JInt(1), "Mary", new JLong(10L), new JLong(1)),
+ Row.of(new JInt(2), "Bob", new JLong(20L), new JLong(2)),
+ Row.of(new JInt(3), "Mike", new JLong(30L), new JLong(2)),
+ Row.of(new JInt(4), "Liz", new JLong(40L), new JLong(2001)))
+
+ val tableSchema = new TableSchema(
+ Array("id", "rtime", "val", "ptime", "name"),
+ Array(Types.INT, Types.SQL_TIMESTAMP, Types.LONG, Types.SQL_TIMESTAMP, Types.STRING))
+ val returnType = new RowTypeInfo(
+ Array(Types.INT, Types.STRING, Types.LONG, Types.LONG)
+ .asInstanceOf[Array[TypeInformation[_]]],
+ Array("id", "name", "val", "rtime"))
+
+ tEnv.registerTableSource(
+ "T",
+ new TestProjectableTableSource(tableSchema, returnType, data, "rtime", "ptime"))
+
+ val results = tEnv.scan("T")
+ .select('rtime, 'name, 'id)
+ .collect()
+
+ val expected = Seq(
+ "1970-01-01 00:00:00.001,Mary,1",
+ "1970-01-01 00:00:00.002,Bob,2",
+ "1970-01-01 00:00:00.002,Mike,3",
+ "1970-01-01 00:00:02.001,Liz,4").mkString("\n")
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testProjectWithoutRowtime(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ val data = Seq(
+ Row.of(new JInt(1), "Mary", new JLong(10L), new JLong(1)),
+ Row.of(new JInt(2), "Bob", new JLong(20L), new JLong(2)),
+ Row.of(new JInt(3), "Mike", new JLong(30L), new JLong(2)),
+ Row.of(new JInt(4), "Liz", new JLong(40L), new JLong(2001)))
+
+ val tableSchema = new TableSchema(
+ Array("id", "rtime", "val", "ptime", "name"),
+ Array(Types.INT, Types.SQL_TIMESTAMP, Types.LONG, Types.SQL_TIMESTAMP, Types.STRING))
+ val returnType = new RowTypeInfo(
+ Array(Types.INT, Types.STRING, Types.LONG, Types.LONG)
+ .asInstanceOf[Array[TypeInformation[_]]],
+ Array("id", "name", "val", "rtime"))
+
+ tEnv.registerTableSource(
+ "T",
+ new TestProjectableTableSource(tableSchema, returnType, data, "rtime", "ptime"))
+
+ val results = tEnv.scan("T")
+ .filter('ptime.cast(Types.LONG) > 0)
+ .select('name, 'id)
+ .collect()
+
+ val expected = Seq(
+ "Mary,1",
+ "Bob,2",
+ "Mike,3",
+ "Liz,4").mkString("\n")
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ def testProjectOnlyProctime(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ val data = Seq(
+ Row.of(new JInt(1), new JLong(1), new JLong(10L), "Mary"),
+ Row.of(new JInt(2), new JLong(2L), new JLong(20L), "Bob"),
+ Row.of(new JInt(3), new JLong(2L), new JLong(30L), "Mike"),
+ Row.of(new JInt(4), new JLong(2001L), new JLong(30L), "Liz"))
+
+ val tableSchema = new TableSchema(
+ Array("id", "rtime", "val", "ptime", "name"),
+ Array(Types.INT, Types.SQL_TIMESTAMP, Types.LONG, Types.SQL_TIMESTAMP, Types.STRING))
+ val returnType = new RowTypeInfo(
+ Array(Types.INT, Types.LONG, Types.LONG, Types.STRING)
+ .asInstanceOf[Array[TypeInformation[_]]],
+ Array("id", "rtime", "val", "name"))
+
+ tEnv.registerTableSource(
+ "T",
+ new TestProjectableTableSource(tableSchema, returnType, data, "rtime", "ptime"))
+
+ val results = tEnv.scan("T")
+ .select('ptime > 0)
+ .select(1.count)
+ .collect()
+
+ val expected = Seq("4").mkString("\n")
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ def testProjectOnlyRowtime(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ val data = Seq(
+ Row.of(new JInt(1), new JLong(1), new JLong(10L), "Mary"),
+ Row.of(new JInt(2), new JLong(2L), new JLong(20L), "Bob"),
+ Row.of(new JInt(3), new JLong(2L), new JLong(30L), "Mike"),
+ Row.of(new JInt(4), new JLong(2001L), new JLong(30L), "Liz"))
+
+ val tableSchema = new TableSchema(
+ Array("id", "rtime", "val", "ptime", "name"),
+ Array(Types.INT, Types.SQL_TIMESTAMP, Types.LONG, Types.SQL_TIMESTAMP, Types.STRING))
+ val returnType = new RowTypeInfo(
+ Array(Types.INT, Types.LONG, Types.LONG, Types.STRING)
+ .asInstanceOf[Array[TypeInformation[_]]],
+ Array("id", "rtime", "val", "name"))
+
+ tEnv.registerTableSource(
+ "T",
+ new TestProjectableTableSource(tableSchema, returnType, data, "rtime", "ptime"))
+
+ val results = tEnv.scan("T")
+ .select('rtime)
+ .collect()
+
+ val expected = Seq(
+ "1970-01-01 00:00:00.001",
+ "1970-01-01 00:00:00.002",
+ "1970-01-01 00:00:00.002",
+ "1970-01-01 00:00:02.001").mkString("\n")
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testProjectWithMapping(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ val data = Seq(
+ Row.of(new JLong(1), new JInt(1), "Mary", new JLong(10)),
+ Row.of(new JLong(2), new JInt(2), "Bob", new JLong(20)),
+ Row.of(new JLong(2), new JInt(3), "Mike", new JLong(30)),
+ Row.of(new JLong(2001), new JInt(4), "Liz", new JLong(40)))
+
+ val tableSchema = new TableSchema(
+ Array("id", "rtime", "val", "ptime", "name"),
+ Array(Types.INT, Types.SQL_TIMESTAMP, Types.LONG, Types.SQL_TIMESTAMP, Types.STRING))
+ val returnType = new RowTypeInfo(
+ Array(Types.LONG, Types.INT, Types.STRING, Types.LONG)
+ .asInstanceOf[Array[TypeInformation[_]]],
+ Array("p-rtime", "p-id", "p-name", "p-val"))
+ val mapping = Map("rtime" -> "p-rtime", "id" -> "p-id", "val" -> "p-val", "name" -> "p-name")
+
+ tEnv.registerTableSource(
+ "T",
+ new TestProjectableTableSource(tableSchema, returnType, data, "rtime", "ptime", mapping))
+
+ val results = tEnv.scan("T")
+ .select('name, 'rtime, 'val)
+ .collect()
+
+ val expected = Seq(
+ "Mary,1970-01-01 00:00:00.001,10",
+ "Bob,1970-01-01 00:00:00.002,20",
+ "Mike,1970-01-01 00:00:00.002,30",
+ "Liz,1970-01-01 00:00:02.001,40").mkString("\n")
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testNestedProject(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ val data = Seq(
+ Row.of(new JLong(1),
+ Row.of(
+ Row.of("Sarah", new JInt(100)),
+ Row.of(new JInt(1000), new JBool(true))
+ ),
+ Row.of("Peter", new JInt(10000)),
+ "Mary"),
+ Row.of(new JLong(2),
+ Row.of(
+ Row.of("Rob", new JInt(200)),
+ Row.of(new JInt(2000), new JBool(false))
+ ),
+ Row.of("Lucy", new JInt(20000)),
+ "Bob"),
+ Row.of(new JLong(3),
+ Row.of(
+ Row.of("Mike", new JInt(300)),
+ Row.of(new JInt(3000), new JBool(true))
+ ),
+ Row.of("Betty", new JInt(30000)),
+ "Liz"))
+
+ val nested1 = new RowTypeInfo(
+ Array(Types.STRING, Types.INT).asInstanceOf[Array[TypeInformation[_]]],
+ Array("name", "value")
+ )
+ val nested2 = new RowTypeInfo(
+ Array(Types.INT, Types.BOOLEAN).asInstanceOf[Array[TypeInformation[_]]],
+ Array("num", "flag")
+ )
+ val deepNested = new RowTypeInfo(
+ Array(nested1, nested2).asInstanceOf[Array[TypeInformation[_]]],
+ Array("nested1", "nested2")
+ )
+ val tableSchema = new TableSchema(
+ Array("id", "deepNested", "nested", "name"),
+ Array(Types.LONG, deepNested, nested1, Types.STRING))
+
+ val returnType = new RowTypeInfo(
+ Array(Types.LONG, deepNested, nested1, Types.STRING).asInstanceOf[Array[TypeInformation[_]]],
+ Array("id", "deepNested", "nested", "name"))
+
+ tEnv.registerTableSource(
+ "T",
+ new TestNestedProjectableTableSource(tableSchema, returnType, data))
+
+ val results = tEnv
+ .scan("T")
+ .select('id,
+ 'deepNested.get("nested1").get("name") as 'nestedName,
+ 'nested.get("value") as 'nestedValue,
+ 'deepNested.get("nested2").get("flag") as 'nestedFlag,
+ 'deepNested.get("nested2").get("num") as 'nestedNum)
+ .collect()
+
+ val expected = Seq(
+ "1,Sarah,10000,true,1000",
+ "2,Rob,20000,false,2000",
+ "3,Mike,30000,true,3000").mkString("\n")
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
index e672335..b7f97f9 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
@@ -32,7 +32,7 @@ import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
import org.apache.flink.table.runtime.utils.JavaPojos.Pojo1
import org.apache.flink.table.api.scala._
import org.apache.flink.table.plan.TimeIndicatorConversionTest.TableFunc
-import org.apache.flink.table.api.{TableEnvironment, Types}
+import org.apache.flink.table.api.{TableEnvironment, TableSchema, Types}
import org.apache.flink.table.expressions.{ExpressionParser, TimeIntervalUnit}
import org.apache.flink.table.runtime.stream.TimeAttributesITCase.{TestPojo, TimestampWithEqualWatermark, TimestampWithEqualWatermarkPojo}
import org.apache.flink.table.runtime.utils.StreamITCase
@@ -494,6 +494,7 @@ class TimeAttributesITCase extends StreamingMultipleProgramsTestBase {
@Test
def testTableSourceWithTimeIndicators(): Unit = {
+ StreamITCase.clear
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val tEnv = TableEnvironment.getTableEnvironment(env)
@@ -505,15 +506,17 @@ class TimeAttributesITCase extends StreamingMultipleProgramsTestBase {
Row.of(new JInt(4), "D", new JLong(4000L)),
Row.of(new JInt(5), "E", new JLong(5000L)),
Row.of(new JInt(6), "F", new JLong(6000L)))
+
+ val fieldNames = Array("a", "b", "rowtime")
+ val schema = new TableSchema(
+ fieldNames :+ "proctime",
+ Array(Types.INT, Types.STRING, Types.SQL_TIMESTAMP, Types.SQL_TIMESTAMP))
val rowType = new RowTypeInfo(
Array(Types.INT, Types.STRING, Types.LONG).asInstanceOf[Array[TypeInformation[_]]],
- Array("a", "b", "rowtime")
- )
+ fieldNames)
- tEnv.registerTableSource(
- "testTable",
- new TestTableSourceWithTime(rows, rowType, "rowtime", "proctime"))
- StreamITCase.clear
+ val tableSource = new TestTableSourceWithTime(schema, rowType, rows, "rowtime", "proctime")
+ tEnv.registerTableSource("testTable", tableSource)
val result = tEnv
.scan("testTable")