You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2017/04/25 14:30:46 UTC

flink git commit: [FLINK-6368] [table] Grouping keys in stream aggregations have wrong order

Repository: flink
Updated Branches:
  refs/heads/master bc6409d62 -> 05088b4a6


[FLINK-6368] [table] Grouping keys in stream aggregations have wrong order

This closes #3768.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/05088b4a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/05088b4a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/05088b4a

Branch: refs/heads/master
Commit: 05088b4a61001b536b3d07e49c415606edf11fba
Parents: bc6409d
Author: xccui <xi...@gmail.com>
Authored: Tue Apr 25 14:06:45 2017 +0800
Committer: twalthr <tw...@apache.org>
Committed: Tue Apr 25 16:24:27 2017 +0200

----------------------------------------------------------------------
 .../nodes/datastream/DataStreamAggregate.scala  |  7 +++--
 .../datastream/DataStreamAggregateITCase.scala  | 27 ++++++++++++++++++++
 2 files changed, 30 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/05088b4a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamAggregate.scala
index 62bcfc8..187773d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamAggregate.scala
@@ -101,7 +101,6 @@ class DataStreamAggregate(
 
   override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = {
 
-    val groupingKeys = grouping.indices.toArray
     val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
 
     val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType)
@@ -124,15 +123,15 @@ class DataStreamAggregate(
       inputDS.getType)
 
     // grouped / keyed aggregation
-    if (groupingKeys.length > 0) {
+    if (grouping.length > 0) {
       val windowFunction = AggregateUtil.createAggregationGroupWindowFunction(
         window,
-        groupingKeys.length,
+        grouping.length,
         namedAggregates.size,
         rowRelDataType.getFieldCount,
         namedProperties)
 
-      val keyedStream = inputDS.keyBy(groupingKeys: _*)
+      val keyedStream = inputDS.keyBy(grouping: _*)
       val windowedStream =
         createKeyedWindowedStream(window, keyedStream)
           .asInstanceOf[WindowedStream[Row, Tuple, DataStreamWindow]]

http://git-wip-us.apache.org/repos/asf/flink/blob/05088b4a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamAggregateITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamAggregateITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamAggregateITCase.scala
index 85a2373..dcd3c6c 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamAggregateITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamAggregateITCase.scala
@@ -213,6 +213,33 @@ class DataStreamAggregateITCase extends StreamingMultipleProgramsTestBase {
       "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.003")
     assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
+
+  @Test
+  def testEventTimeGroupWindowWithoutExplicitTimeField(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+
+    val stream = env
+      .fromCollection(data)
+      .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
+      .map(t => (t._2, t._6))
+    val table = stream.toTable(tEnv, 'int, 'string)
+
+    val windowedTable = table
+      .window(Slide over 3.milli every 10.milli on 'rowtime as 'w)
+      .groupBy('w, 'string)
+      .select('string, 'int.count, 'w.start, 'w.end)
+
+    val results = windowedTable.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+    val expected = Seq(
+      "Hallo,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.003",
+      "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.003")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
 }
 
 object DataStreamAggregateITCase {