Posted to commits@flink.apache.org by fh...@apache.org on 2017/03/30 22:03:06 UTC

[1/2] flink git commit: [FLINK-5653] [table] Add processing-time OVER ROWS BETWEEN x PRECEDING aggregation to SQL.

Repository: flink
Updated Branches:
  refs/heads/master aa3c395b9 -> ee033c903


[FLINK-5653] [table] Add processing-time OVER ROWS BETWEEN x PRECEDING aggregation to SQL.

This closes #3653.
This closes #3574.
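
With this change, bounded processing-time OVER ROWS windows are supported in
streaming SQL. The new SqlITCase tests below exercise queries of the following
shape, which compute sliding aggregates over the two preceding rows and the
current row, per partition, in processing-time order:

  SELECT a,
    SUM(c) OVER (PARTITION BY a ORDER BY procTime()
                 ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS sumC,
    MIN(c) OVER (PARTITION BY a ORDER BY procTime()
                 ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS minC
  FROM MyTable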


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ee033c90
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ee033c90
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ee033c90

Branch: refs/heads/master
Commit: ee033c903b20d7a233009764b6b96e78eea5b981
Parents: 44f9c76
Author: Stefano Bortoli <s....@gmail.com>
Authored: Thu Mar 30 11:28:41 2017 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Thu Mar 30 22:12:21 2017 +0200

----------------------------------------------------------------------
 .../datastream/DataStreamOverAggregate.scala    |   6 +-
 .../table/runtime/aggregate/AggregateUtil.scala |   9 +-
 ...oundedProcessingOverRowProcessFunction.scala | 181 ++++++++++++++++++
 .../table/api/scala/stream/sql/SqlITCase.scala  | 184 ++++++++++++++++++-
 .../scala/stream/sql/WindowAggregateTest.scala  |  55 ++++++
 5 files changed, 423 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ee033c90/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
index e24dd23..2df4e02 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
@@ -112,8 +112,10 @@ class DataStreamOverAggregate(
           // bounded OVER window
           if (overWindow.isRows) {
             // ROWS clause bounded OVER window
-            throw new TableException(
-              "processing-time OVER ROWS PRECEDING window is not supported yet.")
+            createBoundedAndCurrentRowOverWindow(
+              inputDS,
+              isRangeClause = false,
+              isRowTimeType = false)
           } else {
             // RANGE clause bounded OVER window
             throw new TableException(

http://git-wip-us.apache.org/repos/asf/flink/blob/ee033c90/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
index 93ab7b7..88e9d68 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
@@ -138,8 +138,13 @@ object AggregateUtil {
         )
       }
     } else {
-      throw TableException(
-        "Bounded partitioned proc-time OVER aggregation is not supported yet.")
+      new BoundedProcessingOverRowProcessFunction(
+        aggregates,
+        aggFields,
+        precedingOffset,
+        inputType.getFieldCount,
+        aggregationStateType,
+        FlinkTypeFactory.toInternalRowTypeInfo(inputType))
     }
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ee033c90/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRowProcessFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRowProcessFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRowProcessFunction.scala
new file mode 100644
index 0000000..454b177
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRowProcessFunction.scala
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.util
+
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.types.Row
+import org.apache.flink.util.{Collector, Preconditions}
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+import org.apache.flink.api.common.state.MapState
+import org.apache.flink.api.common.state.MapStateDescriptor
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import java.util.{List => JList}
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+
+class BoundedProcessingOverRowProcessFunction(
+  private val aggregates: Array[AggregateFunction[_]],
+  private val aggFields: Array[Int],
+  private val precedingOffset: Long,
+  private val forwardedFieldCount: Int,
+  private val aggregatesTypeInfo: RowTypeInfo,
+  private val inputType: TypeInformation[Row])
+    extends ProcessFunction[Row, Row] {
+
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(aggFields)
+  Preconditions.checkArgument(aggregates.length == aggFields.length)
+  Preconditions.checkArgument(precedingOffset > 0)
+
+  private var accumulatorState: ValueState[Row] = _
+  private var rowMapState: MapState[Long, JList[Row]] = _
+  private var output: Row = _
+  private var counterState: ValueState[Long] = _
+  private var smallestTsState: ValueState[Long] = _
+
+  override def open(config: Configuration) {
+
+    output = new Row(forwardedFieldCount + aggregates.length)
+    // We keep the received elements in a MapState keyed by
+    // their ingestion time into the operator. We also keep a
+    // counter of processed elements and the timestamp of the
+    // oldest element.
+    val rowListTypeInfo: TypeInformation[JList[Row]] =
+      new ListTypeInfo[Row](inputType).asInstanceOf[TypeInformation[JList[Row]]]
+
+    val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
+      new MapStateDescriptor[Long, JList[Row]]("windowBufferMapState",
+        BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo)
+    rowMapState = getRuntimeContext.getMapState(mapStateDescriptor)
+
+    val aggregationStateDescriptor: ValueStateDescriptor[Row] =
+      new ValueStateDescriptor[Row]("aggregationState", aggregatesTypeInfo)
+    accumulatorState = getRuntimeContext.getState(aggregationStateDescriptor)
+
+    val processedCountDescriptor: ValueStateDescriptor[Long] =
+      new ValueStateDescriptor[Long]("processedCountState", classOf[Long])
+    counterState = getRuntimeContext.getState(processedCountDescriptor)
+
+    val smallestTimestampDescriptor: ValueStateDescriptor[Long] =
+      new ValueStateDescriptor[Long]("smallestTSState", classOf[Long])
+    smallestTsState = getRuntimeContext.getState(smallestTimestampDescriptor)
+  }
+
+  override def processElement(
+    input: Row,
+    ctx: ProcessFunction[Row, Row]#Context,
+    out: Collector[Row]): Unit = {
+
+    val currentTime = ctx.timerService.currentProcessingTime
+    var i = 0
+
+    // initialize state for the processed element
+    var accumulators = accumulatorState.value
+    if (accumulators == null) {
+      accumulators = new Row(aggregates.length)
+      while (i < aggregates.length) {
+        accumulators.setField(i, aggregates(i).createAccumulator())
+        i += 1
+      }
+    }
+
+    // get smallest timestamp
+    var smallestTs = smallestTsState.value
+    if (smallestTs == 0L) {
+      smallestTs = currentTime
+      smallestTsState.update(smallestTs)
+    }
+    // get previous counter value
+    var counter = counterState.value
+
+    if (counter == precedingOffset) {
+      val retractList = rowMapState.get(smallestTs)
+
+      // the buffer already holds precedingOffset elements,
+      // so retract the value of the oldest element
+      i = 0
+      while (i < aggregates.length) {
+        val accumulator = accumulators.getField(i).asInstanceOf[Accumulator]
+        aggregates(i).retract(accumulator, retractList.get(0).getField(aggFields(i)))
+        i += 1
+      }
+      retractList.remove(0)
+      // if the list for the smallest timestamp is not empty, keep it
+      if (!retractList.isEmpty) {
+        rowMapState.put(smallestTs, retractList)
+      } // otherwise remove the empty list and find the new smallest timestamp
+      else {
+        rowMapState.remove(smallestTs)
+        val iter = rowMapState.keys.iterator
+        var currentTs: Long = 0L
+        var newSmallestTs: Long = Long.MaxValue
+        while (iter.hasNext) {
+          currentTs = iter.next
+          if (currentTs < newSmallestTs) {
+            newSmallestTs = currentTs
+          }
+        }
+        smallestTsState.update(newSmallestTs)
+      }
+    } // we update the counter only while the buffer is filling up
+    else {
+      counter += 1
+      counterState.update(counter)
+    }
+
+    // copy forwarded fields in output row
+    i = 0
+    while (i < forwardedFieldCount) {
+      output.setField(i, input.getField(i))
+      i += 1
+    }
+
+    // accumulate current row and set aggregate in output row
+    i = 0
+    while (i < aggregates.length) {
+      val index = forwardedFieldCount + i
+      val accumulator = accumulators.getField(i).asInstanceOf[Accumulator]
+      aggregates(i).accumulate(accumulator, input.getField(aggFields(i)))
+      output.setField(index, aggregates(i).getValue(accumulator))
+      i += 1
+    }
+
+    // update map state, accumulator state, counter and timestamp
+    val currentTimeState = rowMapState.get(currentTime)
+    if (currentTimeState != null) {
+      currentTimeState.add(input)
+      rowMapState.put(currentTime, currentTimeState)
+    } else { // add new input
+      val newList = new util.ArrayList[Row]
+      newList.add(input)
+      rowMapState.put(currentTime, newList)
+    }
+
+    accumulatorState.update(accumulators)
+
+    out.collect(output)
+  }
+
+}
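
For context, the wiring of this ProcessFunction happens in
DataStreamOverAggregate (not part of this diff). A minimal sketch, assuming
the Java DataStream API used by the flink-table plan nodes; the names
partitionKeys, processFunction, and rowTypeInfo follow the surrounding code,
the exact call chain is an assumption:

  // sketch only, not the committed implementation
  val result: DataStream[Row] =
    if (partitionKeys.nonEmpty) {
      // partitioned query: buffer, accumulators, and counter are kept per key
      inputDS
        .keyBy(partitionKeys: _*)
        .process(processFunction)
        .returns(rowTypeInfo)
    } else {
      // non-partitioned query: constant key and parallelism 1, so a single
      // operator instance sees all rows
      inputDS
        .keyBy(new NullByteKeySelector[Row])
        .process(processFunction)
        .setParallelism(1)
        .setMaxParallelism(1)
        .returns(rowTypeInfo)
    }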

http://git-wip-us.apache.org/repos/asf/flink/blob/ee033c90/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
index 0d3a46c..67d13b0 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
@@ -344,7 +344,7 @@ class SqlITCase extends StreamingWithStateTestBase {
 
     val expected = mutable.MutableList(
       "Hello,1,1,1", "Hello,1,2,2", "Hello,1,3,3",
-      "Hello,2,3,4", "Hello,2,3,5","Hello,2,3,6",
+      "Hello,2,3,4", "Hello,2,3,5", "Hello,2,3,6",
       "Hello,3,3,7", "Hello,4,3,9", "Hello,5,3,12",
       "Hello,6,3,15",
       "Hello World,7,1,7", "Hello World,7,2,14", "Hello World,7,3,21",
@@ -471,12 +471,12 @@ class SqlITCase extends StreamingWithStateTestBase {
 
     val expected = mutable.MutableList(
       "Hello,1,1,1", "Hello,15,2,2", "Hello,16,3,3",
-      "Hello,2,6,9", "Hello,3,6,9","Hello,2,6,9",
+      "Hello,2,6,9", "Hello,3,6,9", "Hello,2,6,9",
       "Hello,3,4,9",
       "Hello,4,2,7",
       "Hello,5,2,9",
-      "Hello,6,2,11","Hello,65,2,12",
-      "Hello,9,2,12","Hello,9,2,12","Hello,18,3,18",
+      "Hello,6,2,11", "Hello,65,2,12",
+      "Hello,9,2,12", "Hello,9,2,12", "Hello,18,3,18",
       "Hello World,7,1,7", "Hello World,17,3,21", "Hello World,77,3,21", "Hello World,18,1,7",
       "Hello World,8,2,15",
       "Hello World,20,1,20")
@@ -543,12 +543,12 @@ class SqlITCase extends StreamingWithStateTestBase {
 
     val expected = mutable.MutableList(
       "Hello,1,1,1", "Hello,15,2,2", "Hello,16,3,3",
-      "Hello,2,6,9", "Hello,3,6,9","Hello,2,6,9",
+      "Hello,2,6,9", "Hello,3,6,9", "Hello,2,6,9",
       "Hello,3,4,9",
       "Hello,4,2,7",
       "Hello,5,2,9",
-      "Hello,6,2,11","Hello,65,2,12",
-      "Hello,9,2,12","Hello,9,2,12","Hello,18,3,18",
+      "Hello,6,2,11", "Hello,65,2,12",
+      "Hello,9,2,12", "Hello,9,2,12", "Hello,18,3,18",
       "Hello World,7,4,25", "Hello World,17,3,21", "Hello World,77,3,21", "Hello World,18,1,7",
       "Hello World,8,2,15",
       "Hello World,20,1,20")
@@ -556,7 +556,7 @@ class SqlITCase extends StreamingWithStateTestBase {
   }
 
   /**
-    *  All aggregates must be computed on the same window.
+    * All aggregates must be computed on the same window.
     */
   @Test(expected = classOf[TableException])
   def testMultiWindow(): Unit = {
@@ -972,6 +972,174 @@ class SqlITCase extends StreamingWithStateTestBase {
     assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
 
+  @Test
+  def testPartitionedProcTimeOverWindow(): Unit = {
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStateBackend(getStateBackend)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    env.setParallelism(1)
+    StreamITCase.testResults = mutable.MutableList()
+
+    val t = StreamTestData.get5TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c, 'd, 'e)
+    tEnv.registerTable("MyTable", t)
+
+    val sqlQuery = "SELECT a,  " +
+      " SUM(c) OVER (" +
+      " PARTITION BY a ORDER BY procTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS sumC , " +
+      " MIN(c) OVER (" +
+      " PARTITION BY a ORDER BY procTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS minC " +
+      " FROM MyTable"
+
+    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    result.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = mutable.MutableList(
+      "1,0,0",
+      "2,1,1",
+      "2,3,1",
+      "3,3,3",
+      "3,7,3",
+      "3,12,3",
+      "4,6,6",
+      "4,13,6",
+      "4,21,6",
+      "4,24,7",
+      "5,10,10",
+      "5,21,10",
+      "5,33,10",
+      "5,36,11",
+      "5,39,12")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testPartitionedProcTimeOverWindow2(): Unit = {
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStateBackend(getStateBackend)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    env.setParallelism(1)
+    StreamITCase.testResults = mutable.MutableList()
+
+    val t = StreamTestData.get5TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c, 'd, 'e)
+    tEnv.registerTable("MyTable", t)
+
+    val sqlQuery = "SELECT a,  " +
+      " SUM(c) OVER (" +
+      " PARTITION BY a ORDER BY procTime() ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) AS sumC , " +
+      " MIN(c) OVER (" +
+      " PARTITION BY a ORDER BY procTime() ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) AS minC " +
+      " FROM MyTable"
+
+    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    result.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = mutable.MutableList(
+      "1,0,0",
+      "2,1,1",
+      "2,3,1",
+      "3,3,3",
+      "3,7,3",
+      "3,12,3",
+      "4,6,6",
+      "4,13,6",
+      "4,21,6",
+      "4,30,6",
+      "5,10,10",
+      "5,21,10",
+      "5,33,10",
+      "5,46,10",
+      "5,60,10")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+
+  @Test
+  def testNonPartitionedProcTimeOverWindow(): Unit = {
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStateBackend(getStateBackend)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    env.setParallelism(1)
+    StreamITCase.testResults = mutable.MutableList()
+
+    val t = StreamTestData.get5TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c, 'd, 'e)
+    tEnv.registerTable("MyTable", t)
+
+    val sqlQuery = "SELECT a,  " +
+      " SUM(c) OVER (" +
+      " ORDER BY procTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS sumC , " +
+      " MIN(c) OVER (" +
+      " ORDER BY procTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS minC " +
+      " FROM MyTable"
+
+    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    result.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = mutable.MutableList(
+      "1,0,0",
+      "2,1,0",
+      "2,3,0",
+      "3,6,1",
+      "3,9,2",
+      "3,12,3",
+      "4,15,4",
+      "4,18,5",
+      "4,21,6",
+      "4,24,7",
+      "5,27,8",
+      "5,30,9",
+      "5,33,10",
+      "5,36,11",
+      "5,39,12")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testNonPartitionedProcTimeOverWindow2(): Unit = {
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStateBackend(getStateBackend)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    env.setParallelism(1)
+    StreamITCase.testResults = mutable.MutableList()
+
+    val t = StreamTestData.get5TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c, 'd, 'e)
+    tEnv.registerTable("MyTable", t)
+
+    val sqlQuery = "SELECT a,  " +
+      " SUM(c) OVER (" +
+      " ORDER BY procTime() ROWS BETWEEN 10 PRECEDING AND CURRENT ROW) AS sumC , " +
+      " MIN(c) OVER (" +
+      " ORDER BY procTime() ROWS BETWEEN 10 PRECEDING AND CURRENT ROW) AS minC " +
+      " FROM MyTable"
+    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    result.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = mutable.MutableList(
+      "1,0,0",
+      "2,1,0",
+      "2,3,0",
+      "3,6,0",
+      "3,10,0",
+      "3,15,0",
+      "4,21,0",
+      "4,28,0",
+      "4,36,0",
+      "4,45,0",
+      "5,55,0",
+      "5,66,1",
+      "5,77,2",
+      "5,88,3",
+      "5,99,4")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
 }
 
 object SqlITCase {

http://git-wip-us.apache.org/repos/asf/flink/blob/ee033c90/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
index 45d204a..52fd5f8 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
@@ -405,4 +405,59 @@ class WindowAggregateTest extends TableTestBase {
     streamUtil.verifySql(sql, expected)
   }
 
+  @Test
+  def testBoundNonPartitionedProcTimeWindowWithRowRange() = {
+    val sql = "SELECT " +
+      "c, " +
+      "count(a) OVER (ORDER BY procTime() ROWS BETWEEN 2 preceding AND " +
+      "CURRENT ROW) as cnt1 " +
+      "from MyTable"
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamOverAggregate",
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(0),
+            term("select", "a", "c", "PROCTIME() AS $2")
+          ),
+          term("orderBy", "PROCTIME"),
+          term("rows", "BETWEEN 2 PRECEDING AND CURRENT ROW"),
+          term("select", "a", "c", "PROCTIME", "COUNT(a) AS w0$o0")
+        ),
+        term("select", "c", "w0$o0 AS $1")
+      )
+    streamUtil.verifySql(sql, expected)
+  }
+
+  @Test
+  def testBoundPartitionedProcTimeWindowWithRowRange() = {
+    val sql = "SELECT " +
+      "c, " +
+      "count(a) OVER (PARTITION BY c ORDER BY procTime() ROWS BETWEEN 2 preceding AND " +
+      "CURRENT ROW) as cnt1 " +
+      "from MyTable"
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamOverAggregate",
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(0),
+            term("select", "a", "c", "PROCTIME() AS $2")
+          ),
+          term("partitionBy", "c"),
+          term("orderBy", "PROCTIME"),
+          term("rows", "BETWEEN 2 PRECEDING AND CURRENT ROW"),
+          term("select", "a", "c", "PROCTIME", "COUNT(a) AS w0$o0")
+        ),
+        term("select", "c", "w0$o0 AS $1")
+      )
+    streamUtil.verifySql(sql, expected)
+  }
+
 }


[2/2] flink git commit: [FLINK-6200] [table] Add support for unbounded event-time OVER RANGE window.

Posted by fh...@apache.org.
[FLINK-6200] [table] Add support for unbounded event-time OVER RANGE window.

This closes #3649.
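
The new SqlITCase tests below exercise the RANGE variant with queries of the
following shape; because rows with equal rowtime fall into the same frame,
they all receive the same aggregate values:

  SELECT a, b, c,
    SUM(b)   OVER (ORDER BY rowtime()
                   RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW),
    COUNT(b) OVER (ORDER BY rowtime()
                   RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
  FROM T1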


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/44f9c76a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/44f9c76a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/44f9c76a

Branch: refs/heads/master
Commit: 44f9c76a9ff50e95947c9f78a86b485f564e3796
Parents: aa3c395
Author: hongyuhong 00223286 <ho...@huawei.com>
Authored: Wed Mar 29 10:29:17 2017 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Thu Mar 30 22:12:21 2017 +0200

----------------------------------------------------------------------
 .../datastream/DataStreamOverAggregate.scala    |  20 +--
 .../table/runtime/aggregate/AggregateUtil.scala |  26 ++-
 .../UnboundedEventTimeOverProcessFunction.scala | 163 +++++++++++++++----
 .../table/api/scala/stream/sql/SqlITCase.scala  | 132 +++++++++++++++
 4 files changed, 292 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/44f9c76a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
index 7b744f1..e24dd23 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
@@ -127,14 +127,8 @@ class DataStreamOverAggregate(
         // row-time OVER window
         if (overWindow.lowerBound.isPreceding &&
               overWindow.lowerBound.isUnbounded && overWindow.upperBound.isCurrentRow) {
-          if (overWindow.isRows) {
-            // unbounded preceding OVER ROWS window
-            createUnboundedAndCurrentRowEventTimeOverWindow(inputDS)
-          } else {
-            // unbounded preceding OVER RANGE window
-            throw new TableException(
-              "row-time OVER RANGE UNBOUNDED PRECEDING window is not supported yet.")
-          }
+          // ROWS/RANGE clause unbounded OVER window
+          createUnboundedAndCurrentRowEventTimeOverWindow(inputDS, overWindow.isRows)
         } else if (overWindow.lowerBound.isPreceding && overWindow.upperBound.isCurrentRow) {
           // bounded OVER window
           if (overWindow.isRows) {
@@ -202,8 +196,8 @@ class DataStreamOverAggregate(
 
   def createBoundedAndCurrentRowOverWindow(
     inputDS: DataStream[Row],
-    isRangeClause: Boolean = false,
-    isRowTimeType: Boolean = false): DataStream[Row] = {
+    isRangeClause: Boolean,
+    isRowTimeType: Boolean): DataStream[Row] = {
 
     val overWindow: Group = logicWindow.groups.get(0)
     val partitionKeys: Array[Int] = overWindow.keys.toArray
@@ -247,7 +241,8 @@ class DataStreamOverAggregate(
   }
 
   def createUnboundedAndCurrentRowEventTimeOverWindow(
-    inputDS: DataStream[Row]): DataStream[Row]  = {
+    inputDS: DataStream[Row],
+    isRows: Boolean): DataStream[Row] = {
 
     val overWindow: Group = logicWindow.groups.get(0)
     val partitionKeys: Array[Int] = overWindow.keys.toArray
@@ -258,7 +253,8 @@ class DataStreamOverAggregate(
 
     val processFunction = AggregateUtil.createUnboundedEventTimeOverProcessFunction(
       namedAggregates,
-      inputType)
+      inputType,
+      isRows)
 
     val result: DataStream[Row] =
       // partitioned aggregation

http://git-wip-us.apache.org/repos/asf/flink/blob/44f9c76a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
index cbb2e53..93ab7b7 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
@@ -152,7 +152,8 @@ object AggregateUtil {
     */
   private[flink] def createUnboundedEventTimeOverProcessFunction(
    namedAggregates: Seq[CalcitePair[AggregateCall, String]],
-   inputType: RelDataType): UnboundedEventTimeOverProcessFunction = {
+   inputType: RelDataType,
+   isRows: Boolean): UnboundedEventTimeOverProcessFunction = {
 
     val (aggFields, aggregates) =
       transformToAggregateFunctions(
@@ -162,12 +163,23 @@ object AggregateUtil {
 
     val aggregationStateType: RowTypeInfo = createAccumulatorRowType(aggregates)
 
-    new UnboundedEventTimeOverProcessFunction(
-      aggregates,
-      aggFields,
-      inputType.getFieldCount,
-      aggregationStateType,
-      FlinkTypeFactory.toInternalRowTypeInfo(inputType))
+    if (isRows) {
+      // ROWS unbounded over process function
+      new UnboundedEventTimeRowsOverProcessFunction(
+        aggregates,
+        aggFields,
+        inputType.getFieldCount,
+        aggregationStateType,
+        FlinkTypeFactory.toInternalRowTypeInfo(inputType))
+    } else {
+      // RANGE unbounded over process function
+      new UnboundedEventTimeRangeOverProcessFunction(
+        aggregates,
+        aggFields,
+        inputType.getFieldCount,
+        aggregationStateType,
+        FlinkTypeFactory.toInternalRowTypeInfo(inputType))
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/flink/blob/44f9c76a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala
index 7616ede..92faf7d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala
@@ -41,7 +41,7 @@ import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
  * @param inputType the type of the input rows which are kept in state
   *
   */
-class UnboundedEventTimeOverProcessFunction(
+abstract class UnboundedEventTimeOverProcessFunction(
     private val aggregates: Array[AggregateFunction[_]],
     private val aggFields: Array[Int],
     private val forwardedFieldCount: Int,
@@ -53,7 +53,7 @@ class UnboundedEventTimeOverProcessFunction(
   Preconditions.checkNotNull(aggFields)
   Preconditions.checkArgument(aggregates.length == aggFields.length)
 
-  private var output: Row = _
+  protected var output: Row = _
   // state to hold the accumulators of the aggregations
   private var accumulatorState: ValueState[Row] = _
   // state to hold rows until the next watermark arrives
@@ -162,30 +162,9 @@ class UnboundedEventTimeOverProcessFunction(
         val curRowList = rowMapState.get(curTimestamp)
         collector.setAbsoluteTimestamp(curTimestamp)
 
-        var j = 0
-        while (j < curRowList.size) {
-          val curRow = curRowList.get(j)
-          i = 0
-
-          // copy forwarded fields to output row
-          while (i < forwardedFieldCount) {
-            output.setField(i, curRow.getField(i))
-            i += 1
-          }
-
-          // update accumulators and copy aggregates to output row
-          i = 0
-          while (i < aggregates.length) {
-            val index = forwardedFieldCount + i
-            val accumulator = lastAccumulator.getField(i).asInstanceOf[Accumulator]
-            aggregates(i).accumulate(accumulator, curRow.getField(aggFields(i)))
-            output.setField(index, aggregates(i).getValue(accumulator))
-            i += 1
-          }
-          // emit output row
-          collector.collect(output)
-          j += 1
-        }
+        // process rows with the same timestamp; the mechanism differs between ROWS and RANGE
+        processElementsWithSameTimestamp(curRowList, lastAccumulator, collector)
+
         rowMapState.remove(curTimestamp)
       }
 
@@ -204,21 +183,145 @@ class UnboundedEventTimeOverProcessFunction(
   * If timestamps arrive in order (as is the case when using the RocksDB state backend) this is just
    * an append with O(1).
    */
-  private def insertToSortedList(recordTimeStamp: Long) = {
+  private def insertToSortedList(recordTimestamp: Long) = {
     val listIterator = sortedTimestamps.listIterator(sortedTimestamps.size)
     var continue = true
     while (listIterator.hasPrevious && continue) {
       val timestamp = listIterator.previous
-      if (recordTimeStamp >= timestamp) {
+      if (recordTimestamp >= timestamp) {
         listIterator.next
-        listIterator.add(recordTimeStamp)
+        listIterator.add(recordTimestamp)
         continue = false
       }
     }
 
     if (continue) {
-      sortedTimestamps.addFirst(recordTimeStamp)
+      sortedTimestamps.addFirst(recordTimestamp)
     }
   }
 
+  /**
+   * Processes rows with the same timestamp. The mechanism differs
+   * between ROWS and RANGE windows.
+   */
+  def processElementsWithSameTimestamp(
+    curRowList: JList[Row],
+    lastAccumulator: Row,
+    out: Collector[Row]): Unit
+
+}
+
+/**
+  * A ProcessFunction to support unbounded ROWS windows.
+  * The ROWS clause defines on a physical level how many rows are included in a window frame.
+  */
+class UnboundedEventTimeRowsOverProcessFunction(
+   aggregates: Array[AggregateFunction[_]],
+   aggFields: Array[Int],
+   forwardedFieldCount: Int,
+   intermediateType: TypeInformation[Row],
+   inputType: TypeInformation[Row])
+  extends UnboundedEventTimeOverProcessFunction(
+    aggregates,
+    aggFields,
+    forwardedFieldCount,
+    intermediateType,
+    inputType) {
+
+  override def processElementsWithSameTimestamp(
+    curRowList: JList[Row],
+    lastAccumulator: Row,
+    out: Collector[Row]): Unit = {
+
+    var j = 0
+    var i = 0
+    while (j < curRowList.size) {
+      val curRow = curRowList.get(j)
+      i = 0
+
+      // copy forwarded fields to output row
+      while (i < forwardedFieldCount) {
+        output.setField(i, curRow.getField(i))
+        i += 1
+      }
+
+      // update accumulators and copy aggregates to output row
+      i = 0
+      while (i < aggregates.length) {
+        val index = forwardedFieldCount + i
+        val accumulator = lastAccumulator.getField(i).asInstanceOf[Accumulator]
+        aggregates(i).accumulate(accumulator, curRow.getField(aggFields(i)))
+        output.setField(index, aggregates(i).getValue(accumulator))
+        i += 1
+      }
+      // emit output row
+      out.collect(output)
+      j += 1
+    }
+  }
+}
+
+
+/**
+  * A ProcessFunction to support unbounded RANGE windows.
+  * The RANGE option includes all the rows within the window frame
+  * that have the same ORDER BY values as the current row.
+  */
+class UnboundedEventTimeRangeOverProcessFunction(
+    aggregates: Array[AggregateFunction[_]],
+    aggFields: Array[Int],
+    forwardedFieldCount: Int,
+    intermediateType: TypeInformation[Row],
+    inputType: TypeInformation[Row])
+  extends UnboundedEventTimeOverProcessFunction(
+    aggregates,
+    aggFields,
+    forwardedFieldCount,
+    intermediateType,
+    inputType) {
+
+  override def processElementsWithSameTimestamp(
+    curRowList: JList[Row],
+    lastAccumulator: Row,
+    out: Collector[Row]): Unit = {
+
+    var j = 0
+    var i = 0
+    // all rows with the same timestamp get the same aggregation value.
+    while (j < curRowList.size) {
+      val curRow = curRowList.get(j)
+      i = 0
+      while (i < aggregates.length) {
+        val index = forwardedFieldCount + i
+        val accumulator = lastAccumulator.getField(i).asInstanceOf[Accumulator]
+        aggregates(i).accumulate(accumulator, curRow.getField(aggFields(i)))
+        i += 1
+      }
+      j += 1
+    }
+
+    // emit output row
+    j = 0
+    while (j < curRowList.size) {
+      val curRow = curRowList.get(j)
+
+      // copy forwarded fields to output row
+      i = 0
+      while (i < forwardedFieldCount) {
+        output.setField(i, curRow.getField(i))
+        i += 1
+      }
+
+      // copy aggregates to output row
+      i = 0
+      while (i < aggregates.length) {
+        val index = forwardedFieldCount + i
+        val accumulator = lastAccumulator.getField(i).asInstanceOf[Accumulator]
+        output.setField(index, aggregates(i).getValue(accumulator))
+        i += 1
+      }
+      out.collect(output)
+      j += 1
+    }
+  }
 }
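
A worked illustration of the difference the two subclasses implement: given a
SUM aggregate and three rows with values 1, 2, 3 sharing one timestamp,

  // same-timestamp rows with values 1, 2, 3 under SUM:
  // ROWS  variant emits 1, 3, 6  (accumulate, then emit, per physical row)
  // RANGE variant emits 6, 6, 6  (accumulate all peers first, then emit the
  //                               frame total for every row)

since under RANGE all rows with equal ORDER BY values belong to the same
window frame.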

http://git-wip-us.apache.org/repos/asf/flink/blob/44f9c76a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
index b8285a1..0d3a46c 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
@@ -840,6 +840,138 @@ class SqlITCase extends StreamingWithStateTestBase {
       "6,8,Hello world,51,9,5,9,1")
     assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
+
+  /** test non-partitioned unbounded event-time RANGE window */
+  @Test
+  def testUnboundedNonPartitionedEventTimeRangeWindow(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    env.setStateBackend(getStateBackend)
+    StreamITCase.testResults = mutable.MutableList()
+    env.setParallelism(1)
+
+    val sqlQuery = "SELECT a, b, c, " +
+      "SUM(b) over (order by rowtime() range between unbounded preceding and current row), " +
+      "count(b) over (order by rowtime() range between unbounded preceding and current row), " +
+      "avg(b) over (order by rowtime() range between unbounded preceding and current row), " +
+      "max(b) over (order by rowtime() range between unbounded preceding and current row), " +
+      "min(b) over (order by rowtime() range between unbounded preceding and current row) " +
+      "from T1"
+
+    val data = Seq(
+      Left(14000005L, (1, 1L, "Hi")),
+      Left(14000000L, (2, 1L, "Hello")),
+      Left(14000002L, (1, 1L, "Hello")),
+      Left(14000002L, (1, 2L, "Hello")),
+      Left(14000002L, (1, 3L, "Hello world")),
+      Left(14000003L, (2, 2L, "Hello world")),
+      Left(14000003L, (2, 3L, "Hello world")),
+      Right(14000020L),
+      Left(14000021L, (1, 4L, "Hello world")),
+      Left(14000022L, (1, 5L, "Hello world")),
+      Left(14000022L, (1, 6L, "Hello world")),
+      Left(14000022L, (1, 7L, "Hello world")),
+      Left(14000023L, (2, 4L, "Hello world")),
+      Left(14000023L, (2, 5L, "Hello world")),
+      Right(14000030L)
+    )
+
+    val t1 = env.addSource(new EventTimeSourceFunction[(Int, Long, String)](data))
+      .toTable(tEnv).as('a, 'b, 'c)
+
+    tEnv.registerTable("T1", t1)
+
+    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    result.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = mutable.MutableList(
+      "2,1,Hello,1,1,1,1,1",
+      "1,1,Hello,7,4,1,3,1",
+      "1,2,Hello,7,4,1,3,1",
+      "1,3,Hello world,7,4,1,3,1",
+      "2,2,Hello world,12,6,2,3,1",
+      "2,3,Hello world,12,6,2,3,1",
+      "1,1,Hi,13,7,1,3,1",
+      "1,4,Hello world,17,8,2,4,1",
+      "1,5,Hello world,35,11,3,7,1",
+      "1,6,Hello world,35,11,3,7,1",
+      "1,7,Hello world,35,11,3,7,1",
+      "2,4,Hello world,44,13,3,7,1",
+      "2,5,Hello world,44,13,3,7,1"
+    )
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  /** test partitioned unbounded event-time RANGE window */
+  @Test
+  def testUnboundedPartitionedEventTimeRangeWindow(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    env.setStateBackend(getStateBackend)
+    StreamITCase.testResults = mutable.MutableList()
+    env.setParallelism(1)
+
+    val sqlQuery = "SELECT a, b, c, " +
+      "SUM(b) over (" +
+      "partition by a order by rowtime() range between unbounded preceding and current row), " +
+      "count(b) over (" +
+      "partition by a order by rowtime() range between unbounded preceding and current row), " +
+      "avg(b) over (" +
+      "partition by a order by rowtime() range between unbounded preceding and current row), " +
+      "max(b) over (" +
+      "partition by a order by rowtime() range between unbounded preceding and current row), " +
+      "min(b) over (" +
+      "partition by a order by rowtime() range between unbounded preceding and current row) " +
+      "from T1"
+
+    val data = Seq(
+      Left(14000005L, (1, 1L, "Hi")),
+      Left(14000000L, (2, 1L, "Hello")),
+      Left(14000002L, (1, 1L, "Hello")),
+      Left(14000002L, (1, 2L, "Hello")),
+      Left(14000002L, (1, 3L, "Hello world")),
+      Left(14000003L, (2, 2L, "Hello world")),
+      Left(14000003L, (2, 3L, "Hello world")),
+      Right(14000020L),
+      Left(14000021L, (1, 4L, "Hello world")),
+      Left(14000022L, (1, 5L, "Hello world")),
+      Left(14000022L, (1, 6L, "Hello world")),
+      Left(14000022L, (1, 7L, "Hello world")),
+      Left(14000023L, (2, 4L, "Hello world")),
+      Left(14000023L, (2, 5L, "Hello world")),
+      Right(14000030L)
+    )
+
+    val t1 = env.addSource(new EventTimeSourceFunction[(Int, Long, String)](data))
+      .toTable(tEnv).as('a, 'b, 'c)
+
+    tEnv.registerTable("T1", t1)
+
+    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    result.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = mutable.MutableList(
+      "1,1,Hello,6,3,2,3,1",
+      "1,2,Hello,6,3,2,3,1",
+      "1,3,Hello world,6,3,2,3,1",
+      "1,1,Hi,7,4,1,3,1",
+      "2,1,Hello,1,1,1,1,1",
+      "2,2,Hello world,6,3,2,3,1",
+      "2,3,Hello world,6,3,2,3,1",
+      "1,4,Hello world,11,5,2,4,1",
+      "1,5,Hello world,29,8,3,7,1",
+      "1,6,Hello world,29,8,3,7,1",
+      "1,7,Hello world,29,8,3,7,1",
+      "2,4,Hello world,15,5,3,5,1",
+      "2,5,Hello world,15,5,3,5,1"
+    )
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
 }
 
 object SqlITCase {