You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/05/12 06:11:04 UTC

[3/5] flink git commit: [FLINK-6491] [table] Add QueryConfig and state clean up for over-windowed aggregates.

[FLINK-6491] [table] Add QueryConfig and state clean up for over-windowed aggregates.

This closes #3863.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/24808871
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/24808871
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/24808871

Branch: refs/heads/master
Commit: 2480887180d881c30d228a73c746f94abbcbbb64
Parents: d16339d
Author: sunjincheng121 <su...@gmail.com>
Authored: Tue May 9 14:36:42 2017 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Thu May 11 23:53:25 2017 +0200

----------------------------------------------------------------------
 .../flink/table/api/BatchTableEnvironment.scala |   8 +-
 .../apache/flink/table/api/QueryConfig.scala    | 102 -----
 .../table/api/StreamTableEnvironment.scala      |  38 +-
 .../table/api/java/StreamTableEnvironment.scala |  35 +-
 .../apache/flink/table/api/queryConfig.scala    | 102 +++++
 .../api/scala/StreamTableEnvironment.scala      |  20 +-
 .../table/api/scala/TableConversions.scala      |  13 +-
 .../org/apache/flink/table/api/table.scala      |   6 +-
 .../plan/nodes/datastream/DataStreamCalc.scala  |   5 +-
 .../nodes/datastream/DataStreamCorrelate.scala  |   4 +-
 .../datastream/DataStreamGroupAggregate.scala   |  10 +-
 .../DataStreamGroupWindowAggregate.scala        |   4 +-
 .../datastream/DataStreamOverAggregate.scala    |  34 +-
 .../plan/nodes/datastream/DataStreamRel.scala   |   4 +-
 .../plan/nodes/datastream/DataStreamScan.scala  |   2 +-
 .../plan/nodes/datastream/DataStreamUnion.scala |   6 +-
 .../nodes/datastream/DataStreamValues.scala     |   2 +-
 .../datastream/StreamTableSourceScan.scala      |   2 +-
 .../table/runtime/aggregate/AggregateUtil.scala |  32 +-
 .../aggregate/GroupAggProcessFunction.scala     |  44 +-
 .../aggregate/ProcTimeBoundedRangeOver.scala    |  26 +-
 .../aggregate/ProcTimeBoundedRowsOver.scala     |  20 +-
 .../ProcTimeUnboundedNonPartitionedOver.scala   |  20 +-
 .../ProcTimeUnboundedPartitionedOver.scala      |  20 +-
 .../ProcessFunctionWithCleanupState.scala       |  85 ++++
 .../aggregate/RowTimeBoundedRangeOver.scala     |  44 +-
 .../aggregate/RowTimeBoundedRowsOver.scala      |  41 +-
 .../aggregate/RowTimeUnboundedOver.scala        |  44 +-
 .../stream/table/GroupAggregationsITCase.scala  |  13 +-
 ...ProcessingOverRangeProcessFunctionTest.scala | 336 --------------
 .../table/runtime/harness/HarnessTestBase.scala | 281 +++++++++++-
 .../runtime/harness/NonWindowHarnessTest.scala  | 157 +++++++
 .../runtime/harness/OverWindowHarnessTest.scala | 458 ++++++++++---------
 .../table/utils/MockTableEnvironment.scala      |   2 +-
 34 files changed, 1209 insertions(+), 811 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/24808871/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
