You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/11/08 17:42:01 UTC

[4/5] flink git commit: [FLINK-8012] [table] Fix TableSink config for tables with time attributes.

[FLINK-8012] [table] Fix TableSink config for tables with time attributes.

This closes #4974.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8c5b5476
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8c5b5476
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8c5b5476

Branch: refs/heads/master
Commit: 8c5b5476d1404b31cd085730081d28c7695d3ec1
Parents: 1ad1830
Author: Fabian Hueske <fh...@apache.org>
Authored: Tue Nov 7 17:57:39 2017 +0100
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed Nov 8 17:12:18 2017 +0100

----------------------------------------------------------------------
 .../scala/org/apache/flink/table/api/table.scala     |  7 ++++++-
 .../table/runtime/stream/table/TableSinkITCase.scala | 15 ++++++++++++---
 2 files changed, 18 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8c5b5476/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
index 0430e49..7349a0e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
@@ -848,7 +848,12 @@ class Table(
     val rowType = getRelNode.getRowType
     val fieldNames: Array[String] = rowType.getFieldNames.asScala.toArray
     val fieldTypes: Array[TypeInformation[_]] = rowType.getFieldList.asScala
-      .map(field => FlinkTypeFactory.toTypeInfo(field.getType)).toArray
+      .map(field => FlinkTypeFactory.toTypeInfo(field.getType))
+      .map {
+        // replace time indicator types by SQL_TIMESTAMP
+        case t: TypeInformation[_] if FlinkTypeFactory.isTimeIndicatorType(t) => Types.SQL_TIMESTAMP
+        case t: TypeInformation[_] => t
+      }.toArray
 
     // configure the table sink
     val configuredSink = sink.configure(fieldNames, fieldTypes)

http://git-wip-us.apache.org/repos/asf/flink/blob/8c5b5476/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
index 07934b8..b44d8ef 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
@@ -88,13 +88,16 @@ class TableSinkITCase extends StreamingMultipleProgramsTestBase {
 
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     env.getConfig.enableObjectReuse()
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+
     val tEnv = TableEnvironment.getTableEnvironment(env)
     env.setParallelism(4)
 
     val input = StreamTestData.get3TupleDataStream(env)
+      .assignAscendingTimestamps(_._2)
       .map(x => x).setParallelism(4) // increase DOP to 4
 
-    val results = input.toTable(tEnv, 'a, 'b, 'c)
+    val results = input.toTable(tEnv, 'a, 'b.rowtime, 'c)
       .where('a < 5 || 'a > 17)
       .select('c, 'b)
       .writeToSink(new CsvTableSink(path))
@@ -102,8 +105,14 @@ class TableSinkITCase extends StreamingMultipleProgramsTestBase {
     env.execute()
 
     val expected = Seq(
-      "Hi,1", "Hello,2", "Hello world,2", "Hello world, how are you?,3",
-      "Comment#12,6", "Comment#13,6", "Comment#14,6", "Comment#15,6").mkString("\n")
+      "Hi,1970-01-01 00:00:00.001",
+      "Hello,1970-01-01 00:00:00.002",
+      "Hello world,1970-01-01 00:00:00.002",
+      "Hello world, how are you?,1970-01-01 00:00:00.003",
+      "Comment#12,1970-01-01 00:00:00.006",
+      "Comment#13,1970-01-01 00:00:00.006",
+      "Comment#14,1970-01-01 00:00:00.006",
+      "Comment#15,1970-01-01 00:00:00.006").mkString("\n")
 
     TestBaseUtils.compareResultsByLinesInMemory(expected, path)
   }