You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/11/08 21:38:36 UTC
[3/3] flink git commit: [FLINK-8012] [table] Fix TableSink config for
tables with time attributes.
[FLINK-8012] [table] Fix TableSink config for tables with time attributes.
This closes #4974.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7893aba4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7893aba4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7893aba4
Branch: refs/heads/release-1.3
Commit: 7893aba41ca2a3f7a26bf4ffa3b1966ee95caa09
Parents: eb29957
Author: Fabian Hueske <fh...@apache.org>
Authored: Tue Nov 7 17:57:39 2017 +0100
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed Nov 8 21:48:03 2017 +0100
----------------------------------------------------------------------
.../main/scala/org/apache/flink/table/api/table.scala | 7 ++++++-
.../flink/table/sinks/StreamTableSinksITCase.scala | 14 ++++++++------
2 files changed, 14 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/7893aba4/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
index abdfa65..5aa6d86 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
@@ -791,7 +791,12 @@ class Table(
val rowType = getRelNode.getRowType
val fieldNames: Array[String] = rowType.getFieldNames.asScala.toArray
val fieldTypes: Array[TypeInformation[_]] = rowType.getFieldList.asScala
- .map(field => FlinkTypeFactory.toTypeInfo(field.getType)).toArray
+ .map(field => FlinkTypeFactory.toTypeInfo(field.getType))
+ .map {
+ // replace time indicator types by SQL_TIMESTAMP
+ case t: TypeInformation[_] if FlinkTypeFactory.isTimeIndicatorType(t) => Types.SQL_TIMESTAMP
+ case t: TypeInformation[_] => t
+ }.toArray
// configure the table sink
val configuredSink = sink.configure(fieldNames, fieldTypes)
http://git-wip-us.apache.org/repos/asf/flink/blob/7893aba4/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sinks/StreamTableSinksITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sinks/StreamTableSinksITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sinks/StreamTableSinksITCase.scala
index 723b603..a005e64 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sinks/StreamTableSinksITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sinks/StreamTableSinksITCase.scala
@@ -45,6 +45,8 @@ class StreamTableSinksITCase extends StreamingMultipleProgramsTestBase {
def testAppendSinkOnUpdatingTable(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.getConfig.enableObjectReuse()
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+
val tEnv = TableEnvironment.getTableEnvironment(env)
val t = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'id, 'num, 'text)
@@ -70,18 +72,18 @@ class StreamTableSinksITCase extends StreamingMultipleProgramsTestBase {
t.window(Tumble over 5.millis on 'rowtime as 'w)
.groupBy('w)
- .select('w.end, 'id.count, 'num.sum)
+ .select('w.rowtime, 'id.count, 'num.sum)
.writeToSink(new TestAppendSink)
env.execute()
val result = RowCollector.getAndClearValues.map(_.f1.toString).sorted
val expected = List(
- "1970-01-01 00:00:00.005,4,8",
- "1970-01-01 00:00:00.01,5,18",
- "1970-01-01 00:00:00.015,5,24",
- "1970-01-01 00:00:00.02,5,29",
- "1970-01-01 00:00:00.025,2,12")
+ "1970-01-01 00:00:00.004,4,8",
+ "1970-01-01 00:00:00.009,5,18",
+ "1970-01-01 00:00:00.014,5,24",
+ "1970-01-01 00:00:00.019,5,29",
+ "1970-01-01 00:00:00.024,2,12")
.sorted
assertEquals(expected, result)
}