You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2017/04/25 14:30:46 UTC
flink git commit: [FLINK-6368] [table] Grouping keys in stream aggregations have wrong order
Repository: flink
Updated Branches:
refs/heads/master bc6409d62 -> 05088b4a6
[FLINK-6368] [table] Grouping keys in stream aggregations have wrong order
This closes #3768.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/05088b4a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/05088b4a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/05088b4a
Branch: refs/heads/master
Commit: 05088b4a61001b536b3d07e49c415606edf11fba
Parents: bc6409d
Author: xccui <xi...@gmail.com>
Authored: Tue Apr 25 14:06:45 2017 +0800
Committer: twalthr <tw...@apache.org>
Committed: Tue Apr 25 16:24:27 2017 +0200
----------------------------------------------------------------------
.../nodes/datastream/DataStreamAggregate.scala | 7 +++--
.../datastream/DataStreamAggregateITCase.scala | 27 ++++++++++++++++++++
2 files changed, 30 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/05088b4a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamAggregate.scala
index 62bcfc8..187773d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamAggregate.scala
@@ -101,7 +101,6 @@ class DataStreamAggregate(
override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = {
- val groupingKeys = grouping.indices.toArray
val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType)
@@ -124,15 +123,15 @@ class DataStreamAggregate(
inputDS.getType)
// grouped / keyed aggregation
- if (groupingKeys.length > 0) {
+ if (grouping.length > 0) {
val windowFunction = AggregateUtil.createAggregationGroupWindowFunction(
window,
- groupingKeys.length,
+ grouping.length,
namedAggregates.size,
rowRelDataType.getFieldCount,
namedProperties)
- val keyedStream = inputDS.keyBy(groupingKeys: _*)
+ val keyedStream = inputDS.keyBy(grouping: _*)
val windowedStream =
createKeyedWindowedStream(window, keyedStream)
.asInstanceOf[WindowedStream[Row, Tuple, DataStreamWindow]]
http://git-wip-us.apache.org/repos/asf/flink/blob/05088b4a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamAggregateITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamAggregateITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamAggregateITCase.scala
index 85a2373..dcd3c6c 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamAggregateITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamAggregateITCase.scala
@@ -213,6 +213,33 @@ class DataStreamAggregateITCase extends StreamingMultipleProgramsTestBase {
"Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.003")
assertEquals(expected.sorted, StreamITCase.testResults.sorted)
}
+
+ @Test
+ def testEventTimeGroupWindowWithoutExplicitTimeField(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.testResults = mutable.MutableList()
+
+ val stream = env
+ .fromCollection(data)
+ .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
+ .map(t => (t._2, t._6))
+ val table = stream.toTable(tEnv, 'int, 'string)
+
+ val windowedTable = table
+ .window(Slide over 3.milli every 10.milli on 'rowtime as 'w)
+ .groupBy('w, 'string)
+ .select('string, 'int.count, 'w.start, 'w.end)
+
+ val results = windowedTable.toDataStream[Row]
+ results.addSink(new StreamITCase.StringSink)
+ env.execute()
+ val expected = Seq(
+ "Hallo,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.003",
+ "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.003")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
}
object DataStreamAggregateITCase {