You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/11/08 17:42:01 UTC
[4/5] flink git commit: [FLINK-8012] [table] Fix TableSink config for
tables with time attributes.
[FLINK-8012] [table] Fix TableSink config for tables with time attributes.
This closes #4974.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8c5b5476
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8c5b5476
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8c5b5476
Branch: refs/heads/master
Commit: 8c5b5476d1404b31cd085730081d28c7695d3ec1
Parents: 1ad1830
Author: Fabian Hueske <fh...@apache.org>
Authored: Tue Nov 7 17:57:39 2017 +0100
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed Nov 8 17:12:18 2017 +0100
----------------------------------------------------------------------
.../scala/org/apache/flink/table/api/table.scala | 7 ++++++-
.../table/runtime/stream/table/TableSinkITCase.scala | 15 ++++++++++++---
2 files changed, 18 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/8c5b5476/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
index 0430e49..7349a0e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
@@ -848,7 +848,12 @@ class Table(
val rowType = getRelNode.getRowType
val fieldNames: Array[String] = rowType.getFieldNames.asScala.toArray
val fieldTypes: Array[TypeInformation[_]] = rowType.getFieldList.asScala
- .map(field => FlinkTypeFactory.toTypeInfo(field.getType)).toArray
+ .map(field => FlinkTypeFactory.toTypeInfo(field.getType))
+ .map {
+ // replace time indicator types by SQL_TIMESTAMP
+ case t: TypeInformation[_] if FlinkTypeFactory.isTimeIndicatorType(t) => Types.SQL_TIMESTAMP
+ case t: TypeInformation[_] => t
+ }.toArray
// configure the table sink
val configuredSink = sink.configure(fieldNames, fieldTypes)
http://git-wip-us.apache.org/repos/asf/flink/blob/8c5b5476/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
index 07934b8..b44d8ef 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
@@ -88,13 +88,16 @@ class TableSinkITCase extends StreamingMultipleProgramsTestBase {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.getConfig.enableObjectReuse()
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+
val tEnv = TableEnvironment.getTableEnvironment(env)
env.setParallelism(4)
val input = StreamTestData.get3TupleDataStream(env)
+ .assignAscendingTimestamps(_._2)
.map(x => x).setParallelism(4) // increase DOP to 4
- val results = input.toTable(tEnv, 'a, 'b, 'c)
+ val results = input.toTable(tEnv, 'a, 'b.rowtime, 'c)
.where('a < 5 || 'a > 17)
.select('c, 'b)
.writeToSink(new CsvTableSink(path))
@@ -102,8 +105,14 @@ class TableSinkITCase extends StreamingMultipleProgramsTestBase {
env.execute()
val expected = Seq(
- "Hi,1", "Hello,2", "Hello world,2", "Hello world, how are you?,3",
- "Comment#12,6", "Comment#13,6", "Comment#14,6", "Comment#15,6").mkString("\n")
+ "Hi,1970-01-01 00:00:00.001",
+ "Hello,1970-01-01 00:00:00.002",
+ "Hello world,1970-01-01 00:00:00.002",
+ "Hello world, how are you?,1970-01-01 00:00:00.003",
+ "Comment#12,1970-01-01 00:00:00.006",
+ "Comment#13,1970-01-01 00:00:00.006",
+ "Comment#14,1970-01-01 00:00:00.006",
+ "Comment#15,1970-01-01 00:00:00.006").mkString("\n")
TestBaseUtils.compareResultsByLinesInMemory(expected, path)
}