You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/11/08 21:38:36 UTC

[3/3] flink git commit: [FLINK-8012] [table] Fix TableSink config for tables with time attributes.

[FLINK-8012] [table] Fix TableSink config for tables with time attributes.

This closes #4974.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7893aba4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7893aba4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7893aba4

Branch: refs/heads/release-1.3
Commit: 7893aba41ca2a3f7a26bf4ffa3b1966ee95caa09
Parents: eb29957
Author: Fabian Hueske <fh...@apache.org>
Authored: Tue Nov 7 17:57:39 2017 +0100
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed Nov 8 21:48:03 2017 +0100

----------------------------------------------------------------------
 .../main/scala/org/apache/flink/table/api/table.scala |  7 ++++++-
 .../flink/table/sinks/StreamTableSinksITCase.scala    | 14 ++++++++------
 2 files changed, 14 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7893aba4/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
index abdfa65..5aa6d86 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
@@ -791,7 +791,12 @@ class Table(
     val rowType = getRelNode.getRowType
     val fieldNames: Array[String] = rowType.getFieldNames.asScala.toArray
     val fieldTypes: Array[TypeInformation[_]] = rowType.getFieldList.asScala
-      .map(field => FlinkTypeFactory.toTypeInfo(field.getType)).toArray
+      .map(field => FlinkTypeFactory.toTypeInfo(field.getType))
+      .map {
+        // replace time indicator types by SQL_TIMESTAMP
+        case t: TypeInformation[_] if FlinkTypeFactory.isTimeIndicatorType(t) => Types.SQL_TIMESTAMP
+        case t: TypeInformation[_] => t
+      }.toArray
 
     // configure the table sink
     val configuredSink = sink.configure(fieldNames, fieldTypes)

http://git-wip-us.apache.org/repos/asf/flink/blob/7893aba4/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sinks/StreamTableSinksITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sinks/StreamTableSinksITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sinks/StreamTableSinksITCase.scala
index 723b603..a005e64 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sinks/StreamTableSinksITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sinks/StreamTableSinksITCase.scala
@@ -45,6 +45,8 @@ class StreamTableSinksITCase extends StreamingMultipleProgramsTestBase {
   def testAppendSinkOnUpdatingTable(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     env.getConfig.enableObjectReuse()
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+
     val tEnv = TableEnvironment.getTableEnvironment(env)
 
     val t = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'id, 'num, 'text)
@@ -70,18 +72,18 @@ class StreamTableSinksITCase extends StreamingMultipleProgramsTestBase {
 
     t.window(Tumble over 5.millis on 'rowtime as 'w)
       .groupBy('w)
-      .select('w.end, 'id.count, 'num.sum)
+      .select('w.rowtime, 'id.count, 'num.sum)
       .writeToSink(new TestAppendSink)
 
     env.execute()
 
     val result = RowCollector.getAndClearValues.map(_.f1.toString).sorted
     val expected = List(
-      "1970-01-01 00:00:00.005,4,8",
-      "1970-01-01 00:00:00.01,5,18",
-      "1970-01-01 00:00:00.015,5,24",
-      "1970-01-01 00:00:00.02,5,29",
-      "1970-01-01 00:00:00.025,2,12")
+      "1970-01-01 00:00:00.004,4,8",
+      "1970-01-01 00:00:00.009,5,18",
+      "1970-01-01 00:00:00.014,5,24",
+      "1970-01-01 00:00:00.019,5,29",
+      "1970-01-01 00:00:00.024,2,12")
       .sorted
     assertEquals(expected, result)
   }