index f33c187..3c0f51b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
@@ -113,17 +113,17 @@ abstract class BatchTableEnvironment(
     *
     * @param table The [[Table]] to write.
     * @param sink The [[TableSink]] to write the [[Table]] to.
-    * @param qConfig The configuration for the query to generate.
+    * @param queryConfig The configuration for the query to generate.
     * @tparam T The expected type of the [[DataSet]] which represents the [[Table]].
     */
   override private[flink] def writeToSink[T](
       table: Table,
       sink: TableSink[T],
-      qConfig: QueryConfig): Unit = {
+      queryConfig: QueryConfig): Unit = {
 
     // We do not pass the configuration on, because there is nothing to configure for batch queries.
-    val bQConfig = qConfig match {
-      case batchConfig: BatchQueryConfig => batchConfig
+    queryConfig match {
+      case _: BatchQueryConfig =>
       case _ =>
         throw new TableException("BatchQueryConfig required to configure batch query.")
     }

http://git-wip-us.apache.org/repos/asf/flink/blob/24808871/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/QueryConfig.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/QueryConfig.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/QueryConfig.scala
deleted file mode 100644
index 8e8b5ac..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/QueryConfig.scala
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api
-
-import _root_.java.io.Serializable
-import org.apache.flink.api.common.time.Time
-
-class QueryConfig private[table] extends Serializable {}
-
-/**
-  * The [[BatchQueryConfig]] holds parameters to configure the behavior of batch queries.
-  */
-class BatchQueryConfig private[table] extends QueryConfig
-
-/**
-  * The [[StreamQueryConfig]] holds parameters to configure the behavior of streaming queries.
-  *
-  * An empty [[StreamQueryConfig]] can be generated using the [[StreamTableEnvironment.qConf]]
-  * method.
-  */
-class StreamQueryConfig private[table] extends QueryConfig {
-
-  /**
-    * The minimum time until state which was not updated will be retained.
-    * State might be cleared and removed if it was not updated for the defined period of time.
-    */
-  private var minIdleStateRetentionTime: Long = Long.MinValue
-
-  /**
-    * The maximum time until state which was not updated will be retained.
-    * State will be cleared and removed if it was not updated for the defined period of time.
-    */
-  private var maxIdleStateRetentionTime: Long = Long.MinValue
-
-  /**
-    * Specifies the time interval for how long idle state, i.e., state which was not updated, will
-    * be retained. When state was not updated for the specified interval of time, it will be cleared
-    * and removed.
-    *
-    * When new data arrives for previously cleaned-up state, the new data will be handled as if it
-    * was the first data. This can result in previous results being overwritten.
-    *
-    * Note: [[setIdleStateRetentionTime(minTime: Time, maxTime: Time)]] allows to set a minimum and
-    * maximum time for state to be retained. This method is more efficient, because the system has
-    * to do less bookkeeping to identify the time at which state must be cleared.
-    *
-    * @param time The time interval for how long idle state is retained. Set to 0 (zero) to never
-    *             clean-up the state.
-    */
-  def setIdleStateRetentionTime(time: Time): StreamQueryConfig = {
-    setIdleStateRetentionTime(time, time)
-  }
-
-  /**
-    * Specifies a minimum and a maximum time interval for how long idle state, i.e., state which
-    * was not updated, will be retained.
-    * State will never be cleared until it was idle for less than the minimum time and will never
-    * be kept if it was idle for more than the maximum time.
-    *
-    * When new data arrives for previously cleaned-up state, the new data will be handled as if it
-    * was the first data. This can result in previous results being overwritten.
-    *
-    * Set to 0 (zero) to never clean-up the state.
-    *
-    * @param minTime The minimum time interval for which idle state is retained. Set to 0 (zero) to
-    *                never clean-up the state.
-    * @param maxTime The maximum time interval for which idle state is retained. May not be smaller
-    *                than than minTime. Set to 0 (zero) to never clean-up the state.
-    */
-  def setIdleStateRetentionTime(minTime: Time, maxTime: Time): StreamQueryConfig = {
-    if (maxTime.toMilliseconds < minTime.toMilliseconds) {
-      throw new IllegalArgumentException("maxTime may not be smaller than minTime.")
-    }
-    minIdleStateRetentionTime = minTime.toMilliseconds
-    maxIdleStateRetentionTime = maxTime.toMilliseconds
-    this
-  }
-
-  def getMinIdleStateRetentionTime: Long = {
-    minIdleStateRetentionTime
-  }
-
-  def getMaxIdleStateRetentionTime: Long = {
-    maxIdleStateRetentionTime
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/24808871/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
index c594d4c..d68da04 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
@@ -81,7 +81,7 @@ abstract class StreamTableEnvironment(
   // the naming pattern for internally registered tables.
   private val internalNamePattern = "^_DataStreamTable_[0-9]+$".r
 
-  def qConf: StreamQueryConfig = new StreamQueryConfig
+  def queryConfig: StreamQueryConfig = new StreamQueryConfig
 
   /**
     * Checks if the chosen table name is valid.
@@ -128,16 +128,16 @@ abstract class StreamTableEnvironment(
     *
     * @param table The [[Table]] to write.
     * @param sink The [[TableSink]] to write the [[Table]] to.
-    * @param qConfig The configuration for the query to generate.
+    * @param queryConfig The configuration for the query to generate.
     * @tparam T The expected type of the [[DataStream]] which represents the [[Table]].
     */
   override private[flink] def writeToSink[T](
       table: Table,
       sink: TableSink[T],
-      qConfig: QueryConfig): Unit = {
+      queryConfig: QueryConfig): Unit = {
 
     // Check query configuration
-    val sQConf = qConfig match {
+    val streamQueryConfig = queryConfig match {
       case streamConfig: StreamQueryConfig => streamConfig
       case _ =>
         throw new TableException("StreamQueryConfig required to configure stream query.")
@@ -150,7 +150,11 @@ abstract class StreamTableEnvironment(
         val outputType = sink.getOutputType
         // translate the Table into a DataStream and provide the type that the TableSink expects.
         val result: DataStream[T] =
-          translate(table, sQConf, updatesAsRetraction = true, withChangeFlag = true)(outputType)
+          translate(
+            table,
+            streamQueryConfig,
+            updatesAsRetraction = true,
+            withChangeFlag = true)(outputType)
         // Give the DataStream to the TableSink to emit it.
         retractSink.asInstanceOf[RetractStreamTableSink[Any]]
           .emitDataStream(result.asInstanceOf[DataStream[JTuple2[JBool, Any]]])
@@ -176,7 +180,7 @@ abstract class StreamTableEnvironment(
           translate(
             optimizedPlan,
             table.getRelNode.getRowType,
-            sQConf,
+            streamQueryConfig,
             withChangeFlag = true)(outputType)
         // Give the DataStream to the TableSink to emit it.
         upsertSink.asInstanceOf[UpsertStreamTableSink[Any]]
@@ -196,7 +200,7 @@ abstract class StreamTableEnvironment(
           translate(
             optimizedPlan,
             table.getRelNode.getRowType,
-            sQConf,
+            streamQueryConfig,
             withChangeFlag = false)(outputType)
         // Give the DataStream to the TableSink to emit it.
         appendSink.asInstanceOf[AppendStreamTableSink[T]].emitDataStream(result)
@@ -566,7 +570,7 @@ abstract class StreamTableEnvironment(
     * Table API calls and / or SQL queries and generating corresponding [[DataStream]] operators.
     *
     * @param table The root node of the relational expression tree.
-    * @param qConfig The configuration for the query to generate.
+    * @param queryConfig The configuration for the query to generate.
     * @param updatesAsRetraction Set to true to encode updates as retraction messages.
     * @param withChangeFlag Set to true to emit records with change flags.
     * @param tpe The [[TypeInformation]] of the resulting [[DataStream]].
@@ -575,12 +579,12 @@ abstract class StreamTableEnvironment(
     */
   protected def translate[A](
       table: Table,
-      qConfig: StreamQueryConfig,
+      queryConfig: StreamQueryConfig,
       updatesAsRetraction: Boolean,
       withChangeFlag: Boolean)(implicit tpe: TypeInformation[A]): DataStream[A] = {
     val relNode = table.getRelNode
     val dataStreamPlan = optimize(relNode, updatesAsRetraction)
-    translate(dataStreamPlan, relNode.getRowType, qConfig, withChangeFlag)
+    translate(dataStreamPlan, relNode.getRowType, queryConfig, withChangeFlag)
   }
 
   /**
@@ -589,7 +593,7 @@ abstract class StreamTableEnvironment(
     * @param logicalPlan The root node of the relational expression tree.
     * @param logicalType The row type of the result. Since the logicalPlan can lose the
     *                    field naming during optimization we pass the row type separately.
-    * @param qConfig     The configuration for the query to generate.
+    * @param queryConfig     The configuration for the query to generate.
     * @param withChangeFlag Set to true to emit records with change flags.
     * @param tpe         The [[TypeInformation]] of the resulting [[DataStream]].
     * @tparam A The type of the resulting [[DataStream]].
@@ -598,7 +602,7 @@ abstract class StreamTableEnvironment(
   protected def translate[A](
       logicalPlan: RelNode,
       logicalType: RelDataType,
-      qConfig: StreamQueryConfig,
+      queryConfig: StreamQueryConfig,
       withChangeFlag: Boolean)
       (implicit tpe: TypeInformation[A]): DataStream[A] = {
 
@@ -610,7 +614,7 @@ abstract class StreamTableEnvironment(
     }
 
     // get CRow plan
-    val plan: DataStream[CRow] = translateToCRow(logicalPlan, qConfig)
+    val plan: DataStream[CRow] = translateToCRow(logicalPlan, queryConfig)
 
     // convert CRow to output type
     val conversion = if (withChangeFlag) {
@@ -642,16 +646,16 @@ abstract class StreamTableEnvironment(
     * Translates a logical [[RelNode]] plan into a [[DataStream]] of type [[CRow]].
     *
     * @param logicalPlan The logical plan to translate.
-    * @param qConfig The configuration for the query to generate.
+    * @param queryConfig  The configuration for the query to generate.
     * @return The [[DataStream]] of type [[CRow]].
     */
   protected def translateToCRow(
     logicalPlan: RelNode,
-    qConfig: StreamQueryConfig): DataStream[CRow] = {
+    queryConfig: StreamQueryConfig): DataStream[CRow] = {
 
     logicalPlan match {
       case node: DataStreamRel =>
-        node.translateToPlan(this, qConfig)
+        node.translateToPlan(this, queryConfig)
       case _ =>
         throw TableException("Cannot generate DataStream due to an invalid logical plan. " +
           "This is a bug and should not happen. Please file an issue.")
@@ -667,7 +671,7 @@ abstract class StreamTableEnvironment(
   def explain(table: Table): String = {
     val ast = table.getRelNode
     val optimizedPlan = optimize(ast, updatesAsRetraction = false)
-    val dataStream = translateToCRow(optimizedPlan, qConf)
+    val dataStream = translateToCRow(optimizedPlan, queryConfig)
 
     val env = dataStream.getExecutionEnvironment
     val jsonSqlPlan = env.getExecutionPlan

http://git-wip-us.apache.org/repos/asf/flink/blob/24808871/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala
index c3b5951..311986c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala
@@ -150,7 +150,7 @@ class StreamTableEnvironment(
     * @return The converted [[DataStream]].
     */
   def toDataStream[T](table: Table, clazz: Class[T]): DataStream[T] = {
-    toDataStream(table, clazz, qConf)
+    toDataStream(table, clazz, queryConfig)
   }
 
   /**
@@ -170,7 +170,7 @@ class StreamTableEnvironment(
     * @return The converted [[DataStream]].
     */
   def toDataStream[T](table: Table, typeInfo: TypeInformation[T]): DataStream[T] = {
-    toDataStream(table, typeInfo, qConf)
+    toDataStream(table, typeInfo, queryConfig)
   }
 
   /**
@@ -186,14 +186,17 @@ class StreamTableEnvironment(
     *
     * @param table The [[Table]] to convert.
     * @param clazz The class of the type of the resulting [[DataStream]].
-    * @param qConfig The configuration of the query to generate.
+    * @param queryConfig The configuration of the query to generate.
     * @tparam T The type of the resulting [[DataStream]].
     * @return The converted [[DataStream]].
     */
-  def toDataStream[T](table: Table, clazz: Class[T], qConfig: StreamQueryConfig): DataStream[T] = {
+  def toDataStream[T](
+      table: Table,
+      clazz: Class[T],
+      queryConfig: StreamQueryConfig): DataStream[T] = {
     val typeInfo = TypeExtractor.createTypeInfo(clazz)
     TableEnvironment.validateType(typeInfo)
-    translate[T](table, qConfig, updatesAsRetraction = false, withChangeFlag = false)(typeInfo)
+    translate[T](table, queryConfig, updatesAsRetraction = false, withChangeFlag = false)(typeInfo)
   }
 
   /**
@@ -209,16 +212,16 @@ class StreamTableEnvironment(
     *
     * @param table The [[Table]] to convert.
     * @param typeInfo The [[TypeInformation]] that specifies the type of the [[DataStream]].
-    * @param qConfig The configuration of the query to generate.
+    * @param queryConfig The configuration of the query to generate.
     * @tparam T The type of the resulting [[DataStream]].
     * @return The converted [[DataStream]].
     */
   def toDataStream[T](
       table: Table,
       typeInfo: TypeInformation[T],
-      qConfig: StreamQueryConfig): DataStream[T] = {
+      queryConfig: StreamQueryConfig): DataStream[T] = {
     TableEnvironment.validateType(typeInfo)
-    translate[T](table, qConfig, updatesAsRetraction = false, withChangeFlag = false)(typeInfo)
+    translate[T](table, queryConfig, updatesAsRetraction = false, withChangeFlag = false)(typeInfo)
   }
 
   /**
@@ -242,7 +245,7 @@ class StreamTableEnvironment(
       table: Table,
       clazz: Class[T]): DataStream[JTuple2[JBool, T]] = {
 
-    toRetractStream(table, clazz, qConf)
+    toRetractStream(table, clazz, queryConfig)
   }
 
   /**
@@ -266,7 +269,7 @@ class StreamTableEnvironment(
       table: Table,
       typeInfo: TypeInformation[T]): DataStream[JTuple2[JBool, T]] = {
 
-    toRetractStream(table, typeInfo, qConf)
+    toRetractStream(table, typeInfo, queryConfig)
   }
 
   /**
@@ -283,21 +286,21 @@ class StreamTableEnvironment(
     *
     * @param table The [[Table]] to convert.
     * @param clazz The class of the requested record type.
-    * @param qConfig The configuration of the query to generate.
+    * @param queryConfig The configuration of the query to generate.
     * @tparam T The type of the requested record type.
     * @return The converted [[DataStream]].
     */
   def toRetractStream[T](
       table: Table,
       clazz: Class[T],
-      qConfig: StreamQueryConfig): DataStream[JTuple2[JBool, T]] = {
+      queryConfig: StreamQueryConfig): DataStream[JTuple2[JBool, T]] = {
 
     val typeInfo = TypeExtractor.createTypeInfo(clazz)
     TableEnvironment.validateType(typeInfo)
     val resultType = new TupleTypeInfo[JTuple2[JBool, T]](Types.BOOLEAN, typeInfo)
     translate[JTuple2[JBool, T]](
       table,
-      qConfig,
+      queryConfig,
       updatesAsRetraction = true,
       withChangeFlag = true)(resultType)
   }
@@ -316,14 +319,14 @@ class StreamTableEnvironment(
     *
     * @param table The [[Table]] to convert.
     * @param typeInfo The [[TypeInformation]] of the requested record type.
-    * @param qConfig The configuration of the query to generate.
+    * @param queryConfig The configuration of the query to generate.
     * @tparam T The type of the requested record type.
     * @return The converted [[DataStream]].
     */
   def toRetractStream[T](
       table: Table,
       typeInfo: TypeInformation[T],
-      qConfig: StreamQueryConfig): DataStream[JTuple2[JBool, T]] = {
+      queryConfig: StreamQueryConfig): DataStream[JTuple2[JBool, T]] = {
 
     TableEnvironment.validateType(typeInfo)
     val resultTypeInfo = new TupleTypeInfo[JTuple2[JBool, T]](
@@ -332,7 +335,7 @@ class StreamTableEnvironment(
     )
     translate[JTuple2[JBool, T]](
       table,
-      qConfig,
+      queryConfig,
       updatesAsRetraction = true,
       withChangeFlag = true)(resultTypeInfo)
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/24808871/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/queryConfig.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/queryConfig.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/queryConfig.scala
new file mode 100644
index 0000000..c8fbab7
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/queryConfig.scala
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api
+
+import _root_.java.io.Serializable
+import org.apache.flink.api.common.time.Time
+
+class QueryConfig private[table] extends Serializable {}
+
+/**
+  * The [[BatchQueryConfig]] holds parameters to configure the behavior of batch queries.
+  */
+class BatchQueryConfig private[table] extends QueryConfig
+
+/**
+  * The [[StreamQueryConfig]] holds parameters to configure the behavior of streaming queries.
+  *
+  * An empty [[StreamQueryConfig]] can be generated using the [[StreamTableEnvironment.queryConfig]]
+  * method.
+  */
+class StreamQueryConfig private[table] extends QueryConfig {
+
+  /**
+    * The minimum time for which state that was not updated will be retained.
+    * State might be cleared and removed if it was not updated for the defined period of time.
+    */
+  private var minIdleStateRetentionTime: Long = Long.MinValue
+
+  /**
+    * The maximum time for which state that was not updated will be retained.
+    * State will be cleared and removed if it was not updated for the defined period of time.
+    */
+  private var maxIdleStateRetentionTime: Long = Long.MinValue
+
+  /**
+    * Specifies the time interval for how long idle state, i.e., state which was not updated, will
+    * be retained. When state was not updated for the specified interval of time, it will be cleared
+    * and removed.
+    *
+    * When new data arrives for previously cleaned-up state, the new data will be handled as if it
+    * was the first data. This can result in previous results being overwritten.
+    *
+    * Note: [[withIdleStateRetentionTime(minTime: Time, maxTime: Time)]] allows to set a minimum and
+    * maximum time for state to be retained. This method is more efficient, because the system has
+    * to do less bookkeeping to identify the time at which state must be cleared.
+    *
+    * @param time The time interval for how long idle state is retained. Set to 0 (zero) to never
+    *             clean-up the state.
+    */
+  def withIdleStateRetentionTime(time: Time): StreamQueryConfig = {
+    withIdleStateRetentionTime(time, time)
+  }
+
+  /**
+    * Specifies a minimum and a maximum time interval for how long idle state, i.e., state which
+    * was not updated, will be retained.
+    * State will never be cleared if it was idle for less than the minimum time and will never
+    * be kept if it was idle for more than the maximum time.
+    *
+    * When new data arrives for previously cleaned-up state, the new data will be handled as if it
+    * was the first data. This can result in previous results being overwritten.
+    *
+    * Set to 0 (zero) to never clean-up the state.
+    *
+    * @param minTime The minimum time interval for which idle state is retained. Set to 0 (zero) to
+    *                never clean-up the state.
+    * @param maxTime The maximum time interval for which idle state is retained. May not be smaller
+    *                than minTime. Set to 0 (zero) to never clean-up the state.
+    */
+  def withIdleStateRetentionTime(minTime: Time, maxTime: Time): StreamQueryConfig = {
+    if (maxTime.toMilliseconds < minTime.toMilliseconds) {
+      throw new IllegalArgumentException("maxTime may not be smaller than minTime.")
+    }
+    minIdleStateRetentionTime = minTime.toMilliseconds
+    maxIdleStateRetentionTime = maxTime.toMilliseconds
+    this
+  }
+
+  def getMinIdleStateRetentionTime: Long = {
+    minIdleStateRetentionTime
+  }
+
+  def getMaxIdleStateRetentionTime: Long = {
+    maxIdleStateRetentionTime
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/24808871/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala
index 56f7d55..8c6b273 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala
@@ -143,7 +143,7 @@ class StreamTableEnvironment(
     * @return The converted [[DataStream]].
     */
   def toDataStream[T: TypeInformation](table: Table): DataStream[T] = {
-    toDataStream(table, qConf)
+    toDataStream(table, queryConfig)
   }
 
   /**
@@ -158,14 +158,16 @@ class StreamTableEnvironment(
     * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match.
     *
     * @param table The [[Table]] to convert.
-    * @param qConfig The configuration of the query to generate.
+    * @param queryConfig The configuration of the query to generate.
     * @tparam T The type of the resulting [[DataStream]].
     * @return The converted [[DataStream]].
     */
-  def toDataStream[T: TypeInformation](table: Table, qConfig: StreamQueryConfig): DataStream[T] = {
+  def toDataStream[T: TypeInformation](
+    table: Table,
+    queryConfig: StreamQueryConfig): DataStream[T] = {
     val returnType = createTypeInformation[T]
-    asScalaStream(
-      translate(table, qConfig, updatesAsRetraction = false, withChangeFlag = false)(returnType))
+    asScalaStream(translate(
+      table, queryConfig, updatesAsRetraction = false, withChangeFlag = false)(returnType))
   }
 
 /**
@@ -180,7 +182,7 @@ class StreamTableEnvironment(
   * @return The converted [[DataStream]].
   */
   def toRetractStream[T: TypeInformation](table: Table): DataStream[(Boolean, T)] = {
-    toRetractStream(table, qConf)
+    toRetractStream(table, queryConfig)
   }
 
   /**
@@ -191,16 +193,16 @@ class StreamTableEnvironment(
     * A true [[Boolean]] flag indicates an add message, a false flag indicates a retract message.
     *
     * @param table The [[Table]] to convert.
-    * @param qConfig The configuration of the query to generate.
+    * @param queryConfig The configuration of the query to generate.
     * @tparam T The type of the requested data type.
     * @return The converted [[DataStream]].
     */
   def toRetractStream[T: TypeInformation](
       table: Table,
-      qConfig: StreamQueryConfig): DataStream[(Boolean, T)] = {
+      queryConfig: StreamQueryConfig): DataStream[(Boolean, T)] = {
     val returnType = createTypeInformation[(Boolean, T)]
     asScalaStream(
-      translate(table, qConfig, updatesAsRetraction = true, withChangeFlag = true)(returnType))
+      translate(table, queryConfig, updatesAsRetraction = true, withChangeFlag = true)(returnType))
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/flink/blob/24808871/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala
index 966b42f..9874a9e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala
@@ -59,12 +59,12 @@ class TableConversions(table: Table) {
 
   /** Converts the [[Table]] to a [[DataStream]] of the specified type.
     *
-    * @param qConfig The configuration for the generated query.
+    * @param queryConfig The configuration for the generated query.
     */
-  def toDataStream[T: TypeInformation](qConfig: StreamQueryConfig): DataStream[T] = {
+  def toDataStream[T: TypeInformation](queryConfig: StreamQueryConfig): DataStream[T] = {
     table.tableEnv match {
       case tEnv: ScalaStreamTableEnv =>
-        tEnv.toDataStream(table, qConfig)
+        tEnv.toDataStream(table, queryConfig)
       case _ =>
         throw new TableException(
           "Only tables that originate from Scala DataStreams " +
@@ -97,14 +97,15 @@ class TableConversions(table: Table) {
     *
     * A true [[Boolean]] flag indicates an add message, a false flag indicates a retract message.
     *
-    * @param qConfig The configuration for the generated query.
+    * @param queryConfig The configuration for the generated query.
     *
     */
-  def toRetractStream[T: TypeInformation](qConfig: StreamQueryConfig): DataStream[(Boolean, T)] = {
+  def toRetractStream[T: TypeInformation](
+      queryConfig: StreamQueryConfig): DataStream[(Boolean, T)] = {
 
     table.tableEnv match {
       case tEnv: ScalaStreamTableEnv =>
-        tEnv.toRetractStream(table, qConfig)
+        tEnv.toRetractStream(table, queryConfig)
       case _ =>
         throw new TableException(
           "Only tables that originate from Scala DataStreams " +

http://git-wip-us.apache.org/repos/asf/flink/blob/24808871/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
index 5a2eb1c..ca61c65 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
@@ -764,13 +764,13 @@ class Table(
     */
   def writeToSink[T](sink: TableSink[T]): Unit = {
 
-    def qConfig = this.tableEnv match {
-      case s: StreamTableEnvironment => s.qConf
+    def queryConfig = this.tableEnv match {
+      case s: StreamTableEnvironment => s.queryConfig
       case b: BatchTableEnvironment => new BatchQueryConfig
       case _ => null
     }
 
-    writeToSink(sink, qConfig)
+    writeToSink(sink, queryConfig)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/flink/blob/24808871/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
index 0e377b5..5f270f6 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
@@ -85,11 +85,12 @@ class DataStreamCalc(
 
   override def translateToPlan(
       tableEnv: StreamTableEnvironment,
-      qConfig: StreamQueryConfig): DataStream[CRow] = {
+      queryConfig: StreamQueryConfig): DataStream[CRow] = {
 
     val config = tableEnv.getConfig
 
-    val inputDataStream = getInput.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, qConfig)
+    val inputDataStream =
+      getInput.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, queryConfig)
     val inputRowType = inputDataStream.getType.asInstanceOf[CRowTypeInfo].rowType
 
     val generator = new CodeGenerator(config, false, inputRowType)

http://git-wip-us.apache.org/repos/asf/flink/blob/24808871/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala
index cbd818a..5b32b10 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala
@@ -84,12 +84,12 @@ class DataStreamCorrelate(
 
   override def translateToPlan(
       tableEnv: StreamTableEnvironment,
-      qConfig: StreamQueryConfig): DataStream[CRow] = {
+      queryConfig: StreamQueryConfig): DataStream[CRow] = {
 
     val config = tableEnv.getConfig
 
     // we do not need to specify input type
-    val inputDS = getInput.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, qConfig)
+    val inputDS = getInput.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, queryConfig)
     val inputType = inputDS.getType.asInstanceOf[CRowTypeInfo]
 
     val funcRel = scan.asInstanceOf[FlinkLogicalTableFunctionScan]

http://git-wip-us.apache.org/repos/asf/flink/blob/24808871/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
index f01b24a..e5d8088 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
@@ -106,16 +106,16 @@ class DataStreamGroupAggregate(
 
   override def translateToPlan(
       tableEnv: StreamTableEnvironment,
-      qConfig: StreamQueryConfig): DataStream[CRow] = {
+      queryConfig: StreamQueryConfig): DataStream[CRow] = {
 
-    if (qConfig.getMinIdleStateRetentionTime < 0 || qConfig.getMaxIdleStateRetentionTime < 0) {
+    if (groupings.length > 0 && queryConfig.getMinIdleStateRetentionTime < 0) {
       LOG.warn(
         "No state retention interval configured for a query which accumulates state. " +
         "Please provide a query configuration with valid retention interval to prevent excessive " +
-          "state size. You may specify a retention time of 0 to not clean up the state.")
+        "state size. You may specify a retention time of 0 to not clean up the state.")
     }
 
-    val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, qConfig)
+    val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, queryConfig)
 
     val physicalNamedAggregates = namedAggregates.map { namedAggregate =>
       new CalcitePair[AggregateCall, String](
@@ -149,7 +149,7 @@ class DataStreamGroupAggregate(
       inputSchema.logicalType,
       inputSchema.physicalFieldTypeInfo,
       groupings,
-      qConfig,
+      queryConfig,
       DataStreamRetractionRules.isAccRetract(this),
       DataStreamRetractionRules.isAccRetract(getInput))
 

http://git-wip-us.apache.org/repos/asf/flink/blob/24808871/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
index d2aaad0..2a71592 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
@@ -109,9 +109,9 @@ class DataStreamGroupWindowAggregate(
 
   override def translateToPlan(
       tableEnv: StreamTableEnvironment,
-      qConfig: StreamQueryConfig): DataStream[CRow] = {
+      queryConfig: StreamQueryConfig): DataStream[CRow] = {
 
-    val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, qConfig)
+    val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, queryConfig)
 
     val physicalNamedAggregates = namedAggregates.map { namedAggregate =>
       new CalcitePair[AggregateCall, String](

http://git-wip-us.apache.org/repos/asf/flink/blob/24808871/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
index 8e97884..a9fbf02 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
@@ -36,6 +36,7 @@ import org.apache.flink.table.codegen.CodeGenerator
 import org.apache.flink.table.plan.rules.datastream.DataStreamRetractionRules
 import org.apache.flink.table.runtime.aggregate.AggregateUtil.CalcitePair
 import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.slf4j.LoggerFactory
 
 class DataStreamOverAggregate(
     logicWindow: Window,
@@ -47,6 +48,7 @@ class DataStreamOverAggregate(
   extends SingleRel(cluster, traitSet, inputNode)
   with OverAggregate
   with DataStreamRel {
+  private val LOG = LoggerFactory.getLogger(this.getClass)
 
   override def deriveRowType(): RelDataType = schema.logicalType
 
@@ -90,7 +92,7 @@ class DataStreamOverAggregate(
 
   override def translateToPlan(
       tableEnv: StreamTableEnvironment,
-      qConfig: StreamQueryConfig): DataStream[CRow] = {
+      queryConfig: StreamQueryConfig): DataStream[CRow] = {
 
     if (logicWindow.groups.size > 1) {
       throw new TableException(
@@ -112,10 +114,23 @@ class DataStreamOverAggregate(
         "Unsupported use of OVER windows. The window can only be ordered in ASCENDING mode.")
     }
 
-    val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, qConfig)
+    val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, queryConfig)
 
     val consumeRetraction = DataStreamRetractionRules.isAccRetract(input)
 
+    if (consumeRetraction) {
+      throw new TableException(
+        "Retraction on Over window aggregation is not supported yet. " +
+        "Note: Over window aggregation should not follow a non-windowed GroupBy aggregation.")
+    }
+
+    if (!logicWindow.groups.get(0).keys.isEmpty && queryConfig.getMinIdleStateRetentionTime < 0) {
+      LOG.warn(
+        "No state retention interval configured for a query which accumulates state. " +
+        "Please provide a query configuration with valid retention interval to prevent " +
+        "excessive state size. You may specify a retention time of 0 to not clean up the state.")
+    }
+
     val generator = new CodeGenerator(
       tableEnv.getConfig,
       false,
@@ -126,18 +141,13 @@ class DataStreamOverAggregate(
       .get(orderKey.getFieldIndex)
       .getType
 
-    if (consumeRetraction) {
-      throw new TableException(
-        "Retraction on Over window aggregation is not supported yet. " +
-          "Note: Over window aggregation should not follow a non-windowed GroupBy aggregation.")
-    }
-
     timeType match {
       case _ if FlinkTypeFactory.isProctimeIndicatorType(timeType)  =>
         // proc-time OVER window
         if (overWindow.lowerBound.isUnbounded && overWindow.upperBound.isCurrentRow) {
           // unbounded OVER window
           createUnboundedAndCurrentRowOverWindow(
+            queryConfig,
             generator,
             inputDS,
             isRowTimeType = false,
@@ -145,8 +155,10 @@ class DataStreamOverAggregate(
         } else if (
           overWindow.lowerBound.isPreceding && !overWindow.lowerBound.isUnbounded &&
             overWindow.upperBound.isCurrentRow) {
+
           // bounded OVER window
           createBoundedAndCurrentRowOverWindow(
+            queryConfig,
             generator,
             inputDS,
             isRowTimeType = false,
@@ -162,6 +174,7 @@ class DataStreamOverAggregate(
           overWindow.lowerBound.isUnbounded && overWindow.upperBound.isCurrentRow) {
           // unbounded OVER window
           createUnboundedAndCurrentRowOverWindow(
+            queryConfig,
             generator,
             inputDS,
             isRowTimeType = true,
@@ -169,6 +182,7 @@ class DataStreamOverAggregate(
         } else if (overWindow.lowerBound.isPreceding && overWindow.upperBound.isCurrentRow) {
           // bounded OVER window
           createBoundedAndCurrentRowOverWindow(
+            queryConfig,
             generator,
             inputDS,
             isRowTimeType = true,
@@ -185,6 +199,7 @@ class DataStreamOverAggregate(
   }
 
   def createUnboundedAndCurrentRowOverWindow(
+    queryConfig: StreamQueryConfig,
     generator: CodeGenerator,
     inputDS: DataStream[CRow],
     isRowTimeType: Boolean,
@@ -210,6 +225,7 @@ class DataStreamOverAggregate(
       inputSchema.physicalType,
       inputSchema.physicalTypeInfo,
       inputSchema.physicalFieldTypeInfo,
+      queryConfig,
       isRowTimeType,
       partitionKeys.nonEmpty,
       isRowsClause)
@@ -242,6 +258,7 @@ class DataStreamOverAggregate(
   }
 
   def createBoundedAndCurrentRowOverWindow(
+    queryConfig: StreamQueryConfig,
     generator: CodeGenerator,
     inputDS: DataStream[CRow],
     isRowTimeType: Boolean,
@@ -269,6 +286,7 @@ class DataStreamOverAggregate(
       inputSchema.physicalTypeInfo,
       inputSchema.physicalFieldTypeInfo,
       precedingOffset,
+      queryConfig,
       isRowsClause,
       isRowTimeType
     )

http://git-wip-us.apache.org/repos/asf/flink/blob/24808871/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala
index 6f6edf7..65d336f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala
@@ -29,12 +29,12 @@ trait DataStreamRel extends FlinkRelNode {
     * Translates the FlinkRelNode into a Flink operator.
     *
     * @param tableEnv The [[StreamTableEnvironment]] of the translated Table.
-    * @param qConfig The configuration for the query to generate.
+    * @param queryConfig The configuration for the query to generate.
     * @return DataStream of type [[CRow]]
     */
   def translateToPlan(
     tableEnv: StreamTableEnvironment,
-    qConfig: StreamQueryConfig): DataStream[CRow]
+    queryConfig: StreamQueryConfig): DataStream[CRow]
 
   /**
     * Whether the [[DataStreamRel]] requires that update and delete changes are sent with retraction

http://git-wip-us.apache.org/repos/asf/flink/blob/24808871/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala
index e64bf0f..424c6a2 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala
@@ -56,7 +56,7 @@ class DataStreamScan(
 
   override def translateToPlan(
       tableEnv: StreamTableEnvironment,
-      qConfig: StreamQueryConfig): DataStream[CRow] = {
+      queryConfig: StreamQueryConfig): DataStream[CRow] = {
 
     val config = tableEnv.getConfig
     val inputDataStream: DataStream[Any] = dataStreamTable.dataStream

http://git-wip-us.apache.org/repos/asf/flink/blob/24808871/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala
index 6cc7396..6f4980a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala
@@ -60,10 +60,10 @@ class DataStreamUnion(
 
   override def translateToPlan(
       tableEnv: StreamTableEnvironment,
-      qConfig: StreamQueryConfig): DataStream[CRow] = {
+      queryConfig: StreamQueryConfig): DataStream[CRow] = {
 
-    val leftDataSet = left.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, qConfig)
-    val rightDataSet = right.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, qConfig)
+    val leftDataSet = left.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, queryConfig)
+    val rightDataSet = right.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, queryConfig)
     leftDataSet.union(rightDataSet)
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/24808871/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala
index ba6b025..d7c490f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala
@@ -58,7 +58,7 @@ class DataStreamValues(
 
   override def translateToPlan(
       tableEnv: StreamTableEnvironment,
-      qConfig: StreamQueryConfig): DataStream[CRow] = {
+      queryConfig: StreamQueryConfig): DataStream[CRow] = {
 
     val config = tableEnv.getConfig
 

http://git-wip-us.apache.org/repos/asf/flink/blob/24808871/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
index 225f23f..51e609f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
@@ -100,7 +100,7 @@ class StreamTableSourceScan(
 
   override def translateToPlan(
       tableEnv: StreamTableEnvironment,
-      qConfig: StreamQueryConfig): DataStream[CRow] = {
+      queryConfig: StreamQueryConfig): DataStream[CRow] = {
 
     val config = tableEnv.getConfig
     val inputDataStream = tableSource.getDataStream(tableEnv.execEnv).asInstanceOf[DataStream[Any]]

http://git-wip-us.apache.org/repos/asf/flink/blob/24808871/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
index 27392c7..8073959 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
@@ -77,6 +77,7 @@ object AggregateUtil {
       inputType: RelDataType,
       inputTypeInfo: TypeInformation[Row],
       inputFieldTypeInfo: Seq[TypeInformation[_]],
+      queryConfig: StreamQueryConfig,
       isRowTimeType: Boolean,
       isPartitioned: Boolean,
       isRowsClause: Boolean)
@@ -117,23 +118,27 @@ object AggregateUtil {
         new RowTimeUnboundedRowsOver(
           genFunction,
           aggregationStateType,
-          CRowTypeInfo(inputTypeInfo))
+          CRowTypeInfo(inputTypeInfo),
+          queryConfig)
       } else {
         // RANGE unbounded over process function
         new RowTimeUnboundedRangeOver(
           genFunction,
           aggregationStateType,
-          CRowTypeInfo(inputTypeInfo))
+          CRowTypeInfo(inputTypeInfo),
+          queryConfig)
       }
     } else {
       if (isPartitioned) {
         new ProcTimeUnboundedPartitionedOver(
           genFunction,
-          aggregationStateType)
+          aggregationStateType,
+          queryConfig)
       } else {
         new ProcTimeUnboundedNonPartitionedOver(
           genFunction,
-          aggregationStateType)
+          aggregationStateType,
+          queryConfig)
       }
     }
   }
@@ -155,7 +160,7 @@ object AggregateUtil {
       inputRowType: RelDataType,
       inputFieldTypes: Seq[TypeInformation[_]],
       groupings: Array[Int],
-      qConfig: StreamQueryConfig,
+      queryConfig: StreamQueryConfig,
       generateRetraction: Boolean,
       consumeRetraction: Boolean): ProcessFunction[CRow, CRow] = {
 
@@ -192,7 +197,7 @@ object AggregateUtil {
       genFunction,
       aggregationStateType,
       generateRetraction,
-      qConfig)
+      queryConfig)
 
   }
 
@@ -217,6 +222,7 @@ object AggregateUtil {
       inputTypeInfo: TypeInformation[Row],
       inputFieldTypeInfo: Seq[TypeInformation[_]],
       precedingOffset: Long,
+      queryConfig: StreamQueryConfig,
       isRowsClause: Boolean,
       isRowTimeType: Boolean)
     : ProcessFunction[CRow, CRow] = {
@@ -258,15 +264,15 @@ object AggregateUtil {
           genFunction,
           aggregationStateType,
           inputRowType,
-          precedingOffset
-        )
+          precedingOffset,
+          queryConfig)
       } else {
         new RowTimeBoundedRangeOver(
           genFunction,
           aggregationStateType,
           inputRowType,
-          precedingOffset
-        )
+          precedingOffset,
+          queryConfig)
       }
     } else {
       if (isRowsClause) {
@@ -274,13 +280,15 @@ object AggregateUtil {
           genFunction,
           precedingOffset,
           aggregationStateType,
-          inputRowType)
+          inputRowType,
+          queryConfig)
       } else {
         new ProcTimeBoundedRangeOver(
           genFunction,
           precedingOffset,
           aggregationStateType,
-          inputRowType)
+          inputRowType,
+          queryConfig)
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/24808871/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
index 84fee87..57ea86e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
@@ -41,19 +41,13 @@ class GroupAggProcessFunction(
     private val genAggregations: GeneratedAggregationsFunction,
     private val aggregationStateType: RowTypeInfo,
     private val generateRetraction: Boolean,
-    private val qConfig: StreamQueryConfig)
-  extends ProcessFunction[CRow, CRow]
+    private val queryConfig: StreamQueryConfig)
+  extends ProcessFunctionWithCleanupState[CRow, CRow](queryConfig)
     with Compiler[GeneratedAggregations] {
 
   val LOG: Logger = LoggerFactory.getLogger(this.getClass)
   private var function: GeneratedAggregations = _
 
-  private val minRetentionTime = qConfig.getMinIdleStateRetentionTime
-  private val maxRetentionTime = qConfig.getMaxIdleStateRetentionTime
-  private val stateCleaningEnabled = minRetentionTime > 1 && maxRetentionTime > 1
-  // interval in which clean-up timers are registered
-  private val cleanupTimerInterval = maxRetentionTime - minRetentionTime
-
   private var newRow: CRow = _
   private var prevRow: CRow = _
   private var firstRow: Boolean = _
@@ -61,8 +55,6 @@ class GroupAggProcessFunction(
   private var state: ValueState[Row] = _
   // counts the number of added and retracted input records
   private var cntState: ValueState[JLong] = _
-  // holds the latest registered cleanup timer
-  private var cleanupTimeState: ValueState[JLong] = _
 
   override def open(config: Configuration) {
     LOG.debug(s"Compiling AggregateHelper: $genAggregations.name \n\n " +
@@ -84,11 +76,7 @@ class GroupAggProcessFunction(
       new ValueStateDescriptor[JLong]("GroupAggregateInputCounter", Types.LONG)
     cntState = getRuntimeContext.getState(inputCntDescriptor)
 
-    if (stateCleaningEnabled) {
-      val inputCntDescriptor: ValueStateDescriptor[JLong] =
-        new ValueStateDescriptor[JLong]("GroupAggregateCleanupTime", Types.LONG)
-      cleanupTimeState = getRuntimeContext.getState(inputCntDescriptor)
-    }
+    initCleanupTimeState("GroupAggregateCleanupTime")
   }
 
   override def processElement(
@@ -96,22 +84,9 @@ class GroupAggProcessFunction(
       ctx: ProcessFunction[CRow, CRow]#Context,
       out: Collector[CRow]): Unit = {
 
-    if (stateCleaningEnabled) {
-
-      val currentTime = ctx.timerService().currentProcessingTime()
-      val earliestCleanup = currentTime + minRetentionTime
-
-      // last registered timer
-      val lastCleanupTime = cleanupTimeState.value()
-
-      if (lastCleanupTime == null || earliestCleanup >= lastCleanupTime + cleanupTimerInterval) {
-        // we need to register a new timer
-        val cleanupTime = earliestCleanup + cleanupTimerInterval
-        // register timer and remember clean-up time
-        ctx.timerService().registerProcessingTimeTimer(cleanupTime)
-        cleanupTimeState.update(cleanupTime)
-      }
-    }
+    val currentTime = ctx.timerService().currentProcessingTime()
+    // register state-cleanup timer
+    registerProcessingCleanupTimer(ctx, currentTime)
 
     val input = inputC.row
 
@@ -182,11 +157,8 @@ class GroupAggProcessFunction(
       ctx: ProcessFunction[CRow, CRow]#OnTimerContext,
       out: Collector[CRow]): Unit = {
 
-    if (timestamp == cleanupTimeState.value()) {
-      // clear all state
-      this.state.clear()
-      this.cntState.clear()
-      this.cleanupTimeState.clear()
+    if (needToCleanupState(timestamp)) {
+      cleanupState(state, cntState)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/24808871/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala
index 3fb506f..d50912c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala
@@ -31,6 +31,7 @@ import org.apache.flink.api.java.typeutils.ListTypeInfo
 import java.util.{ArrayList, List => JList}
 
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.table.api.StreamQueryConfig
 import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction}
 import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
 import org.slf4j.LoggerFactory
@@ -48,9 +49,11 @@ class ProcTimeBoundedRangeOver(
     genAggregations: GeneratedAggregationsFunction,
     precedingTimeBoundary: Long,
     aggregatesTypeInfo: RowTypeInfo,
-    inputType: TypeInformation[CRow])
-  extends ProcessFunction[CRow, CRow]
+    inputType: TypeInformation[CRow],
+    queryConfig: StreamQueryConfig)
+  extends ProcessFunctionWithCleanupState[CRow, CRow](queryConfig)
     with Compiler[GeneratedAggregations] {
+
   private var output: CRow = _
   private var accumulatorState: ValueState[Row] = _
   private var rowMapState: MapState[Long, JList[Row]] = _
@@ -81,6 +84,8 @@ class ProcTimeBoundedRangeOver(
     val stateDescriptor: ValueStateDescriptor[Row] =
       new ValueStateDescriptor[Row]("overState", aggregatesTypeInfo)
     accumulatorState = getRuntimeContext.getState(stateDescriptor)
+
+    initCleanupTimeState("ProcTimeBoundedRangeOverCleanupTime")
   }
 
   override def processElement(
@@ -89,6 +94,9 @@ class ProcTimeBoundedRangeOver(
     out: Collector[CRow]): Unit = {
 
     val currentTime = ctx.timerService.currentProcessingTime
+    // register state-cleanup timer
+    registerProcessingCleanupTimer(ctx, currentTime)
+
     // buffer the event incoming event
 
     // add current element to the window list of elements with corresponding timestamp
@@ -109,7 +117,15 @@ class ProcTimeBoundedRangeOver(
     ctx: ProcessFunction[CRow, CRow]#OnTimerContext,
     out: Collector[CRow]): Unit = {
 
-    // we consider the original timestamp of events that have registered this time trigger 1 ms ago
+    if (needToCleanupState(timestamp)) {
+      // clean up and return
+      cleanupState(rowMapState, accumulatorState)
+      return
+    }
+
+    // we consider the original timestamp of events
+    // that have registered this time trigger 1 ms ago
+
     val currentTime = timestamp - 1
     var i = 0
 
@@ -153,7 +169,8 @@ class ProcTimeBoundedRangeOver(
 
     // get the list of elements of current proctime
     val currentElements = rowMapState.get(currentTime)
-    // add current elements to aggregator. Multiple elements might have arrived in the same proctime
+    // add current elements to aggregator. Multiple elements might
+    // have arrived in the same proctime
     // the same accumulator value will be computed for all elements
     var iElemenets = 0
     while (iElemenets < currentElements.size()) {
@@ -178,7 +195,6 @@ class ProcTimeBoundedRangeOver(
 
     // update the value of accumulators for future incremental computation
     accumulatorState.update(accumulators)
-
   }
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/24808871/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRowsOver.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRowsOver.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRowsOver.scala
index 0c7f44e..e388c93 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRowsOver.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRowsOver.scala
@@ -33,6 +33,7 @@ import org.apache.flink.api.java.typeutils.ListTypeInfo
 import java.util.{List => JList}
 
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.table.api.StreamQueryConfig
 import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction}
 import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
 import org.slf4j.LoggerFactory
@@ -49,8 +50,9 @@ class ProcTimeBoundedRowsOver(
     genAggregations: GeneratedAggregationsFunction,
     precedingOffset: Long,
     aggregatesTypeInfo: RowTypeInfo,
-    inputType: TypeInformation[CRow])
-  extends ProcessFunction[CRow, CRow]
+    inputType: TypeInformation[CRow],
+    queryConfig: StreamQueryConfig)
+  extends ProcessFunctionWithCleanupState[CRow, CRow](queryConfig)
     with Compiler[GeneratedAggregations] {
 
   Preconditions.checkArgument(precedingOffset > 0)
@@ -99,6 +101,8 @@ class ProcTimeBoundedRowsOver(
     val smallestTimestampDescriptor : ValueStateDescriptor[Long] =
        new ValueStateDescriptor[Long]("smallestTSState", classOf[Long])
     smallestTsState = getRuntimeContext.getState(smallestTimestampDescriptor)
+
+    initCleanupTimeState("ProcTimeBoundedRowsOverCleanupTime")
   }
 
   override def processElement(
@@ -110,6 +114,9 @@ class ProcTimeBoundedRowsOver(
 
     val currentTime = ctx.timerService.currentProcessingTime
 
+    // register state-cleanup timer
+    registerProcessingCleanupTimer(ctx, currentTime)
+
     // initialize state for the processed element
     var accumulators = accumulatorState.value
     if (accumulators == null) {
@@ -180,4 +187,13 @@ class ProcTimeBoundedRowsOver(
     out.collect(output)
   }
 
+  /** Clears the row, accumulator, counter and smallest-timestamp state on a cleanup timer. */
+  override def onTimer(
+      timestamp: Long,
+      ctx: ProcessFunction[CRow, CRow]#OnTimerContext,
+      out: Collector[CRow]): Unit = {
+    // true only if `timestamp` is the latest registered cleanup time
+    val cleanupDue = needToCleanupState(timestamp)
+    if (cleanupDue) cleanupState(rowMapState, accumulatorState, counterState, smallestTsState)
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/24808871/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedNonPartitionedOver.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedNonPartitionedOver.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedNonPartitionedOver.scala
index 8a23132..2a6c9c8 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedNonPartitionedOver.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedNonPartitionedOver.scala
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.Configuration
 import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
 import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.api.StreamQueryConfig
 import org.apache.flink.util.Collector
 import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction}
 import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
@@ -37,8 +38,9 @@ import org.slf4j.LoggerFactory
   */
 class ProcTimeUnboundedNonPartitionedOver(
     genAggregations: GeneratedAggregationsFunction,
-    aggregationStateType: RowTypeInfo)
-  extends ProcessFunction[CRow, CRow]
+    aggregationStateType: RowTypeInfo,
+    queryConfig: StreamQueryConfig)
+  extends ProcessFunctionWithCleanupState[CRow, CRow](queryConfig)
     with CheckpointedFunction
     with Compiler[GeneratedAggregations] {
 
@@ -68,12 +70,16 @@ class ProcTimeUnboundedNonPartitionedOver(
         accumulators = function.createAccumulators()
       }
     }
+    initCleanupTimeState("ProcTimeUnboundedNonPartitionedOverCleanupTime")
   }
 
   override def processElement(
       inputC: CRow,
       ctx: ProcessFunction[CRow, CRow]#Context,
       out: Collector[CRow]): Unit = {
+    // register state-cleanup timer
+    registerProcessingCleanupTimer(ctx, ctx.timerService().currentProcessingTime())
+
 
     val input = inputC.row
 
@@ -85,6 +91,16 @@ class ProcTimeUnboundedNonPartitionedOver(
     out.collect(output)
   }
 
+  /** Clears `state` when the fired timer is the latest registered cleanup timer. */
+  override def onTimer(
+      timestamp: Long,
+      ctx: ProcessFunction[CRow, CRow]#OnTimerContext,
+      out: Collector[CRow]): Unit = {
+    // NOTE(review): only `state` is cleared; the `accumulators` read by snapshotState are
+    // not reset here -- confirm that this really discards the aggregation result
+    if (needToCleanupState(timestamp)) cleanupState(state)
+  }
+
   override def snapshotState(context: FunctionSnapshotContext): Unit = {
     state.clear()
     if (null != accumulators) {

http://git-wip-us.apache.org/repos/asf/flink/blob/24808871/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedPartitionedOver.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedPartitionedOver.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedPartitionedOver.scala
index 847c1bf..97f0ad7 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedPartitionedOver.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedPartitionedOver.scala
@@ -24,6 +24,7 @@ import org.apache.flink.util.Collector
 import org.apache.flink.api.common.state.ValueStateDescriptor
 import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.table.api.StreamQueryConfig
 import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction}
 import org.apache.flink.table.runtime.types.CRow
 import org.slf4j.LoggerFactory
@@ -36,8 +37,9 @@ import org.slf4j.LoggerFactory
   */
 class ProcTimeUnboundedPartitionedOver(
     genAggregations: GeneratedAggregationsFunction,
-    aggregationStateType: RowTypeInfo)
-  extends ProcessFunction[CRow, CRow]
+    aggregationStateType: RowTypeInfo,
+    queryConfig: StreamQueryConfig)
+  extends ProcessFunctionWithCleanupState[CRow, CRow](queryConfig)
     with Compiler[GeneratedAggregations] {
 
   private var output: CRow = _
@@ -59,6 +61,8 @@ class ProcTimeUnboundedPartitionedOver(
     val stateDescriptor: ValueStateDescriptor[Row] =
       new ValueStateDescriptor[Row]("overState", aggregationStateType)
     state = getRuntimeContext.getState(stateDescriptor)
+
+    initCleanupTimeState("ProcTimeUnboundedPartitionedOverCleanupTime")
   }
 
   override def processElement(
@@ -66,6 +70,9 @@ class ProcTimeUnboundedPartitionedOver(
     ctx: ProcessFunction[CRow, CRow]#Context,
     out: Collector[CRow]): Unit = {
 
+    // register state-cleanup timer
+    registerProcessingCleanupTimer(ctx, ctx.timerService().currentProcessingTime())
+
     val input = inputC.row
 
     var accumulators = state.value()
@@ -83,4 +90,13 @@ class ProcTimeUnboundedPartitionedOver(
     out.collect(output)
   }
 
+  /** Clears the accumulator state (`state`) when the latest cleanup timer fires. */
+  override def onTimer(
+      timestamp: Long,
+      ctx: ProcessFunction[CRow, CRow]#OnTimerContext,
+      out: Collector[CRow]): Unit = {
+    // timers other than the latest registered cleanup timer are ignored
+    val cleanupDue = needToCleanupState(timestamp)
+    if (cleanupDue) cleanupState(state)
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/24808871/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcessFunctionWithCleanupState.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcessFunctionWithCleanupState.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcessFunctionWithCleanupState.scala
new file mode 100644
index 0000000..292fd3b
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcessFunctionWithCleanupState.scala
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.lang.{Long => JLong}
+
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.api.common.state.State
+import org.apache.flink.streaming.api.TimeDomain
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.api.{StreamQueryConfig, Types}
+
+/**
+  * Base class for process functions that clean up state with processing-time timers derived
+  * from the min/max idle state retention time of the given [[StreamQueryConfig]].
+  */
+abstract class ProcessFunctionWithCleanupState[IN, OUT](queryConfig: StreamQueryConfig)
+  extends ProcessFunction[IN, OUT] {
+
+  protected val minRetentionTime: Long = queryConfig.getMinIdleStateRetentionTime
+  protected val maxRetentionTime: Long = queryConfig.getMaxIdleStateRetentionTime
+  // state cleaning is only active if the min retention time is greater than 1
+  protected val stateCleaningEnabled: Boolean = minRetentionTime > 1
+
+  // time of the latest registered cleanup timer; stays null while cleaning is disabled
+  private var cleanupTimeState: ValueState[JLong] = _
+
+  /** Creates the cleanup-time state; does nothing if state cleaning is disabled. */
+  protected def initCleanupTimeState(stateName: String): Unit = {
+    if (stateCleaningEnabled) {
+      val cleanupTimeDescriptor = new ValueStateDescriptor[JLong](stateName, Types.LONG)
+      cleanupTimeState = getRuntimeContext.getState(cleanupTimeDescriptor)
+    }
+  }
+
+  /**
+    * Registers a processing-time timer at `currentTime + maxRetentionTime`, unless the latest
+    * registered cleanup time is already at or after `currentTime + minRetentionTime`.
+    */
+  protected def registerProcessingCleanupTimer(
+      ctx: ProcessFunction[IN, OUT]#Context,
+      currentTime: Long): Unit = {
+    if (stateCleaningEnabled) {
+      val curCleanupTime = cleanupTimeState.value()
+      if (curCleanupTime == null || (currentTime + minRetentionTime) > curCleanupTime) {
+        // earlier timers stay registered; needToCleanupState() ignores them when they fire
+        val cleanupTime = currentTime + maxRetentionTime
+        ctx.timerService().registerProcessingTimeTimer(cleanupTime)
+        cleanupTimeState.update(cleanupTime)
+      }
+    }
+  }
+
+  /** Tells whether the fired timer belongs to the processing-time domain. */
+  protected def isProcessingTimeTimer(ctx: ProcessFunction[IN, OUT]#OnTimerContext): Boolean =
+    ctx.timeDomain() == TimeDomain.PROCESSING_TIME
+
+  /** Tells whether `timestamp` is the latest registered cleanup time (false if disabled). */
+  protected def needToCleanupState(timestamp: Long): Boolean = {
+    stateCleaningEnabled && {
+      val cleanupTime = cleanupTimeState.value()
+      null != cleanupTime && timestamp == cleanupTime
+    }
+  }
+
+  /** Clears the given states and, when cleaning is enabled, the remembered cleanup time. */
+  protected def cleanupState(states: State*): Unit = {
+    states.foreach(_.clear())
+    if (stateCleaningEnabled) cleanupTimeState.clear()
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/24808871/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRangeOver.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRangeOver.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRangeOver.scala
index 4020d44..65edf6d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRangeOver.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRangeOver.scala
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
 import org.apache.flink.api.java.typeutils.{ListTypeInfo, RowTypeInfo}
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.api.StreamQueryConfig
 import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction}
 import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
 import org.apache.flink.types.Row
@@ -42,8 +43,9 @@ class RowTimeBoundedRangeOver(
     genAggregations: GeneratedAggregationsFunction,
     aggregationStateType: RowTypeInfo,
     inputRowType: CRowTypeInfo,
-    precedingOffset: Long)
-  extends ProcessFunction[CRow, CRow]
+    precedingOffset: Long,
+    queryConfig: StreamQueryConfig)
+  extends ProcessFunctionWithCleanupState[CRow, CRow](queryConfig)
     with Compiler[GeneratedAggregations] {
   Preconditions.checkNotNull(aggregationStateType)
   Preconditions.checkNotNull(precedingOffset)
@@ -97,6 +99,8 @@ class RowTimeBoundedRangeOver(
         valueTypeInformation)
 
     dataState = getRuntimeContext.getMapState(mapStateDescriptor)
+
+    initCleanupTimeState("RowTimeBoundedRangeOverCleanupTime")
   }
 
   override def processElement(
@@ -106,6 +110,9 @@ class RowTimeBoundedRangeOver(
 
     val input = inputC.row
 
+    // register state-cleanup timer
+    registerProcessingCleanupTimer(ctx, ctx.timerService().currentProcessingTime())
+
     // triggering timestamp for trigger calculation
     val triggeringTs = ctx.timestamp
 
@@ -131,6 +138,34 @@ class RowTimeBoundedRangeOver(
     timestamp: Long,
     ctx: ProcessFunction[CRow, CRow]#OnTimerContext,
     out: Collector[CRow]): Unit = {
+
+    if (isProcessingTimeTimer(ctx.asInstanceOf[OnTimerContext])) {
+      if (needToCleanupState(timestamp)) {
+
+        val keysIt = dataState.keys.iterator()
+        val lastProcessedTime = lastTriggeringTsState.value
+
+        // is data left which has not been processed yet?
+        var noRecordsToProcess = true
+        while (keysIt.hasNext && noRecordsToProcess) {
+          if (keysIt.next() > lastProcessedTime) {
+            noRecordsToProcess = false
+          }
+        }
+
+        if (noRecordsToProcess) {
+          // we clean the state
+          cleanupState(dataState, accumulatorState, lastTriggeringTsState)
+        } else {
+          // There are records left to process because a watermark has not been received yet.
+          // This would only happen if the input stream has stopped. So we don't need to clean up.
+          // We leave the state as it is and schedule a new cleanup timer
+          registerProcessingCleanupTimer(ctx, ctx.timerService().currentProcessingTime())
+        }
+      }
+      return
+    }
+
     // gets all window data from state for the calculation
     val inputs: JList[Row] = dataState.get(timestamp)
 
@@ -196,8 +231,11 @@ class RowTimeBoundedRangeOver(
 
       // update state
       accumulatorState.update(accumulators)
-      lastTriggeringTsState.update(timestamp)
     }
+    lastTriggeringTsState.update(timestamp)
+
+    // update cleanup timer
+    registerProcessingCleanupTimer(ctx, ctx.timerService().currentProcessingTime())
   }
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/24808871/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRowsOver.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRowsOver.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRowsOver.scala
index 5ec6ec7..395ae39 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRowsOver.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRowsOver.scala
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
 import org.apache.flink.api.java.typeutils.{ListTypeInfo, RowTypeInfo}
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.api.StreamQueryConfig
 import org.apache.flink.types.Row
 import org.apache.flink.util.{Collector, Preconditions}
 import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction}
@@ -43,8 +44,9 @@ class RowTimeBoundedRowsOver(
     genAggregations: GeneratedAggregationsFunction,
     aggregationStateType: RowTypeInfo,
     inputRowType: CRowTypeInfo,
-    precedingOffset: Long)
-  extends ProcessFunction[CRow, CRow]
+    precedingOffset: Long,
+    queryConfig: StreamQueryConfig)
+  extends ProcessFunctionWithCleanupState[CRow, CRow](queryConfig)
     with Compiler[GeneratedAggregations] {
 
   Preconditions.checkNotNull(aggregationStateType)
@@ -106,6 +108,8 @@ class RowTimeBoundedRowsOver(
         valueTypeInformation)
 
     dataState = getRuntimeContext.getMapState(mapStateDescriptor)
+
+    initCleanupTimeState("RowTimeBoundedRowsOverCleanupTime")
   }
 
   override def processElement(
@@ -115,6 +119,9 @@ class RowTimeBoundedRowsOver(
 
     val input = inputC.row
 
+    // register state-cleanup timer
+    registerProcessingCleanupTimer(ctx, ctx.timerService().currentProcessingTime())
+
     // triggering timestamp for trigger calculation
     val triggeringTs = ctx.timestamp
 
@@ -141,6 +148,33 @@ class RowTimeBoundedRowsOver(
     ctx: ProcessFunction[CRow, CRow]#OnTimerContext,
     out: Collector[CRow]): Unit = {
 
+    if (isProcessingTimeTimer(ctx.asInstanceOf[OnTimerContext])) {
+      if (needToCleanupState(timestamp)) {
+
+        val keysIt = dataState.keys.iterator()
+        val lastProcessedTime = lastTriggeringTsState.value
+
+        // is data left which has not been processed yet?
+        var noRecordsToProcess = true
+        while (keysIt.hasNext && noRecordsToProcess) {
+          if (keysIt.next() > lastProcessedTime) {
+            noRecordsToProcess = false
+          }
+        }
+
+        if (noRecordsToProcess) {
+          // We clean the state
+          cleanupState(dataState, accumulatorState, dataCountState, lastTriggeringTsState)
+        } else {
+          // There are records left to process because a watermark has not been received yet.
+          // This would only happen if the input stream has stopped. So we don't need to clean up.
+          // We leave the state as it is and schedule a new cleanup timer
+          registerProcessingCleanupTimer(ctx, ctx.timerService().currentProcessingTime())
+        }
+      }
+      return
+    }
+
     // gets all window data from state for the calculation
     val inputs: JList[Row] = dataState.get(timestamp)
 
@@ -220,6 +254,9 @@ class RowTimeBoundedRowsOver(
     }
 
     lastTriggeringTsState.update(timestamp)
+
+    // update cleanup timer
+    registerProcessingCleanupTimer(ctx, ctx.timerService().currentProcessingTime())
   }
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/24808871/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala
index 3e2a811..741d2b4 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala
@@ -28,6 +28,7 @@ import org.apache.flink.util.{Collector, Preconditions}
 import org.apache.flink.api.common.state._
 import org.apache.flink.api.java.typeutils.ListTypeInfo
 import org.apache.flink.streaming.api.operators.TimestampedCollector
+import org.apache.flink.table.api.StreamQueryConfig
 import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction}
 import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
 import org.slf4j.LoggerFactory
@@ -43,8 +44,9 @@ import org.slf4j.LoggerFactory
 abstract class RowTimeUnboundedOver(
     genAggregations: GeneratedAggregationsFunction,
     intermediateType: TypeInformation[Row],
-    inputType: TypeInformation[CRow])
-  extends ProcessFunction[CRow, CRow]
+    inputType: TypeInformation[CRow],
+    queryConfig: StreamQueryConfig)
+  extends ProcessFunctionWithCleanupState[CRow, CRow](queryConfig)
     with Compiler[GeneratedAggregations] {
 
   protected var output: CRow = _
@@ -83,6 +85,8 @@ abstract class RowTimeUnboundedOver(
       new MapStateDescriptor[Long, JList[Row]]("rowmapstate",
         BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo)
     rowMapState = getRuntimeContext.getMapState(mapStateDescriptor)
+
+    initCleanupTimeState("RowTimeUnboundedOverCleanupTime")
   }
 
   /**
@@ -101,6 +105,9 @@ abstract class RowTimeUnboundedOver(
 
     val input = inputC.row
 
+    // register state-cleanup timer
+    registerProcessingCleanupTimer(ctx, ctx.timerService().currentProcessingTime())
+
     val timestamp = ctx.timestamp()
     val curWatermark = ctx.timerService().currentWatermark()
 
@@ -133,6 +140,24 @@ abstract class RowTimeUnboundedOver(
       ctx: ProcessFunction[CRow, CRow]#OnTimerContext,
       out: Collector[CRow]): Unit = {
 
+    if (isProcessingTimeTimer(ctx.asInstanceOf[OnTimerContext])) {
+      if (needToCleanupState(timestamp)) {
+
+        // we check whether there are still records which have not been processed yet
+        val noRecordsToProcess = !rowMapState.keys.iterator().hasNext
+        if (noRecordsToProcess) {
+          // we clean the state
+          cleanupState(rowMapState, accumulatorState)
+        } else {
+          // There are records left to process because a watermark has not been received yet.
+          // This would only happen if the input stream has stopped. So we don't need to clean up.
+          // We leave the state as it is and schedule a new cleanup timer
+          registerProcessingCleanupTimer(ctx, ctx.timerService().currentProcessingTime())
+        }
+      }
+      return
+    }
+
     Preconditions.checkArgument(out.isInstanceOf[TimestampedCollector[CRow]])
     val collector = out.asInstanceOf[TimestampedCollector[CRow]]
 
@@ -178,6 +203,9 @@ abstract class RowTimeUnboundedOver(
         ctx.timerService.registerEventTimeTimer(curWatermark + 1)
       }
     }
+
+    // update cleanup timer
+    registerProcessingCleanupTimer(ctx, ctx.timerService().currentProcessingTime())
   }
 
   /**
@@ -221,11 +249,13 @@ abstract class RowTimeUnboundedOver(
 class RowTimeUnboundedRowsOver(
     genAggregations: GeneratedAggregationsFunction,
     intermediateType: TypeInformation[Row],
-    inputType: TypeInformation[CRow])
+    inputType: TypeInformation[CRow],
+    queryConfig: StreamQueryConfig)
   extends RowTimeUnboundedOver(
     genAggregations: GeneratedAggregationsFunction,
     intermediateType,
-    inputType) {
+    inputType,
+    queryConfig) {
 
   override def processElementsWithSameTimestamp(
     curRowList: JList[Row],
@@ -259,11 +289,13 @@ class RowTimeUnboundedRowsOver(
 class RowTimeUnboundedRangeOver(
     genAggregations: GeneratedAggregationsFunction,
     intermediateType: TypeInformation[Row],
-    inputType: TypeInformation[CRow])
+    inputType: TypeInformation[CRow],
+    queryConfig: StreamQueryConfig)
   extends RowTimeUnboundedOver(
     genAggregations: GeneratedAggregationsFunction,
     intermediateType,
-    inputType) {
+    inputType,
+    queryConfig) {
 
   override def processElementsWithSameTimestamp(
     curRowList: JList[Row],