Posted to commits@flink.apache.org by fh...@apache.org on 2017/05/12 06:11:04 UTC
[3/5] flink git commit: [FLINK-6491] [table] Add QueryConfig and state clean up for over-windowed aggregates.
[FLINK-6491] [table] Add QueryConfig and state clean up for over-windowed aggregates.
This closes #3863.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/24808871
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/24808871
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/24808871
Branch: refs/heads/master
Commit: 2480887180d881c30d228a73c746f94abbcbbb64
Parents: d16339d
Author: sunjincheng121 <su...@gmail.com>
Authored: Tue May 9 14:36:42 2017 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Thu May 11 23:53:25 2017 +0200
----------------------------------------------------------------------
.../flink/table/api/BatchTableEnvironment.scala | 8 +-
.../apache/flink/table/api/QueryConfig.scala | 102 -----
.../table/api/StreamTableEnvironment.scala | 38 +-
.../table/api/java/StreamTableEnvironment.scala | 35 +-
.../apache/flink/table/api/queryConfig.scala | 102 +++++
.../api/scala/StreamTableEnvironment.scala | 20 +-
.../table/api/scala/TableConversions.scala | 13 +-
.../org/apache/flink/table/api/table.scala | 6 +-
.../plan/nodes/datastream/DataStreamCalc.scala | 5 +-
.../nodes/datastream/DataStreamCorrelate.scala | 4 +-
.../datastream/DataStreamGroupAggregate.scala | 10 +-
.../DataStreamGroupWindowAggregate.scala | 4 +-
.../datastream/DataStreamOverAggregate.scala | 34 +-
.../plan/nodes/datastream/DataStreamRel.scala | 4 +-
.../plan/nodes/datastream/DataStreamScan.scala | 2 +-
.../plan/nodes/datastream/DataStreamUnion.scala | 6 +-
.../nodes/datastream/DataStreamValues.scala | 2 +-
.../datastream/StreamTableSourceScan.scala | 2 +-
.../table/runtime/aggregate/AggregateUtil.scala | 32 +-
.../aggregate/GroupAggProcessFunction.scala | 44 +-
.../aggregate/ProcTimeBoundedRangeOver.scala | 26 +-
.../aggregate/ProcTimeBoundedRowsOver.scala | 20 +-
.../ProcTimeUnboundedNonPartitionedOver.scala | 20 +-
.../ProcTimeUnboundedPartitionedOver.scala | 20 +-
.../ProcessFunctionWithCleanupState.scala | 85 ++++
.../aggregate/RowTimeBoundedRangeOver.scala | 44 +-
.../aggregate/RowTimeBoundedRowsOver.scala | 41 +-
.../aggregate/RowTimeUnboundedOver.scala | 44 +-
.../stream/table/GroupAggregationsITCase.scala | 13 +-
...ProcessingOverRangeProcessFunctionTest.scala | 336 --------------
.../table/runtime/harness/HarnessTestBase.scala | 281 +++++++++++-
.../runtime/harness/NonWindowHarnessTest.scala | 157 +++++++
.../runtime/harness/OverWindowHarnessTest.scala | 458 ++++++++++---------
.../table/utils/MockTableEnvironment.scala | 2 +-
34 files changed, 1209 insertions(+), 811 deletions(-)
----------------------------------------------------------------------
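Before the per-file diffs, a minimal sketch of how the renamed configuration API is meant to be used from the Scala Table API, based only on the signatures introduced in this commit (the "clicks" table, its fields, and the retention values are illustrative assumptions):

import org.apache.flink.api.common.time.Time
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)

// the accessor is now called queryConfig (formerly qConf)
val qConf = tEnv.queryConfig.withIdleStateRetentionTime(Time.hours(12), Time.hours(24))

// assumes a table "clicks" with a field 'word was registered beforehand
val result = tEnv.sql("SELECT word, COUNT(*) AS cnt FROM clicks GROUP BY word")

// the configuration takes effect when the Table is translated into a DataStream
val retractStream = result.toRetractStream[(String, Long)](qConf)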
http://git-wip-us.apache.org/repos/asf/flink/blob/24808871/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
index f33c187..3c0f51b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
@@ -113,17 +113,17 @@ abstract class BatchTableEnvironment(
*
* @param table The [[Table]] to write.
* @param sink The [[TableSink]] to write the [[Table]] to.
- * @param qConfig The configuration for the query to generate.
+ * @param queryConfig The configuration for the query to generate.
* @tparam T The expected type of the [[DataSet]] which represents the [[Table]].
*/
override private[flink] def writeToSink[T](
table: Table,
sink: TableSink[T],
- qConfig: QueryConfig): Unit = {
+ queryConfig: QueryConfig): Unit = {
// We do not pass the configuration on, because there is nothing to configure for batch queries.
- val bQConfig = qConfig match {
- case batchConfig: BatchQueryConfig => batchConfig
+ queryConfig match {
+ case _: BatchQueryConfig =>
case _ =>
throw new TableException("BatchQueryConfig required to configure batch query.")
}
http://git-wip-us.apache.org/repos/asf/flink/blob/24808871/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/QueryConfig.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/QueryConfig.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/QueryConfig.scala
deleted file mode 100644
index 8e8b5ac..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/QueryConfig.scala
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api
-
-import _root_.java.io.Serializable
-import org.apache.flink.api.common.time.Time
-
-class QueryConfig private[table] extends Serializable {}
-
-/**
- * The [[BatchQueryConfig]] holds parameters to configure the behavior of batch queries.
- */
-class BatchQueryConfig private[table] extends QueryConfig
-
-/**
- * The [[StreamQueryConfig]] holds parameters to configure the behavior of streaming queries.
- *
- * An empty [[StreamQueryConfig]] can be generated using the [[StreamTableEnvironment.qConf]]
- * method.
- */
-class StreamQueryConfig private[table] extends QueryConfig {
-
- /**
- * The minimum time until state which was not updated will be retained.
- * State might be cleared and removed if it was not updated for the defined period of time.
- */
- private var minIdleStateRetentionTime: Long = Long.MinValue
-
- /**
- * The maximum time until state which was not updated will be retained.
- * State will be cleared and removed if it was not updated for the defined period of time.
- */
- private var maxIdleStateRetentionTime: Long = Long.MinValue
-
- /**
- * Specifies the time interval for how long idle state, i.e., state which was not updated, will
- * be retained. When state was not updated for the specified interval of time, it will be cleared
- * and removed.
- *
- * When new data arrives for previously cleaned-up state, the new data will be handled as if it
- * was the first data. This can result in previous results being overwritten.
- *
- * Note: [[setIdleStateRetentionTime(minTime: Time, maxTime: Time)]] allows to set a minimum and
- * maximum time for state to be retained. This method is more efficient, because the system has
- * to do less bookkeeping to identify the time at which state must be cleared.
- *
- * @param time The time interval for how long idle state is retained. Set to 0 (zero) to never
- * clean-up the state.
- */
- def setIdleStateRetentionTime(time: Time): StreamQueryConfig = {
- setIdleStateRetentionTime(time, time)
- }
-
- /**
- * Specifies a minimum and a maximum time interval for how long idle state, i.e., state which
- * was not updated, will be retained.
- * State will never be cleared until it was idle for less than the minimum time and will never
- * be kept if it was idle for more than the maximum time.
- *
- * When new data arrives for previously cleaned-up state, the new data will be handled as if it
- * was the first data. This can result in previous results being overwritten.
- *
- * Set to 0 (zero) to never clean-up the state.
- *
- * @param minTime The minimum time interval for which idle state is retained. Set to 0 (zero) to
- * never clean-up the state.
- * @param maxTime The maximum time interval for which idle state is retained. May not be smaller
- * than than minTime. Set to 0 (zero) to never clean-up the state.
- */
- def setIdleStateRetentionTime(minTime: Time, maxTime: Time): StreamQueryConfig = {
- if (maxTime.toMilliseconds < minTime.toMilliseconds) {
- throw new IllegalArgumentException("maxTime may not be smaller than minTime.")
- }
- minIdleStateRetentionTime = minTime.toMilliseconds
- maxIdleStateRetentionTime = maxTime.toMilliseconds
- this
- }
-
- def getMinIdleStateRetentionTime: Long = {
- minIdleStateRetentionTime
- }
-
- def getMaxIdleStateRetentionTime: Long = {
- maxIdleStateRetentionTime
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/24808871/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
index c594d4c..d68da04 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
@@ -81,7 +81,7 @@ abstract class StreamTableEnvironment(
// the naming pattern for internally registered tables.
private val internalNamePattern = "^_DataStreamTable_[0-9]+$".r
- def qConf: StreamQueryConfig = new StreamQueryConfig
+ def queryConfig: StreamQueryConfig = new StreamQueryConfig
/**
* Checks if the chosen table name is valid.
@@ -128,16 +128,16 @@ abstract class StreamTableEnvironment(
*
* @param table The [[Table]] to write.
* @param sink The [[TableSink]] to write the [[Table]] to.
- * @param qConfig The configuration for the query to generate.
+ * @param queryConfig The configuration for the query to generate.
* @tparam T The expected type of the [[DataStream]] which represents the [[Table]].
*/
override private[flink] def writeToSink[T](
table: Table,
sink: TableSink[T],
- qConfig: QueryConfig): Unit = {
+ queryConfig: QueryConfig): Unit = {
// Check query configuration
- val sQConf = qConfig match {
+ val streamQueryConfig = queryConfig match {
case streamConfig: StreamQueryConfig => streamConfig
case _ =>
throw new TableException("StreamQueryConfig required to configure stream query.")
@@ -150,7 +150,11 @@ abstract class StreamTableEnvironment(
val outputType = sink.getOutputType
// translate the Table into a DataStream and provide the type that the TableSink expects.
val result: DataStream[T] =
- translate(table, sQConf, updatesAsRetraction = true, withChangeFlag = true)(outputType)
+ translate(
+ table,
+ streamQueryConfig,
+ updatesAsRetraction = true,
+ withChangeFlag = true)(outputType)
// Give the DataStream to the TableSink to emit it.
retractSink.asInstanceOf[RetractStreamTableSink[Any]]
.emitDataStream(result.asInstanceOf[DataStream[JTuple2[JBool, Any]]])
@@ -176,7 +180,7 @@ abstract class StreamTableEnvironment(
translate(
optimizedPlan,
table.getRelNode.getRowType,
- sQConf,
+ streamQueryConfig,
withChangeFlag = true)(outputType)
// Give the DataStream to the TableSink to emit it.
upsertSink.asInstanceOf[UpsertStreamTableSink[Any]]
@@ -196,7 +200,7 @@ abstract class StreamTableEnvironment(
translate(
optimizedPlan,
table.getRelNode.getRowType,
- sQConf,
+ streamQueryConfig,
withChangeFlag = false)(outputType)
// Give the DataStream to the TableSink to emit it.
appendSink.asInstanceOf[AppendStreamTableSink[T]].emitDataStream(result)
@@ -566,7 +570,7 @@ abstract class StreamTableEnvironment(
* Table API calls and / or SQL queries and generating corresponding [[DataStream]] operators.
*
* @param table The root node of the relational expression tree.
- * @param qConfig The configuration for the query to generate.
+ * @param queryConfig The configuration for the query to generate.
* @param updatesAsRetraction Set to true to encode updates as retraction messages.
* @param withChangeFlag Set to true to emit records with change flags.
* @param tpe The [[TypeInformation]] of the resulting [[DataStream]].
@@ -575,12 +579,12 @@ abstract class StreamTableEnvironment(
*/
protected def translate[A](
table: Table,
- qConfig: StreamQueryConfig,
+ queryConfig: StreamQueryConfig,
updatesAsRetraction: Boolean,
withChangeFlag: Boolean)(implicit tpe: TypeInformation[A]): DataStream[A] = {
val relNode = table.getRelNode
val dataStreamPlan = optimize(relNode, updatesAsRetraction)
- translate(dataStreamPlan, relNode.getRowType, qConfig, withChangeFlag)
+ translate(dataStreamPlan, relNode.getRowType, queryConfig, withChangeFlag)
}
/**
@@ -589,7 +593,7 @@ abstract class StreamTableEnvironment(
* @param logicalPlan The root node of the relational expression tree.
* @param logicalType The row type of the result. Since the logicalPlan can lose the
* field naming during optimization we pass the row type separately.
- * @param qConfig The configuration for the query to generate.
+ * @param queryConfig The configuration for the query to generate.
* @param withChangeFlag Set to true to emit records with change flags.
* @param tpe The [[TypeInformation]] of the resulting [[DataStream]].
* @tparam A The type of the resulting [[DataStream]].
@@ -598,7 +602,7 @@ abstract class StreamTableEnvironment(
protected def translate[A](
logicalPlan: RelNode,
logicalType: RelDataType,
- qConfig: StreamQueryConfig,
+ queryConfig: StreamQueryConfig,
withChangeFlag: Boolean)
(implicit tpe: TypeInformation[A]): DataStream[A] = {
@@ -610,7 +614,7 @@ abstract class StreamTableEnvironment(
}
// get CRow plan
- val plan: DataStream[CRow] = translateToCRow(logicalPlan, qConfig)
+ val plan: DataStream[CRow] = translateToCRow(logicalPlan, queryConfig)
// convert CRow to output type
val conversion = if (withChangeFlag) {
@@ -642,16 +646,16 @@ abstract class StreamTableEnvironment(
* Translates a logical [[RelNode]] plan into a [[DataStream]] of type [[CRow]].
*
* @param logicalPlan The logical plan to translate.
- * @param qConfig The configuration for the query to generate.
+ * @param queryConfig The configuration for the query to generate.
* @return The [[DataStream]] of type [[CRow]].
*/
protected def translateToCRow(
logicalPlan: RelNode,
- qConfig: StreamQueryConfig): DataStream[CRow] = {
+ queryConfig: StreamQueryConfig): DataStream[CRow] = {
logicalPlan match {
case node: DataStreamRel =>
- node.translateToPlan(this, qConfig)
+ node.translateToPlan(this, queryConfig)
case _ =>
throw TableException("Cannot generate DataStream due to an invalid logical plan. " +
"This is a bug and should not happen. Please file an issue.")
@@ -667,7 +671,7 @@ abstract class StreamTableEnvironment(
def explain(table: Table): String = {
val ast = table.getRelNode
val optimizedPlan = optimize(ast, updatesAsRetraction = false)
- val dataStream = translateToCRow(optimizedPlan, qConf)
+ val dataStream = translateToCRow(optimizedPlan, queryConfig)
val env = dataStream.getExecutionEnvironment
val jsonSqlPlan = env.getExecutionPlan
http://git-wip-us.apache.org/repos/asf/flink/blob/24808871/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala
index c3b5951..311986c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala
@@ -150,7 +150,7 @@ class StreamTableEnvironment(
* @return The converted [[DataStream]].
*/
def toDataStream[T](table: Table, clazz: Class[T]): DataStream[T] = {
- toDataStream(table, clazz, qConf)
+ toDataStream(table, clazz, queryConfig)
}
/**
@@ -170,7 +170,7 @@ class StreamTableEnvironment(
* @return The converted [[DataStream]].
*/
def toDataStream[T](table: Table, typeInfo: TypeInformation[T]): DataStream[T] = {
- toDataStream(table, typeInfo, qConf)
+ toDataStream(table, typeInfo, queryConfig)
}
/**
@@ -186,14 +186,17 @@ class StreamTableEnvironment(
*
* @param table The [[Table]] to convert.
* @param clazz The class of the type of the resulting [[DataStream]].
- * @param qConfig The configuration of the query to generate.
+ * @param queryConfig The configuration of the query to generate.
* @tparam T The type of the resulting [[DataStream]].
* @return The converted [[DataStream]].
*/
- def toDataStream[T](table: Table, clazz: Class[T], qConfig: StreamQueryConfig): DataStream[T] = {
+ def toDataStream[T](
+ table: Table,
+ clazz: Class[T],
+ queryConfig: StreamQueryConfig): DataStream[T] = {
val typeInfo = TypeExtractor.createTypeInfo(clazz)
TableEnvironment.validateType(typeInfo)
- translate[T](table, qConfig, updatesAsRetraction = false, withChangeFlag = false)(typeInfo)
+ translate[T](table, queryConfig, updatesAsRetraction = false, withChangeFlag = false)(typeInfo)
}
/**
@@ -209,16 +212,16 @@ class StreamTableEnvironment(
*
* @param table The [[Table]] to convert.
* @param typeInfo The [[TypeInformation]] that specifies the type of the [[DataStream]].
- * @param qConfig The configuration of the query to generate.
+ * @param queryConfig The configuration of the query to generate.
* @tparam T The type of the resulting [[DataStream]].
* @return The converted [[DataStream]].
*/
def toDataStream[T](
table: Table,
typeInfo: TypeInformation[T],
- qConfig: StreamQueryConfig): DataStream[T] = {
+ queryConfig: StreamQueryConfig): DataStream[T] = {
TableEnvironment.validateType(typeInfo)
- translate[T](table, qConfig, updatesAsRetraction = false, withChangeFlag = false)(typeInfo)
+ translate[T](table, queryConfig, updatesAsRetraction = false, withChangeFlag = false)(typeInfo)
}
/**
@@ -242,7 +245,7 @@ class StreamTableEnvironment(
table: Table,
clazz: Class[T]): DataStream[JTuple2[JBool, T]] = {
- toRetractStream(table, clazz, qConf)
+ toRetractStream(table, clazz, queryConfig)
}
/**
@@ -266,7 +269,7 @@ class StreamTableEnvironment(
table: Table,
typeInfo: TypeInformation[T]): DataStream[JTuple2[JBool, T]] = {
- toRetractStream(table, typeInfo, qConf)
+ toRetractStream(table, typeInfo, queryConfig)
}
/**
@@ -283,21 +286,21 @@ class StreamTableEnvironment(
*
* @param table The [[Table]] to convert.
* @param clazz The class of the requested record type.
- * @param qConfig The configuration of the query to generate.
+ * @param queryConfig The configuration of the query to generate.
* @tparam T The type of the requested record type.
* @return The converted [[DataStream]].
*/
def toRetractStream[T](
table: Table,
clazz: Class[T],
- qConfig: StreamQueryConfig): DataStream[JTuple2[JBool, T]] = {
+ queryConfig: StreamQueryConfig): DataStream[JTuple2[JBool, T]] = {
val typeInfo = TypeExtractor.createTypeInfo(clazz)
TableEnvironment.validateType(typeInfo)
val resultType = new TupleTypeInfo[JTuple2[JBool, T]](Types.BOOLEAN, typeInfo)
translate[JTuple2[JBool, T]](
table,
- qConfig,
+ queryConfig,
updatesAsRetraction = true,
withChangeFlag = true)(resultType)
}
@@ -316,14 +319,14 @@ class StreamTableEnvironment(
*
* @param table The [[Table]] to convert.
* @param typeInfo The [[TypeInformation]] of the requested record type.
- * @param qConfig The configuration of the query to generate.
+ * @param queryConfig The configuration of the query to generate.
* @tparam T The type of the requested record type.
* @return The converted [[DataStream]].
*/
def toRetractStream[T](
table: Table,
typeInfo: TypeInformation[T],
- qConfig: StreamQueryConfig): DataStream[JTuple2[JBool, T]] = {
+ queryConfig: StreamQueryConfig): DataStream[JTuple2[JBool, T]] = {
TableEnvironment.validateType(typeInfo)
val resultTypeInfo = new TupleTypeInfo[JTuple2[JBool, T]](
@@ -332,7 +335,7 @@ class StreamTableEnvironment(
)
translate[JTuple2[JBool, T]](
table,
- qConfig,
+ queryConfig,
updatesAsRetraction = true,
withChangeFlag = true)(resultTypeInfo)
}
http://git-wip-us.apache.org/repos/asf/flink/blob/24808871/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/queryConfig.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/queryConfig.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/queryConfig.scala
new file mode 100644
index 0000000..c8fbab7
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/queryConfig.scala
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api
+
+import _root_.java.io.Serializable
+import org.apache.flink.api.common.time.Time
+
+class QueryConfig private[table] extends Serializable {}
+
+/**
+ * The [[BatchQueryConfig]] holds parameters to configure the behavior of batch queries.
+ */
+class BatchQueryConfig private[table] extends QueryConfig
+
+/**
+ * The [[StreamQueryConfig]] holds parameters to configure the behavior of streaming queries.
+ *
+ * An empty [[StreamQueryConfig]] can be generated using the [[StreamTableEnvironment.queryConfig]]
+ * method.
+ */
+class StreamQueryConfig private[table] extends QueryConfig {
+
+ /**
+ * The minimum time until state which was not updated will be retained.
+ * State might be cleared and removed if it was not updated for the defined period of time.
+ */
+ private var minIdleStateRetentionTime: Long = Long.MinValue
+
+ /**
+ * The maximum time until state which was not updated will be retained.
+ * State will be cleared and removed if it was not updated for the defined period of time.
+ */
+ private var maxIdleStateRetentionTime: Long = Long.MinValue
+
+ /**
+ * Specifies the time interval for how long idle state, i.e., state which was not updated, will
+ * be retained. When state was not updated for the specified interval of time, it will be cleared
+ * and removed.
+ *
+ * When new data arrives for previously cleaned-up state, the new data will be handled as if it
+ * was the first data. This can result in previous results being overwritten.
+ *
+ * Note: [[withIdleStateRetentionTime(minTime: Time, maxTime: Time)]] allows setting a minimum and
+ * maximum time for state to be retained. That method is more efficient, because the system has
+ * to do less bookkeeping to identify the time at which state must be cleared.
+ *
+ * @param time The time interval for how long idle state is retained. Set to 0 (zero) to never
+ * clean-up the state.
+ */
+ def withIdleStateRetentionTime(time: Time): StreamQueryConfig = {
+ withIdleStateRetentionTime(time, time)
+ }
+
+ /**
+ * Specifies a minimum and a maximum time interval for how long idle state, i.e., state which
+ * was not updated, will be retained.
+ * State is never cleared before it was idle for at least the minimum time, and it is cleared
+ * at the latest once it was idle for longer than the maximum time.
+ *
+ * When new data arrives for previously cleaned-up state, the new data will be handled as if it
+ * was the first data. This can result in previous results being overwritten.
+ *
+ * Set to 0 (zero) to never clean-up the state.
+ *
+ * @param minTime The minimum time interval for which idle state is retained. Set to 0 (zero) to
+ * never clean-up the state.
+ * @param maxTime The maximum time interval for which idle state is retained. May not be smaller
+ * than minTime. Set to 0 (zero) to never clean-up the state.
+ */
+ def withIdleStateRetentionTime(minTime: Time, maxTime: Time): StreamQueryConfig = {
+ if (maxTime.toMilliseconds < minTime.toMilliseconds) {
+ throw new IllegalArgumentException("maxTime may not be smaller than minTime.")
+ }
+ minIdleStateRetentionTime = minTime.toMilliseconds
+ maxIdleStateRetentionTime = maxTime.toMilliseconds
+ this
+ }
+
+ def getMinIdleStateRetentionTime: Long = {
+ minIdleStateRetentionTime
+ }
+
+ def getMaxIdleStateRetentionTime: Long = {
+ maxIdleStateRetentionTime
+ }
+}
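Read concretely, the two-parameter variant above gives the runtime room to batch clean-up work; a short sketch (values illustrative, tEnv as in the sketch at the top of this message):

val qConf = tEnv.queryConfig
  .withIdleStateRetentionTime(Time.hours(12), Time.hours(24))

// A key's state is retained through at least 12 hours of inactivity and is
// dropped roughly 24 hours after its last update. The 12-hour gap between the
// bounds means a fresh clean-up timer is needed at most once per
// (maxTime - minTime) interval rather than on every arriving record; see the
// timer logic removed from GroupAggProcessFunction further below.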
http://git-wip-us.apache.org/repos/asf/flink/blob/24808871/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala
index 56f7d55..8c6b273 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala
@@ -143,7 +143,7 @@ class StreamTableEnvironment(
* @return The converted [[DataStream]].
*/
def toDataStream[T: TypeInformation](table: Table): DataStream[T] = {
- toDataStream(table, qConf)
+ toDataStream(table, queryConfig)
}
/**
@@ -158,14 +158,16 @@ class StreamTableEnvironment(
* - POJO [[DataStream]] types: Fields are mapped by field name, field types must match.
*
* @param table The [[Table]] to convert.
- * @param qConfig The configuration of the query to generate.
+ * @param queryConfig The configuration of the query to generate.
* @tparam T The type of the resulting [[DataStream]].
* @return The converted [[DataStream]].
*/
- def toDataStream[T: TypeInformation](table: Table, qConfig: StreamQueryConfig): DataStream[T] = {
+ def toDataStream[T: TypeInformation](
+ table: Table,
+ queryConfig: StreamQueryConfig): DataStream[T] = {
val returnType = createTypeInformation[T]
- asScalaStream(
- translate(table, qConfig, updatesAsRetraction = false, withChangeFlag = false)(returnType))
+ asScalaStream(translate(
+ table, queryConfig, updatesAsRetraction = false, withChangeFlag = false)(returnType))
}
/**
@@ -180,7 +182,7 @@ class StreamTableEnvironment(
* @return The converted [[DataStream]].
*/
def toRetractStream[T: TypeInformation](table: Table): DataStream[(Boolean, T)] = {
- toRetractStream(table, qConf)
+ toRetractStream(table, queryConfig)
}
/**
@@ -191,16 +193,16 @@ class StreamTableEnvironment(
* A true [[Boolean]] flag indicates an add message, a false flag indicates a retract message.
*
* @param table The [[Table]] to convert.
- * @param qConfig The configuration of the query to generate.
+ * @param queryConfig The configuration of the query to generate.
* @tparam T The type of the requested data type.
* @return The converted [[DataStream]].
*/
def toRetractStream[T: TypeInformation](
table: Table,
- qConfig: StreamQueryConfig): DataStream[(Boolean, T)] = {
+ queryConfig: StreamQueryConfig): DataStream[(Boolean, T)] = {
val returnType = createTypeInformation[(Boolean, T)]
asScalaStream(
- translate(table, qConfig, updatesAsRetraction = true, withChangeFlag = true)(returnType))
+ translate(table, queryConfig, updatesAsRetraction = true, withChangeFlag = true)(returnType))
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/24808871/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala
index 966b42f..9874a9e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala
@@ -59,12 +59,12 @@ class TableConversions(table: Table) {
/** Converts the [[Table]] to a [[DataStream]] of the specified type.
*
- * @param qConfig The configuration for the generated query.
+ * @param queryConfig The configuration for the generated query.
*/
- def toDataStream[T: TypeInformation](qConfig: StreamQueryConfig): DataStream[T] = {
+ def toDataStream[T: TypeInformation](queryConfig: StreamQueryConfig): DataStream[T] = {
table.tableEnv match {
case tEnv: ScalaStreamTableEnv =>
- tEnv.toDataStream(table, qConfig)
+ tEnv.toDataStream(table, queryConfig)
case _ =>
throw new TableException(
"Only tables that originate from Scala DataStreams " +
@@ -97,14 +97,15 @@ class TableConversions(table: Table) {
*
* A true [[Boolean]] flag indicates an add message, a false flag indicates a retract message.
*
- * @param qConfig The configuration for the generated query.
+ * @param queryConfig The configuration for the generated query.
*
*/
- def toRetractStream[T: TypeInformation](qConfig: StreamQueryConfig): DataStream[(Boolean, T)] = {
+ def toRetractStream[T: TypeInformation](
+ queryConfig: StreamQueryConfig): DataStream[(Boolean, T)] = {
table.tableEnv match {
case tEnv: ScalaStreamTableEnv =>
- tEnv.toRetractStream(table, qConfig)
+ tEnv.toRetractStream(table, queryConfig)
case _ =>
throw new TableException(
"Only tables that originate from Scala DataStreams " +
http://git-wip-us.apache.org/repos/asf/flink/blob/24808871/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
index 5a2eb1c..ca61c65 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
@@ -764,13 +764,13 @@ class Table(
*/
def writeToSink[T](sink: TableSink[T]): Unit = {
- def qConfig = this.tableEnv match {
- case s: StreamTableEnvironment => s.qConf
+ def queryConfig = this.tableEnv match {
+ case s: StreamTableEnvironment => s.queryConfig
case b: BatchTableEnvironment => new BatchQueryConfig
case _ => null
}
- writeToSink(sink, qConfig)
+ writeToSink(sink, queryConfig)
}
/**
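As the hunk above shows, the parameterless writeToSink now derives a default configuration from its environment; the explicit-config overload it delegates to can still be called directly. A sketch of both forms (the sink, its path, and the query are illustrative, reusing tEnv and qConf from the sketches above):

import org.apache.flink.table.sinks.CsvTableSink

// append-only query, so an append-only sink like CsvTableSink accepts it
val filtered = tEnv.sql("SELECT word FROM clicks WHERE word <> ''")
val sink = new CsvTableSink("/tmp/words", ",")

filtered.writeToSink(sink)        // picks up the environment's default queryConfig
filtered.writeToSink(sink, qConf) // or pass an explicit configuration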
http://git-wip-us.apache.org/repos/asf/flink/blob/24808871/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
index 0e377b5..5f270f6 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
@@ -85,11 +85,12 @@ class DataStreamCalc(
override def translateToPlan(
tableEnv: StreamTableEnvironment,
- qConfig: StreamQueryConfig): DataStream[CRow] = {
+ queryConfig: StreamQueryConfig): DataStream[CRow] = {
val config = tableEnv.getConfig
- val inputDataStream = getInput.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, qConfig)
+ val inputDataStream =
+ getInput.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, queryConfig)
val inputRowType = inputDataStream.getType.asInstanceOf[CRowTypeInfo].rowType
val generator = new CodeGenerator(config, false, inputRowType)
http://git-wip-us.apache.org/repos/asf/flink/blob/24808871/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala
index cbd818a..5b32b10 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala
@@ -84,12 +84,12 @@ class DataStreamCorrelate(
override def translateToPlan(
tableEnv: StreamTableEnvironment,
- qConfig: StreamQueryConfig): DataStream[CRow] = {
+ queryConfig: StreamQueryConfig): DataStream[CRow] = {
val config = tableEnv.getConfig
// we do not need to specify input type
- val inputDS = getInput.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, qConfig)
+ val inputDS = getInput.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, queryConfig)
val inputType = inputDS.getType.asInstanceOf[CRowTypeInfo]
val funcRel = scan.asInstanceOf[FlinkLogicalTableFunctionScan]
http://git-wip-us.apache.org/repos/asf/flink/blob/24808871/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
index f01b24a..e5d8088 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
@@ -106,16 +106,16 @@ class DataStreamGroupAggregate(
override def translateToPlan(
tableEnv: StreamTableEnvironment,
- qConfig: StreamQueryConfig): DataStream[CRow] = {
+ queryConfig: StreamQueryConfig): DataStream[CRow] = {
- if (qConfig.getMinIdleStateRetentionTime < 0 || qConfig.getMaxIdleStateRetentionTime < 0) {
+ if (groupings.length > 0 && queryConfig.getMinIdleStateRetentionTime < 0) {
LOG.warn(
"No state retention interval configured for a query which accumulates state. " +
"Please provide a query configuration with valid retention interval to prevent excessive " +
- "state size. You may specify a retention time of 0 to not clean up the state.")
+ "state size. You may specify a retention time of 0 to not clean up the state.")
}
- val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, qConfig)
+ val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, queryConfig)
val physicalNamedAggregates = namedAggregates.map { namedAggregate =>
new CalcitePair[AggregateCall, String](
@@ -149,7 +149,7 @@ class DataStreamGroupAggregate(
inputSchema.logicalType,
inputSchema.physicalFieldTypeInfo,
groupings,
- qConfig,
+ queryConfig,
DataStreamRetractionRules.isAccRetract(this),
DataStreamRetractionRules.isAccRetract(getInput))
http://git-wip-us.apache.org/repos/asf/flink/blob/24808871/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
index d2aaad0..2a71592 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
@@ -109,9 +109,9 @@ class DataStreamGroupWindowAggregate(
override def translateToPlan(
tableEnv: StreamTableEnvironment,
- qConfig: StreamQueryConfig): DataStream[CRow] = {
+ queryConfig: StreamQueryConfig): DataStream[CRow] = {
- val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, qConfig)
+ val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, queryConfig)
val physicalNamedAggregates = namedAggregates.map { namedAggregate =>
new CalcitePair[AggregateCall, String](
http://git-wip-us.apache.org/repos/asf/flink/blob/24808871/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
index 8e97884..a9fbf02 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
@@ -36,6 +36,7 @@ import org.apache.flink.table.codegen.CodeGenerator
import org.apache.flink.table.plan.rules.datastream.DataStreamRetractionRules
import org.apache.flink.table.runtime.aggregate.AggregateUtil.CalcitePair
import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.slf4j.LoggerFactory
class DataStreamOverAggregate(
logicWindow: Window,
@@ -47,6 +48,7 @@ class DataStreamOverAggregate(
extends SingleRel(cluster, traitSet, inputNode)
with OverAggregate
with DataStreamRel {
+ private val LOG = LoggerFactory.getLogger(this.getClass)
override def deriveRowType(): RelDataType = schema.logicalType
@@ -90,7 +92,7 @@ class DataStreamOverAggregate(
override def translateToPlan(
tableEnv: StreamTableEnvironment,
- qConfig: StreamQueryConfig): DataStream[CRow] = {
+ queryConfig: StreamQueryConfig): DataStream[CRow] = {
if (logicWindow.groups.size > 1) {
throw new TableException(
@@ -112,10 +114,23 @@ class DataStreamOverAggregate(
"Unsupported use of OVER windows. The window can only be ordered in ASCENDING mode.")
}
- val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, qConfig)
+ val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, queryConfig)
val consumeRetraction = DataStreamRetractionRules.isAccRetract(input)
+ if (consumeRetraction) {
+ throw new TableException(
+ "Retraction on Over window aggregation is not supported yet. " +
+ "Note: Over window aggregation should not follow a non-windowed GroupBy aggregation.")
+ }
+
+ if (!logicWindow.groups.get(0).keys.isEmpty && queryConfig.getMinIdleStateRetentionTime < 0) {
+ LOG.warn(
+ "No state retention interval configured for a query which accumulates state. " +
+ "Please provide a query configuration with valid retention interval to prevent " +
+ "excessive state size. You may specify a retention time of 0 to not clean up the state.")
+ }
+
val generator = new CodeGenerator(
tableEnv.getConfig,
false,
@@ -126,18 +141,13 @@ class DataStreamOverAggregate(
.get(orderKey.getFieldIndex)
.getType
- if (consumeRetraction) {
- throw new TableException(
- "Retraction on Over window aggregation is not supported yet. " +
- "Note: Over window aggregation should not follow a non-windowed GroupBy aggregation.")
- }
-
timeType match {
case _ if FlinkTypeFactory.isProctimeIndicatorType(timeType) =>
// proc-time OVER window
if (overWindow.lowerBound.isUnbounded && overWindow.upperBound.isCurrentRow) {
// unbounded OVER window
createUnboundedAndCurrentRowOverWindow(
+ queryConfig,
generator,
inputDS,
isRowTimeType = false,
@@ -145,8 +155,10 @@ class DataStreamOverAggregate(
} else if (
overWindow.lowerBound.isPreceding && !overWindow.lowerBound.isUnbounded &&
overWindow.upperBound.isCurrentRow) {
+
// bounded OVER window
createBoundedAndCurrentRowOverWindow(
+ queryConfig,
generator,
inputDS,
isRowTimeType = false,
@@ -162,6 +174,7 @@ class DataStreamOverAggregate(
overWindow.lowerBound.isUnbounded && overWindow.upperBound.isCurrentRow) {
// unbounded OVER window
createUnboundedAndCurrentRowOverWindow(
+ queryConfig,
generator,
inputDS,
isRowTimeType = true,
@@ -169,6 +182,7 @@ class DataStreamOverAggregate(
} else if (overWindow.lowerBound.isPreceding && overWindow.upperBound.isCurrentRow) {
// bounded OVER window
createBoundedAndCurrentRowOverWindow(
+ queryConfig,
generator,
inputDS,
isRowTimeType = true,
@@ -185,6 +199,7 @@ class DataStreamOverAggregate(
}
def createUnboundedAndCurrentRowOverWindow(
+ queryConfig: StreamQueryConfig,
generator: CodeGenerator,
inputDS: DataStream[CRow],
isRowTimeType: Boolean,
@@ -210,6 +225,7 @@ class DataStreamOverAggregate(
inputSchema.physicalType,
inputSchema.physicalTypeInfo,
inputSchema.physicalFieldTypeInfo,
+ queryConfig,
isRowTimeType,
partitionKeys.nonEmpty,
isRowsClause)
@@ -242,6 +258,7 @@ class DataStreamOverAggregate(
}
def createBoundedAndCurrentRowOverWindow(
+ queryConfig: StreamQueryConfig,
generator: CodeGenerator,
inputDS: DataStream[CRow],
isRowTimeType: Boolean,
@@ -269,6 +286,7 @@ class DataStreamOverAggregate(
inputSchema.physicalTypeInfo,
inputSchema.physicalFieldTypeInfo,
precedingOffset,
+ queryConfig,
isRowsClause,
isRowTimeType
)
http://git-wip-us.apache.org/repos/asf/flink/blob/24808871/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala
index 6f6edf7..65d336f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala
@@ -29,12 +29,12 @@ trait DataStreamRel extends FlinkRelNode {
* Translates the FlinkRelNode into a Flink operator.
*
* @param tableEnv The [[StreamTableEnvironment]] of the translated Table.
- * @param qConfig The configuration for the query to generate.
+ * @param queryConfig The configuration for the query to generate.
* @return DataStream of type [[CRow]]
*/
def translateToPlan(
tableEnv: StreamTableEnvironment,
- qConfig: StreamQueryConfig): DataStream[CRow]
+ queryConfig: StreamQueryConfig): DataStream[CRow]
/**
* Whether the [[DataStreamRel]] requires that update and delete changes are sent with retraction
http://git-wip-us.apache.org/repos/asf/flink/blob/24808871/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala
index e64bf0f..424c6a2 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala
@@ -56,7 +56,7 @@ class DataStreamScan(
override def translateToPlan(
tableEnv: StreamTableEnvironment,
- qConfig: StreamQueryConfig): DataStream[CRow] = {
+ queryConfig: StreamQueryConfig): DataStream[CRow] = {
val config = tableEnv.getConfig
val inputDataStream: DataStream[Any] = dataStreamTable.dataStream
http://git-wip-us.apache.org/repos/asf/flink/blob/24808871/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala
index 6cc7396..6f4980a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala
@@ -60,10 +60,10 @@ class DataStreamUnion(
override def translateToPlan(
tableEnv: StreamTableEnvironment,
- qConfig: StreamQueryConfig): DataStream[CRow] = {
+ queryConfig: StreamQueryConfig): DataStream[CRow] = {
- val leftDataSet = left.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, qConfig)
- val rightDataSet = right.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, qConfig)
+ val leftDataSet = left.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, queryConfig)
+ val rightDataSet = right.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, queryConfig)
leftDataSet.union(rightDataSet)
}
http://git-wip-us.apache.org/repos/asf/flink/blob/24808871/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala
index ba6b025..d7c490f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala
@@ -58,7 +58,7 @@ class DataStreamValues(
override def translateToPlan(
tableEnv: StreamTableEnvironment,
- qConfig: StreamQueryConfig): DataStream[CRow] = {
+ queryConfig: StreamQueryConfig): DataStream[CRow] = {
val config = tableEnv.getConfig
http://git-wip-us.apache.org/repos/asf/flink/blob/24808871/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
index 225f23f..51e609f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
@@ -100,7 +100,7 @@ class StreamTableSourceScan(
override def translateToPlan(
tableEnv: StreamTableEnvironment,
- qConfig: StreamQueryConfig): DataStream[CRow] = {
+ queryConfig: StreamQueryConfig): DataStream[CRow] = {
val config = tableEnv.getConfig
val inputDataStream = tableSource.getDataStream(tableEnv.execEnv).asInstanceOf[DataStream[Any]]
http://git-wip-us.apache.org/repos/asf/flink/blob/24808871/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
index 27392c7..8073959 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
@@ -77,6 +77,7 @@ object AggregateUtil {
inputType: RelDataType,
inputTypeInfo: TypeInformation[Row],
inputFieldTypeInfo: Seq[TypeInformation[_]],
+ queryConfig: StreamQueryConfig,
isRowTimeType: Boolean,
isPartitioned: Boolean,
isRowsClause: Boolean)
@@ -117,23 +118,27 @@ object AggregateUtil {
new RowTimeUnboundedRowsOver(
genFunction,
aggregationStateType,
- CRowTypeInfo(inputTypeInfo))
+ CRowTypeInfo(inputTypeInfo),
+ queryConfig)
} else {
// RANGE unbounded over process function
new RowTimeUnboundedRangeOver(
genFunction,
aggregationStateType,
- CRowTypeInfo(inputTypeInfo))
+ CRowTypeInfo(inputTypeInfo),
+ queryConfig)
}
} else {
if (isPartitioned) {
new ProcTimeUnboundedPartitionedOver(
genFunction,
- aggregationStateType)
+ aggregationStateType,
+ queryConfig)
} else {
new ProcTimeUnboundedNonPartitionedOver(
genFunction,
- aggregationStateType)
+ aggregationStateType,
+ queryConfig)
}
}
}
@@ -155,7 +160,7 @@ object AggregateUtil {
inputRowType: RelDataType,
inputFieldTypes: Seq[TypeInformation[_]],
groupings: Array[Int],
- qConfig: StreamQueryConfig,
+ queryConfig: StreamQueryConfig,
generateRetraction: Boolean,
consumeRetraction: Boolean): ProcessFunction[CRow, CRow] = {
@@ -192,7 +197,7 @@ object AggregateUtil {
genFunction,
aggregationStateType,
generateRetraction,
- qConfig)
+ queryConfig)
}
@@ -217,6 +222,7 @@ object AggregateUtil {
inputTypeInfo: TypeInformation[Row],
inputFieldTypeInfo: Seq[TypeInformation[_]],
precedingOffset: Long,
+ queryConfig: StreamQueryConfig,
isRowsClause: Boolean,
isRowTimeType: Boolean)
: ProcessFunction[CRow, CRow] = {
@@ -258,15 +264,15 @@ object AggregateUtil {
genFunction,
aggregationStateType,
inputRowType,
- precedingOffset
- )
+ precedingOffset,
+ queryConfig)
} else {
new RowTimeBoundedRangeOver(
genFunction,
aggregationStateType,
inputRowType,
- precedingOffset
- )
+ precedingOffset,
+ queryConfig)
}
} else {
if (isRowsClause) {
@@ -274,13 +280,15 @@ object AggregateUtil {
genFunction,
precedingOffset,
aggregationStateType,
- inputRowType)
+ inputRowType,
+ queryConfig)
} else {
new ProcTimeBoundedRangeOver(
genFunction,
precedingOffset,
aggregationStateType,
- inputRowType)
+ inputRowType,
+ queryConfig)
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/24808871/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
index 84fee87..57ea86e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
@@ -41,19 +41,13 @@ class GroupAggProcessFunction(
private val genAggregations: GeneratedAggregationsFunction,
private val aggregationStateType: RowTypeInfo,
private val generateRetraction: Boolean,
- private val qConfig: StreamQueryConfig)
- extends ProcessFunction[CRow, CRow]
+ private val queryConfig: StreamQueryConfig)
+ extends ProcessFunctionWithCleanupState[CRow, CRow](queryConfig)
with Compiler[GeneratedAggregations] {
val LOG: Logger = LoggerFactory.getLogger(this.getClass)
private var function: GeneratedAggregations = _
- private val minRetentionTime = qConfig.getMinIdleStateRetentionTime
- private val maxRetentionTime = qConfig.getMaxIdleStateRetentionTime
- private val stateCleaningEnabled = minRetentionTime > 1 && maxRetentionTime > 1
- // interval in which clean-up timers are registered
- private val cleanupTimerInterval = maxRetentionTime - minRetentionTime
-
private var newRow: CRow = _
private var prevRow: CRow = _
private var firstRow: Boolean = _
@@ -61,8 +55,6 @@ class GroupAggProcessFunction(
private var state: ValueState[Row] = _
// counts the number of added and retracted input records
private var cntState: ValueState[JLong] = _
- // holds the latest registered cleanup timer
- private var cleanupTimeState: ValueState[JLong] = _
override def open(config: Configuration) {
LOG.debug(s"Compiling AggregateHelper: $genAggregations.name \n\n " +
@@ -84,11 +76,7 @@ class GroupAggProcessFunction(
new ValueStateDescriptor[JLong]("GroupAggregateInputCounter", Types.LONG)
cntState = getRuntimeContext.getState(inputCntDescriptor)
- if (stateCleaningEnabled) {
- val inputCntDescriptor: ValueStateDescriptor[JLong] =
- new ValueStateDescriptor[JLong]("GroupAggregateCleanupTime", Types.LONG)
- cleanupTimeState = getRuntimeContext.getState(inputCntDescriptor)
- }
+ initCleanupTimeState("GroupAggregateCleanupTime")
}
override def processElement(
@@ -96,22 +84,9 @@ class GroupAggProcessFunction(
ctx: ProcessFunction[CRow, CRow]#Context,
out: Collector[CRow]): Unit = {
- if (stateCleaningEnabled) {
-
- val currentTime = ctx.timerService().currentProcessingTime()
- val earliestCleanup = currentTime + minRetentionTime
-
- // last registered timer
- val lastCleanupTime = cleanupTimeState.value()
-
- if (lastCleanupTime == null || earliestCleanup >= lastCleanupTime + cleanupTimerInterval) {
- // we need to register a new timer
- val cleanupTime = earliestCleanup + cleanupTimerInterval
- // register timer and remember clean-up time
- ctx.timerService().registerProcessingTimeTimer(cleanupTime)
- cleanupTimeState.update(cleanupTime)
- }
- }
+ val currentTime = ctx.timerService().currentProcessingTime()
+ // register state-cleanup timer
+ registerProcessingCleanupTimer(ctx, currentTime)
val input = inputC.row
@@ -182,11 +157,8 @@ class GroupAggProcessFunction(
ctx: ProcessFunction[CRow, CRow]#OnTimerContext,
out: Collector[CRow]): Unit = {
- if (timestamp == cleanupTimeState.value()) {
- // clear all state
- this.state.clear()
- this.cntState.clear()
- this.cleanupTimeState.clear()
+ if (needToCleanupState(timestamp)) {
+ cleanupState(state, cntState)
}
}
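ProcessFunctionWithCleanupState.scala appears in the file summary at the top, but its hunks have not been shown up to this point. From its call sites above (initCleanupTimeState, registerProcessingCleanupTimer, needToCleanupState, cleanupState) and the timer logic just removed from GroupAggProcessFunction, the base class can be sketched roughly as follows; this is a reconstruction, not the committed file:

import java.lang.{Long => JLong}

import org.apache.flink.api.common.state.{State, ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.table.api.{StreamQueryConfig, Types}

abstract class ProcessFunctionWithCleanupState[I, O](queryConfig: StreamQueryConfig)
  extends ProcessFunction[I, O] {

  protected val minRetentionTime: Long = queryConfig.getMinIdleStateRetentionTime
  protected val maxRetentionTime: Long = queryConfig.getMaxIdleStateRetentionTime
  protected val stateCleaningEnabled: Boolean = minRetentionTime > 1 && maxRetentionTime > 1

  // interval in which clean-up timers are registered
  private val cleanupTimerInterval = maxRetentionTime - minRetentionTime

  // holds the latest registered clean-up timer
  private var cleanupTimeState: ValueState[JLong] = _

  protected def initCleanupTimeState(stateName: String): Unit = {
    if (stateCleaningEnabled) {
      val cleanupTimeDescriptor = new ValueStateDescriptor[JLong](stateName, Types.LONG)
      cleanupTimeState = getRuntimeContext.getState(cleanupTimeDescriptor)
    }
  }

  protected def registerProcessingCleanupTimer(
      ctx: ProcessFunction[I, O]#Context,
      currentTime: Long): Unit = {
    if (stateCleaningEnabled) {
      val earliestCleanup = currentTime + minRetentionTime
      // last registered timer
      val lastCleanupTime = cleanupTimeState.value()
      if (lastCleanupTime == null || earliestCleanup >= lastCleanupTime + cleanupTimerInterval) {
        // register a new timer and remember its clean-up time
        val cleanupTime = earliestCleanup + cleanupTimerInterval
        ctx.timerService().registerProcessingTimeTimer(cleanupTime)
        cleanupTimeState.update(cleanupTime)
      }
    }
  }

  // true if the firing timer is the latest registered clean-up timer
  protected def needToCleanupState(timestamp: Long): Boolean = {
    stateCleaningEnabled && timestamp == cleanupTimeState.value()
  }

  // clear the given per-key states plus the clean-up bookkeeping state
  protected def cleanupState(states: State*): Unit = {
    states.foreach(_.clear())
    cleanupTimeState.clear()
  }
}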
http://git-wip-us.apache.org/repos/asf/flink/blob/24808871/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala
index 3fb506f..d50912c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala
@@ -31,6 +31,7 @@ import org.apache.flink.api.java.typeutils.ListTypeInfo
import java.util.{ArrayList, List => JList}
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.table.api.StreamQueryConfig
import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction}
import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
import org.slf4j.LoggerFactory
@@ -48,9 +49,11 @@ class ProcTimeBoundedRangeOver(
genAggregations: GeneratedAggregationsFunction,
precedingTimeBoundary: Long,
aggregatesTypeInfo: RowTypeInfo,
- inputType: TypeInformation[CRow])
- extends ProcessFunction[CRow, CRow]
+ inputType: TypeInformation[CRow],
+ queryConfig: StreamQueryConfig)
+ extends ProcessFunctionWithCleanupState[CRow, CRow](queryConfig)
with Compiler[GeneratedAggregations] {
+
private var output: CRow = _
private var accumulatorState: ValueState[Row] = _
private var rowMapState: MapState[Long, JList[Row]] = _
@@ -81,6 +84,8 @@ class ProcTimeBoundedRangeOver(
val stateDescriptor: ValueStateDescriptor[Row] =
new ValueStateDescriptor[Row]("overState", aggregatesTypeInfo)
accumulatorState = getRuntimeContext.getState(stateDescriptor)
+
+ initCleanupTimeState("ProcTimeBoundedRangeOverCleanupTime")
}
override def processElement(
@@ -89,6 +94,9 @@ class ProcTimeBoundedRangeOver(
out: Collector[CRow]): Unit = {
val currentTime = ctx.timerService.currentProcessingTime
+ // register state-cleanup timer
+ registerProcessingCleanupTimer(ctx, currentTime)
+
// buffer the incoming event
// add current element to the window list of elements with corresponding timestamp
@@ -109,7 +117,15 @@ class ProcTimeBoundedRangeOver(
ctx: ProcessFunction[CRow, CRow]#OnTimerContext,
out: Collector[CRow]): Unit = {
- // we consider the original timestamp of events that have registered this time trigger 1 ms ago
+ if (needToCleanupState(timestamp)) {
+ // clean up and return
+ cleanupState(rowMapState, accumulatorState)
+ return
+ }
+
+ // we consider the original timestamp of the events
+ // that registered this timer 1 ms ago
+
val currentTime = timestamp - 1
var i = 0
@@ -153,7 +169,8 @@ class ProcTimeBoundedRangeOver(
// get the list of elements of current proctime
val currentElements = rowMapState.get(currentTime)
- // add current elements to aggregator. Multiple elements might have arrived in the same proctime
+ // add current elements to aggregator. Multiple elements might
+ // have arrived in the same proctime
// the same accumulator value will be computed for all elements
var iElemenets = 0
while (iElemenets < currentElements.size()) {
@@ -178,7 +195,6 @@ class ProcTimeBoundedRangeOver(
// update the value of accumulators for future incremental computation
accumulatorState.update(accumulators)
-
}
}
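
To make the "1 ms ago" comment concrete: every incoming element registers a
processing-time timer for its arrival time plus one millisecond, so a timer
firing at time t aggregates the then-complete bucket of elements with
timestamp t - 1. An illustrative trace with invented timestamps, assuming
precedingTimeBoundary = 2000 ms:

    // proctime 5000: two rows arrive -> appended to rowMapState under key 5000,
    //                timer registered for 5001
    // proctime 5001: timer fires, currentTime = 5001 - 1 = 5000
    //                -> buckets older than 5000 - 2000 = 3000 are retracted from the
    //                   accumulators, the two rows of bucket 5000 are accumulated, and
    //                   one result with the same aggregate value is emitted per row
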
http://git-wip-us.apache.org/repos/asf/flink/blob/24808871/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRowsOver.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRowsOver.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRowsOver.scala
index 0c7f44e..e388c93 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRowsOver.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRowsOver.scala
@@ -33,6 +33,7 @@ import org.apache.flink.api.java.typeutils.ListTypeInfo
import java.util.{List => JList}
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.table.api.StreamQueryConfig
import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction}
import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
import org.slf4j.LoggerFactory
@@ -49,8 +50,9 @@ class ProcTimeBoundedRowsOver(
genAggregations: GeneratedAggregationsFunction,
precedingOffset: Long,
aggregatesTypeInfo: RowTypeInfo,
- inputType: TypeInformation[CRow])
- extends ProcessFunction[CRow, CRow]
+ inputType: TypeInformation[CRow],
+ queryConfig: StreamQueryConfig)
+ extends ProcessFunctionWithCleanupState[CRow, CRow](queryConfig)
with Compiler[GeneratedAggregations] {
Preconditions.checkArgument(precedingOffset > 0)
@@ -99,6 +101,8 @@ class ProcTimeBoundedRowsOver(
val smallestTimestampDescriptor : ValueStateDescriptor[Long] =
new ValueStateDescriptor[Long]("smallestTSState", classOf[Long])
smallestTsState = getRuntimeContext.getState(smallestTimestampDescriptor)
+
+ initCleanupTimeState("ProcTimeBoundedRowsOverCleanupTime")
}
override def processElement(
@@ -110,6 +114,9 @@ class ProcTimeBoundedRowsOver(
val currentTime = ctx.timerService.currentProcessingTime
+ // register state-cleanup timer
+ registerProcessingCleanupTimer(ctx, currentTime)
+
// initialize state for the processed element
var accumulators = accumulatorState.value
if (accumulators == null) {
@@ -180,4 +187,13 @@ class ProcTimeBoundedRowsOver(
out.collect(output)
}
+ override def onTimer(
+ timestamp: Long,
+ ctx: ProcessFunction[CRow, CRow]#OnTimerContext,
+ out: Collector[CRow]): Unit = {
+
+ if (needToCleanupState(timestamp)) {
+ cleanupState(rowMapState, accumulatorState, counterState, smallestTsState)
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/24808871/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedNonPartitionedOver.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedNonPartitionedOver.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedNonPartitionedOver.scala
index 8a23132..2a6c9c8 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedNonPartitionedOver.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedNonPartitionedOver.scala
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.api.StreamQueryConfig
import org.apache.flink.util.Collector
import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction}
import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
@@ -37,8 +38,9 @@ import org.slf4j.LoggerFactory
*/
class ProcTimeUnboundedNonPartitionedOver(
genAggregations: GeneratedAggregationsFunction,
- aggregationStateType: RowTypeInfo)
- extends ProcessFunction[CRow, CRow]
+ aggregationStateType: RowTypeInfo,
+ queryConfig: StreamQueryConfig)
+ extends ProcessFunctionWithCleanupState[CRow, CRow](queryConfig)
with CheckpointedFunction
with Compiler[GeneratedAggregations] {
@@ -68,12 +70,16 @@ class ProcTimeUnboundedNonPartitionedOver(
accumulators = function.createAccumulators()
}
}
+ initCleanupTimeState("ProcTimeUnboundedNonPartitionedOverCleanupTime")
}
override def processElement(
inputC: CRow,
ctx: ProcessFunction[CRow, CRow]#Context,
out: Collector[CRow]): Unit = {
+ // register state-cleanup timer
+ registerProcessingCleanupTimer(ctx, ctx.timerService().currentProcessingTime())
+
val input = inputC.row
@@ -85,6 +91,16 @@ class ProcTimeUnboundedNonPartitionedOver(
out.collect(output)
}
+ override def onTimer(
+ timestamp: Long,
+ ctx: ProcessFunction[CRow, CRow]#OnTimerContext,
+ out: Collector[CRow]): Unit = {
+
+ if (needToCleanupState(timestamp)) {
+ cleanupState(state)
+ }
+ }
+
override def snapshotState(context: FunctionSnapshotContext): Unit = {
state.clear()
if (null != accumulators) {
http://git-wip-us.apache.org/repos/asf/flink/blob/24808871/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedPartitionedOver.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedPartitionedOver.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedPartitionedOver.scala
index 847c1bf..97f0ad7 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedPartitionedOver.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedPartitionedOver.scala
@@ -24,6 +24,7 @@ import org.apache.flink.util.Collector
import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.table.api.StreamQueryConfig
import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction}
import org.apache.flink.table.runtime.types.CRow
import org.slf4j.LoggerFactory
@@ -36,8 +37,9 @@ import org.slf4j.LoggerFactory
*/
class ProcTimeUnboundedPartitionedOver(
genAggregations: GeneratedAggregationsFunction,
- aggregationStateType: RowTypeInfo)
- extends ProcessFunction[CRow, CRow]
+ aggregationStateType: RowTypeInfo,
+ queryConfig: StreamQueryConfig)
+ extends ProcessFunctionWithCleanupState[CRow, CRow](queryConfig)
with Compiler[GeneratedAggregations] {
private var output: CRow = _
@@ -59,6 +61,8 @@ class ProcTimeUnboundedPartitionedOver(
val stateDescriptor: ValueStateDescriptor[Row] =
new ValueStateDescriptor[Row]("overState", aggregationStateType)
state = getRuntimeContext.getState(stateDescriptor)
+
+ initCleanupTimeState("ProcTimeUnboundedPartitionedOverCleanupTime")
}
override def processElement(
@@ -66,6 +70,9 @@ class ProcTimeUnboundedPartitionedOver(
ctx: ProcessFunction[CRow, CRow]#Context,
out: Collector[CRow]): Unit = {
+ // register state-cleanup timer
+ registerProcessingCleanupTimer(ctx, ctx.timerService().currentProcessingTime())
+
val input = inputC.row
var accumulators = state.value()
@@ -83,4 +90,13 @@ class ProcTimeUnboundedPartitionedOver(
out.collect(output)
}
+ override def onTimer(
+ timestamp: Long,
+ ctx: ProcessFunction[CRow, CRow]#OnTimerContext,
+ out: Collector[CRow]): Unit = {
+
+ if (needToCleanupState(timestamp)) {
+ cleanupState(state)
+ }
+ }
}
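
A behavioral consequence of clean-up for the unbounded proc-time variants:
once the accumulator state of an idle key is dropped, a later element for
that key restarts the aggregate from its initial values. An invented trace
for a per-key count, assuming 2000 ms / 3000 ms min/max retention:

    // t=0     key A -> count 1   (clean-up timer registered for t=3000)
    // t=100   key A -> count 2   (100 + 2000 <= 3000, timer unchanged)
    // t=3000  clean-up timer fires for A -> accumulator state cleared
    // t=5000  key A -> count 1   (restarts from scratch, not 3)
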
http://git-wip-us.apache.org/repos/asf/flink/blob/24808871/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcessFunctionWithCleanupState.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcessFunctionWithCleanupState.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcessFunctionWithCleanupState.scala
new file mode 100644
index 0000000..292fd3b
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcessFunctionWithCleanupState.scala
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.lang.{Long => JLong}
+
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.api.common.state.State
+import org.apache.flink.streaming.api.TimeDomain
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.api.{StreamQueryConfig, Types}
+
+abstract class ProcessFunctionWithCleanupState[IN, OUT](queryConfig: StreamQueryConfig)
+ extends ProcessFunction[IN, OUT] {
+
+ protected val minRetentionTime: Long = queryConfig.getMinIdleStateRetentionTime
+ protected val maxRetentionTime: Long = queryConfig.getMaxIdleStateRetentionTime
+ protected val stateCleaningEnabled: Boolean = minRetentionTime > 1
+
+ // holds the latest registered cleanup timer
+ private var cleanupTimeState: ValueState[JLong] = _
+
+ protected def initCleanupTimeState(stateName: String) {
+ if (stateCleaningEnabled) {
+ val cleanupTimeDescriptor: ValueStateDescriptor[JLong] =
+ new ValueStateDescriptor[JLong](stateName, Types.LONG)
+ cleanupTimeState = getRuntimeContext.getState(cleanupTimeDescriptor)
+ }
+ }
+
+ protected def registerProcessingCleanupTimer(
+ ctx: ProcessFunction[IN, OUT]#Context,
+ currentTime: Long): Unit = {
+ if (stateCleaningEnabled) {
+
+ // last registered timer
+ val curCleanupTime = cleanupTimeState.value()
+
+ // check if a cleanup timer is registered and
+ // that the current cleanup timer won't delete state we need to keep
+ if (curCleanupTime == null || (currentTime + minRetentionTime) > curCleanupTime) {
+ // we need to register a new (later) timer
+ val cleanupTime = currentTime + maxRetentionTime
+ // register timer and remember clean-up time
+ ctx.timerService().registerProcessingTimeTimer(cleanupTime)
+ cleanupTimeState.update(cleanupTime)
+ }
+ }
+ }
+
+ protected def isProcessingTimeTimer(ctx: OnTimerContext): Boolean = {
+ ctx.timeDomain() == TimeDomain.PROCESSING_TIME
+ }
+
+ protected def needToCleanupState(timestamp: Long): Boolean = {
+ if (stateCleaningEnabled) {
+ val cleanupTime = cleanupTimeState.value()
+ // check that the triggered timer is the last registered processing time timer.
+ null != cleanupTime && timestamp == cleanupTime
+ } else {
+ false
+ }
+ }
+
+ protected def cleanupState(states: State*): Unit = {
+ // clear all state
+ states.foreach(_.clear())
+ this.cleanupTimeState.clear()
+ }
+}
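
The contract of the new base class: call initCleanupTimeState() in open(),
registerProcessingCleanupTimer() for every processed element, and guard
onTimer() with needToCleanupState() before clearing state via cleanupState().
A hypothetical minimal subclass, purely to illustrate the hook points (the
counting state is invented for this example and is not part of the commit):

    import java.lang.{Long => JLong}

    import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.functions.ProcessFunction
    import org.apache.flink.table.api.{StreamQueryConfig, Types}
    import org.apache.flink.util.Collector

    class CountPerKey(queryConfig: StreamQueryConfig)
      extends ProcessFunctionWithCleanupState[String, JLong](queryConfig) {

      private var cnt: ValueState[JLong] = _

      override def open(config: Configuration): Unit = {
        cnt = getRuntimeContext.getState(
          new ValueStateDescriptor[JLong]("count", Types.LONG))
        initCleanupTimeState("CountPerKeyCleanupTime")
      }

      override def processElement(
          in: String,
          ctx: ProcessFunction[String, JLong]#Context,
          out: Collector[JLong]): Unit = {
        // push the clean-up deadline of the current key further out if necessary
        registerProcessingCleanupTimer(ctx, ctx.timerService().currentProcessingTime())
        val c: JLong = if (cnt.value() == null) 1L else cnt.value() + 1L
        cnt.update(c)
        out.collect(c)
      }

      override def onTimer(
          timestamp: Long,
          ctx: ProcessFunction[String, JLong]#OnTimerContext,
          out: Collector[JLong]): Unit = {
        // only the latest registered clean-up timer clears state;
        // earlier, stale timers fail the check and are no-ops
        if (needToCleanupState(timestamp)) {
          cleanupState(cnt)
        }
      }
    }

Stale timers arise from the coalescing in registerProcessingCleanupTimer: with
minRetentionTime = 2000 and maxRetentionTime = 3000, an element at time 1000
registers a timer for 4000; an element at 1500 leaves it in place (1500 + 2000
<= 4000), while one at 2500 moves the deadline to 5500, so the timer that still
fires at 4000 fails needToCleanupState and does nothing.
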
http://git-wip-us.apache.org/repos/asf/flink/blob/24808871/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRangeOver.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRangeOver.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRangeOver.scala
index 4020d44..65edf6d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRangeOver.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRangeOver.scala
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.java.typeutils.{ListTypeInfo, RowTypeInfo}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.api.StreamQueryConfig
import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction}
import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
import org.apache.flink.types.Row
@@ -42,8 +43,9 @@ class RowTimeBoundedRangeOver(
genAggregations: GeneratedAggregationsFunction,
aggregationStateType: RowTypeInfo,
inputRowType: CRowTypeInfo,
- precedingOffset: Long)
- extends ProcessFunction[CRow, CRow]
+ precedingOffset: Long,
+ queryConfig: StreamQueryConfig)
+ extends ProcessFunctionWithCleanupState[CRow, CRow](queryConfig)
with Compiler[GeneratedAggregations] {
Preconditions.checkNotNull(aggregationStateType)
Preconditions.checkNotNull(precedingOffset)
@@ -97,6 +99,8 @@ class RowTimeBoundedRangeOver(
valueTypeInformation)
dataState = getRuntimeContext.getMapState(mapStateDescriptor)
+
+ initCleanupTimeState("RowTimeBoundedRangeOverCleanupTime")
}
override def processElement(
@@ -106,6 +110,9 @@ class RowTimeBoundedRangeOver(
val input = inputC.row
+ // register state-cleanup timer
+ registerProcessingCleanupTimer(ctx, ctx.timerService().currentProcessingTime())
+
// triggering timestamp for trigger calculation
val triggeringTs = ctx.timestamp
@@ -131,6 +138,34 @@ class RowTimeBoundedRangeOver(
timestamp: Long,
ctx: ProcessFunction[CRow, CRow]#OnTimerContext,
out: Collector[CRow]): Unit = {
+
+ if (isProcessingTimeTimer(ctx.asInstanceOf[OnTimerContext])) {
+ if (needToCleanupState(timestamp)) {
+
+ val keysIt = dataState.keys.iterator()
+ val lastProcessedTime = lastTriggeringTsState.value
+
+ // is there data left that has not been processed yet?
+ var noRecordsToProcess = true
+ while (keysIt.hasNext && noRecordsToProcess) {
+ if (keysIt.next() > lastProcessedTime) {
+ noRecordsToProcess = false
+ }
+ }
+
+ if (noRecordsToProcess) {
+ // we clean the state
+ cleanupState(dataState, accumulatorState, lastTriggeringTsState)
+ } else {
+ // There are records left to process because a watermark has not been received yet.
+ // This would only happen if the input stream has stopped. So we don't need to clean up.
+ // We leave the state as it is and schedule a new cleanup timer
+ registerProcessingCleanupTimer(ctx, ctx.timerService().currentProcessingTime())
+ }
+ }
+ return
+ }
+
// gets all window data from state for the calculation
val inputs: JList[Row] = dataState.get(timestamp)
@@ -196,8 +231,11 @@ class RowTimeBoundedRangeOver(
// update state
accumulatorState.update(accumulators)
- lastTriggeringTsState.update(timestamp)
}
+ lastTriggeringTsState.update(timestamp)
+
+ // update cleanup timer
+ registerProcessingCleanupTimer(ctx, ctx.timerService().currentProcessingTime())
}
}
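
The processing-time guard above is needed because this function mixes timer
domains: results are triggered by event-time timers, while clean-up runs on
processing-time timers and must not drop rows that still wait for a watermark.
A worked trace with invented numbers, assuming 2000 ms / 3000 ms min/max
retention:

    // proctime 1000: row with rowtime 5000 arrives -> buffered in dataState,
    //                clean-up timer registered for 1000 + 3000 = 4000
    // the watermark never passes 5000, so no event-time timer fires
    // proctime 4000: clean-up timer fires; dataState still holds key 5000, which is
    //                greater than lastTriggeringTsState (still 0) -> state is kept
    //                and a new clean-up timer is registered for 4000 + 3000 = 7000
    // once the watermark advances and the row is processed, a later clean-up timer
    // finds no unprocessed keys and clears dataState, accumulatorState and
    // lastTriggeringTsState
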
http://git-wip-us.apache.org/repos/asf/flink/blob/24808871/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRowsOver.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRowsOver.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRowsOver.scala
index 5ec6ec7..395ae39 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRowsOver.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRowsOver.scala
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.java.typeutils.{ListTypeInfo, RowTypeInfo}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.api.StreamQueryConfig
import org.apache.flink.types.Row
import org.apache.flink.util.{Collector, Preconditions}
import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction}
@@ -43,8 +44,9 @@ class RowTimeBoundedRowsOver(
genAggregations: GeneratedAggregationsFunction,
aggregationStateType: RowTypeInfo,
inputRowType: CRowTypeInfo,
- precedingOffset: Long)
- extends ProcessFunction[CRow, CRow]
+ precedingOffset: Long,
+ queryConfig: StreamQueryConfig)
+ extends ProcessFunctionWithCleanupState[CRow, CRow](queryConfig)
with Compiler[GeneratedAggregations] {
Preconditions.checkNotNull(aggregationStateType)
@@ -106,6 +108,8 @@ class RowTimeBoundedRowsOver(
valueTypeInformation)
dataState = getRuntimeContext.getMapState(mapStateDescriptor)
+
+ initCleanupTimeState("RowTimeBoundedRowsOverCleanupTime")
}
override def processElement(
@@ -115,6 +119,9 @@ class RowTimeBoundedRowsOver(
val input = inputC.row
+ // register state-cleanup timer
+ registerProcessingCleanupTimer(ctx, ctx.timerService().currentProcessingTime())
+
// triggering timestamp for trigger calculation
val triggeringTs = ctx.timestamp
@@ -141,6 +148,33 @@ class RowTimeBoundedRowsOver(
ctx: ProcessFunction[CRow, CRow]#OnTimerContext,
out: Collector[CRow]): Unit = {
+ if (isProcessingTimeTimer(ctx.asInstanceOf[OnTimerContext])) {
+ if (needToCleanupState(timestamp)) {
+
+ val keysIt = dataState.keys.iterator()
+ val lastProcessedTime = lastTriggeringTsState.value
+
+ // is there data left that has not been processed yet?
+ var noRecordsToProcess = true
+ while (keysIt.hasNext && noRecordsToProcess) {
+ if (keysIt.next() > lastProcessedTime) {
+ noRecordsToProcess = false
+ }
+ }
+
+ if (noRecordsToProcess) {
+ // we clean the state
+ cleanupState(dataState, accumulatorState, dataCountState, lastTriggeringTsState)
+ } else {
+ // There are records left to process because a watermark has not been received yet.
+ // This would only happen if the input stream has stopped. So we don't need to clean up.
+ // We leave the state as it is and schedule a new cleanup timer
+ registerProcessingCleanupTimer(ctx, ctx.timerService().currentProcessingTime())
+ }
+ }
+ return
+ }
+
// gets all window data from state for the calculation
val inputs: JList[Row] = dataState.get(timestamp)
@@ -220,6 +254,9 @@ class RowTimeBoundedRowsOver(
}
lastTriggeringTsState.update(timestamp)
+
+ // update cleanup timer
+ registerProcessingCleanupTimer(ctx, ctx.timerService().currentProcessingTime())
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/24808871/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala
index 3e2a811..741d2b4 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala
@@ -28,6 +28,7 @@ import org.apache.flink.util.{Collector, Preconditions}
import org.apache.flink.api.common.state._
import org.apache.flink.api.java.typeutils.ListTypeInfo
import org.apache.flink.streaming.api.operators.TimestampedCollector
+import org.apache.flink.table.api.StreamQueryConfig
import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction}
import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
import org.slf4j.LoggerFactory
@@ -43,8 +44,9 @@ import org.slf4j.LoggerFactory
abstract class RowTimeUnboundedOver(
genAggregations: GeneratedAggregationsFunction,
intermediateType: TypeInformation[Row],
- inputType: TypeInformation[CRow])
- extends ProcessFunction[CRow, CRow]
+ inputType: TypeInformation[CRow],
+ queryConfig: StreamQueryConfig)
+ extends ProcessFunctionWithCleanupState[CRow, CRow](queryConfig)
with Compiler[GeneratedAggregations] {
protected var output: CRow = _
@@ -83,6 +85,8 @@ abstract class RowTimeUnboundedOver(
new MapStateDescriptor[Long, JList[Row]]("rowmapstate",
BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo)
rowMapState = getRuntimeContext.getMapState(mapStateDescriptor)
+
+ initCleanupTimeState("RowTimeUnboundedOverCleanupTime")
}
/**
@@ -101,6 +105,9 @@ abstract class RowTimeUnboundedOver(
val input = inputC.row
+ // register state-cleanup timer
+ registerProcessingCleanupTimer(ctx, ctx.timerService().currentProcessingTime())
+
val timestamp = ctx.timestamp()
val curWatermark = ctx.timerService().currentWatermark()
@@ -133,6 +140,24 @@ abstract class RowTimeUnboundedOver(
ctx: ProcessFunction[CRow, CRow]#OnTimerContext,
out: Collector[CRow]): Unit = {
+ if (isProcessingTimeTimer(ctx.asInstanceOf[OnTimerContext])) {
+ if (needToCleanupState(timestamp)) {
+
+ // we check whether there are still records that have not been processed yet
+ val noRecordsToProcess = !rowMapState.keys.iterator().hasNext
+ if (noRecordsToProcess) {
+ // we clean the state
+ cleanupState(rowMapState, accumulatorState)
+ } else {
+ // There are records left to process because a watermark has not been received yet.
+ // This would only happen if the input stream has stopped. So we don't need to clean up.
+ // We leave the state as it is and schedule a new cleanup timer
+ registerProcessingCleanupTimer(ctx, ctx.timerService().currentProcessingTime())
+ }
+ }
+ return
+ }
+
Preconditions.checkArgument(out.isInstanceOf[TimestampedCollector[CRow]])
val collector = out.asInstanceOf[TimestampedCollector[CRow]]
@@ -178,6 +203,9 @@ abstract class RowTimeUnboundedOver(
ctx.timerService.registerEventTimeTimer(curWatermark + 1)
}
}
+
+ // update cleanup timer
+ registerProcessingCleanupTimer(ctx, ctx.timerService().currentProcessingTime())
}
/**
@@ -221,11 +249,13 @@ abstract class RowTimeUnboundedOver(
class RowTimeUnboundedRowsOver(
genAggregations: GeneratedAggregationsFunction,
intermediateType: TypeInformation[Row],
- inputType: TypeInformation[CRow])
+ inputType: TypeInformation[CRow],
+ queryConfig: StreamQueryConfig)
extends RowTimeUnboundedOver(
genAggregations: GeneratedAggregationsFunction,
intermediateType,
- inputType) {
+ inputType,
+ queryConfig) {
override def processElementsWithSameTimestamp(
curRowList: JList[Row],
@@ -259,11 +289,13 @@ class RowTimeUnboundedRowsOver(
class RowTimeUnboundedRangeOver(
genAggregations: GeneratedAggregationsFunction,
intermediateType: TypeInformation[Row],
- inputType: TypeInformation[CRow])
+ inputType: TypeInformation[CRow],
+ queryConfig: StreamQueryConfig)
extends RowTimeUnboundedOver(
genAggregations: GeneratedAggregationsFunction,
intermediateType,
- inputType) {
+ inputType,
+ queryConfig) {
override def processElementsWithSameTimestamp(
curRowList: JList[Row],