Posted to commits@flink.apache.org by fh...@apache.org on 2017/03/30 22:05:01 UTC
[50/50] [abbrv] flink git commit: [FLINK-5653] [table] Add processing-time OVER ROWS BETWEEN x PRECEDING aggregation to SQL.
[FLINK-5653] [table] Add processing-time OVER ROWS BETWEEN x PRECEDING aggregation to SQL.
This closes #3653.
This closes #3574.
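In short: bounded processing-time OVER windows with a ROWS clause are now
translated into a DataStream ProcessFunction instead of raising a
TableException. For orientation, a query of the shape exercised by the new
ITCases looks like this (a sketch only; table and column names are
illustrative):

    val sqlQuery =
      "SELECT a, " +
      "  SUM(c) OVER (PARTITION BY a ORDER BY procTime() " +
      "    ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS sumC " +
      "FROM MyTable"
    val result = tEnv.sql(sqlQuery).toDataStream[Row]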
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ee033c90
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ee033c90
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ee033c90
Branch: refs/heads/table-retraction
Commit: ee033c903b20d7a233009764b6b96e78eea5b981
Parents: 44f9c76
Author: Stefano Bortoli <s....@gmail.com>
Authored: Thu Mar 30 11:28:41 2017 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Thu Mar 30 22:12:21 2017 +0200
----------------------------------------------------------------------
.../datastream/DataStreamOverAggregate.scala | 6 +-
.../table/runtime/aggregate/AggregateUtil.scala | 9 +-
...oundedProcessingOverRowProcessFunction.scala | 181 ++++++++++++++++++
.../table/api/scala/stream/sql/SqlITCase.scala | 184 ++++++++++++++++++-
.../scala/stream/sql/WindowAggregateTest.scala | 55 ++++++
5 files changed, 423 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/ee033c90/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
index e24dd23..2df4e02 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
@@ -112,8 +112,10 @@ class DataStreamOverAggregate(
// bounded OVER window
if (overWindow.isRows) {
// ROWS clause bounded OVER window
- throw new TableException(
- "processing-time OVER ROWS PRECEDING window is not supported yet.")
+ createBoundedAndCurrentRowOverWindow(
+ inputDS,
+ isRangeClause = false,
+ isRowTimeType = false)
} else {
// RANGE clause bounded OVER window
throw new TableException(
http://git-wip-us.apache.org/repos/asf/flink/blob/ee033c90/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
index 93ab7b7..88e9d68 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
@@ -138,8 +138,13 @@ object AggregateUtil {
)
}
} else {
- throw TableException(
- "Bounded partitioned proc-time OVER aggregation is not supported yet.")
+ new BoundedProcessingOverRowProcessFunction(
+ aggregates,
+ aggFields,
+ precedingOffset,
+ inputType.getFieldCount,
+ aggregationStateType,
+ FlinkTypeFactory.toInternalRowTypeInfo(inputType))
}
}
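For context, AggregateUtil now returns this ProcessFunction for the bounded
proc-time ROWS case, and DataStreamOverAggregate applies it per partition
key. A minimal sketch of that wiring (variable names are illustrative; the
actual plumbing lives in createBoundedAndCurrentRowOverWindow):

    val processFunction = new BoundedProcessingOverRowProcessFunction(
      aggregates,
      aggFields,
      precedingOffset,
      inputType.getFieldCount,
      aggregationStateType,
      FlinkTypeFactory.toInternalRowTypeInfo(inputType))

    val result: DataStream[Row] = inputDS
      .keyBy(partitionKeys: _*) // partitioned case; the non-partitioned case uses a constant key
      .process(processFunction)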
http://git-wip-us.apache.org/repos/asf/flink/blob/ee033c90/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRowProcessFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRowProcessFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRowProcessFunction.scala
new file mode 100644
index 0000000..454b177
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRowProcessFunction.scala
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.util
+
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.types.Row
+import org.apache.flink.util.{Collector, Preconditions}
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+import org.apache.flink.api.common.state.MapState
+import org.apache.flink.api.common.state.MapStateDescriptor
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import java.util.{List => JList}
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+
+class BoundedProcessingOverRowProcessFunction(
+ private val aggregates: Array[AggregateFunction[_]],
+ private val aggFields: Array[Int],
+ private val precedingOffset: Long,
+ private val forwardedFieldCount: Int,
+ private val aggregatesTypeInfo: RowTypeInfo,
+ private val inputType: TypeInformation[Row])
+ extends ProcessFunction[Row, Row] {
+
+ Preconditions.checkNotNull(aggregates)
+ Preconditions.checkNotNull(aggFields)
+ Preconditions.checkArgument(aggregates.length == aggFields.length)
+ Preconditions.checkArgument(precedingOffset > 0)
+
+ private var accumulatorState: ValueState[Row] = _
+ private var rowMapState: MapState[Long, JList[Row]] = _
+ private var output: Row = _
+ private var counterState: ValueState[Long] = _
+ private var smallestTsState: ValueState[Long] = _
+
+ override def open(config: Configuration): Unit = {
+
+ output = new Row(forwardedFieldCount + aggregates.length)
+ // We keep the received elements in a MapState keyed
+ // by their ingestion time in the operator. We also keep
+ // a counter of processed elements and the timestamp of
+ // the oldest element.
+ val rowListTypeInfo: TypeInformation[JList[Row]] =
+ new ListTypeInfo[Row](inputType).asInstanceOf[TypeInformation[JList[Row]]]
+
+ val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
+ new MapStateDescriptor[Long, JList[Row]]("windowBufferMapState",
+ BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo)
+ rowMapState = getRuntimeContext.getMapState(mapStateDescriptor)
+
+ val aggregationStateDescriptor: ValueStateDescriptor[Row] =
+ new ValueStateDescriptor[Row]("aggregationState", aggregatesTypeInfo)
+ accumulatorState = getRuntimeContext.getState(aggregationStateDescriptor)
+
+ val processedCountDescriptor: ValueStateDescriptor[Long] =
+ new ValueStateDescriptor[Long]("processedCountState", classOf[Long])
+ counterState = getRuntimeContext.getState(processedCountDescriptor)
+
+ val smallestTimestampDescriptor: ValueStateDescriptor[Long] =
+ new ValueStateDescriptor[Long]("smallestTSState", classOf[Long])
+ smallestTsState = getRuntimeContext.getState(smallestTimestampDescriptor)
+ }
+
+ override def processElement(
+ input: Row,
+ ctx: ProcessFunction[Row, Row]#Context,
+ out: Collector[Row]): Unit = {
+
+ val currentTime = ctx.timerService.currentProcessingTime
+ var i = 0
+
+ // initialize state for the processed element
+ var accumulators = accumulatorState.value
+ if (accumulators == null) {
+ accumulators = new Row(aggregates.length)
+ while (i < aggregates.length) {
+ accumulators.setField(i, aggregates(i).createAccumulator())
+ i += 1
+ }
+ }
+
+ // get smallest timestamp
+ var smallestTs = smallestTsState.value
+ if (smallestTs == 0L) {
+ smallestTs = currentTime
+ smallestTsState.update(smallestTs)
+ }
+ // get previous counter value
+ var counter = counterState.value
+
+ if (counter == precedingOffset) {
+ val retractList = rowMapState.get(smallestTs)
+
+ // get the oldest element beyond the buffer size
+ // and, if it exists, retract its value
+ i = 0
+ while (i < aggregates.length) {
+ val accumulator = accumulators.getField(i).asInstanceOf[Accumulator]
+ aggregates(i).retract(accumulator, retractList.get(0).getField(aggFields(i)))
+ i += 1
+ }
+ retractList.remove(0)
+ // if the list for the smallest timestamp is not empty, keep it
+ if (!retractList.isEmpty) {
+ rowMapState.put(smallestTs, retractList)
+ } else {
+ // the list for the smallest timestamp is empty:
+ // remove it and find the new smallest timestamp
+ rowMapState.remove(smallestTs)
+ val iter = rowMapState.keys.iterator
+ var currentTs: Long = 0L
+ var newSmallestTs: Long = Long.MaxValue
+ while (iter.hasNext) {
+ currentTs = iter.next
+ if (currentTs < newSmallestTs) {
+ newSmallestTs = currentTs
+ }
+ }
+ smallestTsState.update(newSmallestTs)
+ }
+ } else {
+ // we update the counter only while the buffer is still filling up
+ counter += 1
+ counterState.update(counter)
+ }
+
+ // copy forwarded fields in output row
+ i = 0
+ while (i < forwardedFieldCount) {
+ output.setField(i, input.getField(i))
+ i += 1
+ }
+
+ // accumulate current row and set aggregate in output row
+ i = 0
+ while (i < aggregates.length) {
+ val index = forwardedFieldCount + i
+ val accumulator = accumulators.getField(i).asInstanceOf[Accumulator]
+ aggregates(i).accumulate(accumulator, input.getField(aggFields(i)))
+ output.setField(index, aggregates(i).getValue(accumulator))
+ i += 1
+ }
+
+ // update map state, accumulator state, counter and timestamp
+ val currentTimeState = rowMapState.get(currentTime)
+ if (currentTimeState != null) {
+ currentTimeState.add(input)
+ rowMapState.put(currentTime, currentTimeState)
+ } else { // add new input
+ val newList = new util.ArrayList[Row]
+ newList.add(input)
+ rowMapState.put(currentTime, newList)
+ }
+
+ accumulatorState.update(accumulators)
+
+ out.collect(output)
+ }
+
+}
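The per-element logic above is a retract/accumulate pattern: until
precedingOffset rows have been buffered, new rows are only accumulated;
from then on, each incoming row first retracts the oldest buffered row
from the accumulators and then accumulates itself, so the emitted
aggregates always cover exactly the last precedingOffset rows (i.e. the
x PRECEDING rows plus the current one). A self-contained sketch of the
same idea for a single SUM, with a plain queue standing in for the
MapState and the counter (all names illustrative):

    import scala.collection.mutable

    object RowsOverSketch {
      def main(args: Array[String]): Unit = {
        val windowSize = 3 // ROWS BETWEEN 2 PRECEDING AND CURRENT ROW
        val buffer = mutable.Queue[Int]() // plays the role of rowMapState
        var sum = 0                       // plays the role of the accumulator
        for (v <- Seq(1, 2, 3, 4, 5)) {
          if (buffer.size == windowSize) {
            sum -= buffer.dequeue() // buffer full: retract the oldest row
          }
          buffer.enqueue(v) // buffer and accumulate the current row
          sum += v
          println(s"input=$v -> SUM over last ${buffer.size} rows = $sum")
        }
        // prints sums 1, 3, 6, 9, 12
      }
    }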
http://git-wip-us.apache.org/repos/asf/flink/blob/ee033c90/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
index 0d3a46c..67d13b0 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
@@ -344,7 +344,7 @@ class SqlITCase extends StreamingWithStateTestBase {
val expected = mutable.MutableList(
"Hello,1,1,1", "Hello,1,2,2", "Hello,1,3,3",
- "Hello,2,3,4", "Hello,2,3,5","Hello,2,3,6",
+ "Hello,2,3,4", "Hello,2,3,5", "Hello,2,3,6",
"Hello,3,3,7", "Hello,4,3,9", "Hello,5,3,12",
"Hello,6,3,15",
"Hello World,7,1,7", "Hello World,7,2,14", "Hello World,7,3,21",
@@ -471,12 +471,12 @@ class SqlITCase extends StreamingWithStateTestBase {
val expected = mutable.MutableList(
"Hello,1,1,1", "Hello,15,2,2", "Hello,16,3,3",
- "Hello,2,6,9", "Hello,3,6,9","Hello,2,6,9",
+ "Hello,2,6,9", "Hello,3,6,9", "Hello,2,6,9",
"Hello,3,4,9",
"Hello,4,2,7",
"Hello,5,2,9",
- "Hello,6,2,11","Hello,65,2,12",
- "Hello,9,2,12","Hello,9,2,12","Hello,18,3,18",
+ "Hello,6,2,11", "Hello,65,2,12",
+ "Hello,9,2,12", "Hello,9,2,12", "Hello,18,3,18",
"Hello World,7,1,7", "Hello World,17,3,21", "Hello World,77,3,21", "Hello World,18,1,7",
"Hello World,8,2,15",
"Hello World,20,1,20")
@@ -543,12 +543,12 @@ class SqlITCase extends StreamingWithStateTestBase {
val expected = mutable.MutableList(
"Hello,1,1,1", "Hello,15,2,2", "Hello,16,3,3",
- "Hello,2,6,9", "Hello,3,6,9","Hello,2,6,9",
+ "Hello,2,6,9", "Hello,3,6,9", "Hello,2,6,9",
"Hello,3,4,9",
"Hello,4,2,7",
"Hello,5,2,9",
- "Hello,6,2,11","Hello,65,2,12",
- "Hello,9,2,12","Hello,9,2,12","Hello,18,3,18",
+ "Hello,6,2,11", "Hello,65,2,12",
+ "Hello,9,2,12", "Hello,9,2,12", "Hello,18,3,18",
"Hello World,7,4,25", "Hello World,17,3,21", "Hello World,77,3,21", "Hello World,18,1,7",
"Hello World,8,2,15",
"Hello World,20,1,20")
@@ -556,7 +556,7 @@ class SqlITCase extends StreamingWithStateTestBase {
}
/**
- * All aggregates must be computed on the same window.
+ * All aggregates must be computed on the same window.
*/
@Test(expected = classOf[TableException])
def testMultiWindow(): Unit = {
@@ -972,6 +972,174 @@ class SqlITCase extends StreamingWithStateTestBase {
assertEquals(expected.sorted, StreamITCase.testResults.sorted)
}
+ @Test
+ def testPartitionedProcTimeOverWindow(): Unit = {
+
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStateBackend(getStateBackend)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ env.setParallelism(1)
+ StreamITCase.testResults = mutable.MutableList()
+
+ val t = StreamTestData.get5TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c, 'd, 'e)
+ tEnv.registerTable("MyTable", t)
+
+ val sqlQuery = "SELECT a, " +
+ " SUM(c) OVER (" +
+ " PARTITION BY a ORDER BY procTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS sumC , " +
+ " MIN(c) OVER (" +
+ " PARTITION BY a ORDER BY procTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS minC " +
+ " FROM MyTable"
+
+ val result = tEnv.sql(sqlQuery).toDataStream[Row]
+ result.addSink(new StreamITCase.StringSink)
+ env.execute()
+
+ val expected = mutable.MutableList(
+ "1,0,0",
+ "2,1,1",
+ "2,3,1",
+ "3,3,3",
+ "3,7,3",
+ "3,12,3",
+ "4,6,6",
+ "4,13,6",
+ "4,21,6",
+ "4,24,7",
+ "5,10,10",
+ "5,21,10",
+ "5,33,10",
+ "5,36,11",
+ "5,39,12")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
+ @Test
+ def testPartitionedProcTimeOverWindow2(): Unit = {
+
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStateBackend(getStateBackend)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ env.setParallelism(1)
+ StreamITCase.testResults = mutable.MutableList()
+
+ val t = StreamTestData.get5TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c, 'd, 'e)
+ tEnv.registerTable("MyTable", t)
+
+ val sqlQuery = "SELECT a, " +
+ " SUM(c) OVER (" +
+ " PARTITION BY a ORDER BY procTime() ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) AS sumC , " +
+ " MIN(c) OVER (" +
+ " PARTITION BY a ORDER BY procTime() ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) AS minC " +
+ " FROM MyTable"
+
+ val result = tEnv.sql(sqlQuery).toDataStream[Row]
+ result.addSink(new StreamITCase.StringSink)
+ env.execute()
+
+ val expected = mutable.MutableList(
+ "1,0,0",
+ "2,1,1",
+ "2,3,1",
+ "3,3,3",
+ "3,7,3",
+ "3,12,3",
+ "4,6,6",
+ "4,13,6",
+ "4,21,6",
+ "4,30,6",
+ "5,10,10",
+ "5,21,10",
+ "5,33,10",
+ "5,46,10",
+ "5,60,10")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
+ @Test
+ def testNonPartitionedProcTimeOverWindow(): Unit = {
+
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStateBackend(getStateBackend)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ env.setParallelism(1)
+ StreamITCase.testResults = mutable.MutableList()
+
+ val t = StreamTestData.get5TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c, 'd, 'e)
+ tEnv.registerTable("MyTable", t)
+
+ val sqlQuery = "SELECT a, " +
+ " SUM(c) OVER (" +
+ " ORDER BY procTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS sumC , " +
+ " MIN(c) OVER (" +
+ " ORDER BY procTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS minC " +
+ " FROM MyTable"
+
+ val result = tEnv.sql(sqlQuery).toDataStream[Row]
+ result.addSink(new StreamITCase.StringSink)
+ env.execute()
+
+ val expected = mutable.MutableList(
+ "1,0,0",
+ "2,1,0",
+ "2,3,0",
+ "3,6,1",
+ "3,9,2",
+ "3,12,3",
+ "4,15,4",
+ "4,18,5",
+ "4,21,6",
+ "4,24,7",
+ "5,27,8",
+ "5,30,9",
+ "5,33,10",
+ "5,36,11",
+ "5,39,12")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
+ @Test
+ def testNonPartitionedProcTimeOverWindow2(): Unit = {
+
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStateBackend(getStateBackend)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ env.setParallelism(1)
+ StreamITCase.testResults = mutable.MutableList()
+
+ val t = StreamTestData.get5TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c, 'd, 'e)
+ tEnv.registerTable("MyTable", t)
+
+ val sqlQuery = "SELECT a, " +
+ " SUM(c) OVER (" +
+ " ORDER BY procTime() ROWS BETWEEN 10 PRECEDING AND CURRENT ROW) AS sumC , " +
+ " MIN(c) OVER (" +
+ " ORDER BY procTime() ROWS BETWEEN 10 PRECEDING AND CURRENT ROW) AS minC " +
+ " FROM MyTable"
+ val result = tEnv.sql(sqlQuery).toDataStream[Row]
+ result.addSink(new StreamITCase.StringSink)
+ env.execute()
+
+ val expected = mutable.MutableList(
+ "1,0,0",
+ "2,1,0",
+ "2,3,0",
+ "3,6,0",
+ "3,10,0",
+ "3,15,0",
+ "4,21,0",
+ "4,28,0",
+ "4,36,0",
+ "4,45,0",
+ "5,55,0",
+ "5,66,1",
+ "5,77,2",
+ "5,88,3",
+ "5,99,4")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
}
object SqlITCase {
http://git-wip-us.apache.org/repos/asf/flink/blob/ee033c90/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
index 45d204a..52fd5f8 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
@@ -405,4 +405,59 @@ class WindowAggregateTest extends TableTestBase {
streamUtil.verifySql(sql, expected)
}
+ @Test
+ def testBoundNonPartitionedProcTimeWindowWithRowRange(): Unit = {
+ val sql = "SELECT " +
+ "c, " +
+ "count(a) OVER (ORDER BY procTime() ROWS BETWEEN 2 preceding AND " +
+ "CURRENT ROW) as cnt1 " +
+ "from MyTable"
+
+ val expected =
+ unaryNode(
+ "DataStreamCalc",
+ unaryNode(
+ "DataStreamOverAggregate",
+ unaryNode(
+ "DataStreamCalc",
+ streamTableNode(0),
+ term("select", "a", "c", "PROCTIME() AS $2")
+ ),
+ term("orderBy", "PROCTIME"),
+ term("rows", "BETWEEN 2 PRECEDING AND CURRENT ROW"),
+ term("select", "a", "c", "PROCTIME", "COUNT(a) AS w0$o0")
+ ),
+ term("select", "c", "w0$o0 AS $1")
+ )
+ streamUtil.verifySql(sql, expected)
+ }
+
+ @Test
+ def testBoundPartitionedProcTimeWindowWithRowRange(): Unit = {
+ val sql = "SELECT " +
+ "c, " +
+ "count(a) OVER (PARTITION BY c ORDER BY procTime() ROWS BETWEEN 2 preceding AND " +
+ "CURRENT ROW) as cnt1 " +
+ "from MyTable"
+
+ val expected =
+ unaryNode(
+ "DataStreamCalc",
+ unaryNode(
+ "DataStreamOverAggregate",
+ unaryNode(
+ "DataStreamCalc",
+ streamTableNode(0),
+ term("select", "a", "c", "PROCTIME() AS $2")
+ ),
+ term("partitionBy", "c"),
+ term("orderBy", "PROCTIME"),
+ term("rows", "BETWEEN 2 PRECEDING AND CURRENT ROW"),
+ term("select", "a", "c", "PROCTIME", "COUNT(a) AS w0$o0")
+ ),
+ term("select", "c", "w0$o0 AS $1")
+ )
+ streamUtil.verifySql(sql, expected)
+ }
+
}