You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/05/12 06:42:51 UTC

[3/5] flink git commit: [FLINK-6491] [table] Add QueryConfig and state clean up for non-windowed aggregates.

[FLINK-6491] [table] Add QueryConfig and state clean up for non-windowed aggregates.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/003f81a7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/003f81a7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/003f81a7

Branch: refs/heads/release-1.3
Commit: 003f81a73fe38edcd17f10f8e6afb50ba23e3c28
Parents: f3ce088
Author: Fabian Hueske <fh...@apache.org>
Authored: Mon May 8 18:41:37 2017 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Fri May 12 08:33:44 2017 +0200

----------------------------------------------------------------------
 .../flink/table/api/BatchTableEnvironment.scala |  17 ++-
 .../apache/flink/table/api/QueryConfig.scala    | 102 ++++++++++++++++
 .../table/api/StreamTableEnvironment.scala      |  51 ++++++--
 .../flink/table/api/TableEnvironment.scala      |   2 +-
 .../table/api/java/StreamTableEnvironment.scala | 115 +++++++++++++++++--
 .../api/scala/StreamTableEnvironment.scala      |  46 +++++++-
 .../table/api/scala/TableConversions.scala      |  40 ++++++-
 .../org/apache/flink/table/api/table.scala      |  26 ++++-
 .../plan/nodes/datastream/DataStreamCalc.scala  |   8 +-
 .../nodes/datastream/DataStreamCorrelate.scala  |   8 +-
 .../datastream/DataStreamGroupAggregate.scala   |  20 +++-
 .../DataStreamGroupWindowAggregate.scala        |   8 +-
 .../datastream/DataStreamOverAggregate.scala    |   9 +-
 .../plan/nodes/datastream/DataStreamRel.scala   |   7 +-
 .../plan/nodes/datastream/DataStreamScan.scala  |   7 +-
 .../plan/nodes/datastream/DataStreamUnion.scala |  10 +-
 .../nodes/datastream/DataStreamValues.scala     |   6 +-
 .../datastream/StreamTableSourceScan.scala      |   7 +-
 .../table/runtime/aggregate/AggregateUtil.scala |   6 +-
 .../aggregate/GroupAggProcessFunction.scala     |  54 ++++++++-
 .../table/utils/MockTableEnvironment.scala      |   9 +-
 21 files changed, 494 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/003f81a7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
index 2a3cedf..f33c187 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
@@ -32,7 +32,7 @@ import org.apache.flink.api.java.io.DiscardingOutputFormat
 import org.apache.flink.api.java.typeutils.GenericTypeInfo
 import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
 import org.apache.flink.table.explain.PlanJsonParser
-import org.apache.flink.table.expressions.{Expression, RowtimeAttribute, TimeAttribute}
+import org.apache.flink.table.expressions.{Expression, TimeAttribute}
 import org.apache.flink.table.plan.nodes.FlinkConventions
 import org.apache.flink.table.plan.nodes.dataset.DataSetRel
 import org.apache.flink.table.plan.rules.FlinkRuleSets
@@ -113,9 +113,20 @@ abstract class BatchTableEnvironment(
     *
     * @param table The [[Table]] to write.
     * @param sink The [[TableSink]] to write the [[Table]] to.
+    * @param qConfig The configuration for the query to generate.
     * @tparam T The expected type of the [[DataSet]] which represents the [[Table]].
     */
-  override private[flink] def writeToSink[T](table: Table, sink: TableSink[T]): Unit = {
+  override private[flink] def writeToSink[T](
+      table: Table,
+      sink: TableSink[T],
+      qConfig: QueryConfig): Unit = {
+
+    // We do not pass the configuration on, because there is nothing to configure for batch queries.
+    val bQConfig = qConfig match {
+      case batchConfig: BatchQueryConfig => batchConfig
+      case _ =>
+        throw new TableException("BatchQueryConfig required to configure batch query.")
+    }
 
     sink match {
       case batchSink: BatchTableSink[T] =>
@@ -125,7 +136,7 @@ abstract class BatchTableEnvironment(
         // Give the DataSet to the TableSink to emit it.
         batchSink.emitDataSet(result)
       case _ =>
-        throw new TableException("BatchTableSink required to emit batch Table")
+        throw new TableException("BatchTableSink required to emit batch Table.")
     }
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/003f81a7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/QueryConfig.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/QueryConfig.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/QueryConfig.scala
new file mode 100644
index 0000000..8e8b5ac
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/QueryConfig.scala
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api
+
+import _root_.java.io.Serializable
+import org.apache.flink.api.common.time.Time
+
+class QueryConfig private[table] extends Serializable {}
+
+/**
+  * The [[BatchQueryConfig]] holds parameters to configure the behavior of batch queries.
+  */
+class BatchQueryConfig private[table] extends QueryConfig
+
+/**
+  * The [[StreamQueryConfig]] holds parameters to configure the behavior of streaming queries.
+  *
+  * An empty [[StreamQueryConfig]] can be generated using the [[StreamTableEnvironment.qConf]]
+  * method.
+  */
+class StreamQueryConfig private[table] extends QueryConfig {
+
+  /**
+    * The minimum time until state which was not updated will be retained.
+    * State might be cleared and removed if it was not updated for the defined period of time.
+    */
+  private var minIdleStateRetentionTime: Long = Long.MinValue
+
+  /**
+    * The maximum time until state which was not updated will be retained.
+    * State will be cleared and removed if it was not updated for the defined period of time.
+    */
+  private var maxIdleStateRetentionTime: Long = Long.MinValue
+
+  /**
+    * Specifies the time interval for how long idle state, i.e., state which was not updated, will
+    * be retained. When state was not updated for the specified interval of time, it will be cleared
+    * and removed.
+    *
+    * When new data arrives for previously cleaned-up state, the new data will be handled as if it
+    * was the first data. This can result in previous results being overwritten.
+    *
+    * Note: [[setIdleStateRetentionTime(minTime: Time, maxTime: Time)]] allows to set a minimum and
+    * maximum time for state to be retained. This method is more efficient, because the system has
+    * to do less bookkeeping to identify the time at which state must be cleared.
+    *
+    * @param time The time interval for how long idle state is retained. Set to 0 (zero) to never
+    *             clean-up the state.
+    */
+  def setIdleStateRetentionTime(time: Time): StreamQueryConfig = {
+    setIdleStateRetentionTime(time, time)
+  }
+
+  /**
+    * Specifies a minimum and a maximum time interval for how long idle state, i.e., state which
+    * was not updated, will be retained.
+    * State will never be cleared until it was idle for less than the minimum time and will never
+    * be kept if it was idle for more than the maximum time.
+    *
+    * When new data arrives for previously cleaned-up state, the new data will be handled as if it
+    * was the first data. This can result in previous results being overwritten.
+    *
+    * Set to 0 (zero) to never clean-up the state.
+    *
+    * @param minTime The minimum time interval for which idle state is retained. Set to 0 (zero) to
+    *                never clean-up the state.
+    * @param maxTime The maximum time interval for which idle state is retained. May not be smaller
+    *                than than minTime. Set to 0 (zero) to never clean-up the state.
+    */
+  def setIdleStateRetentionTime(minTime: Time, maxTime: Time): StreamQueryConfig = {
+    if (maxTime.toMilliseconds < minTime.toMilliseconds) {
+      throw new IllegalArgumentException("maxTime may not be smaller than minTime.")
+    }
+    minIdleStateRetentionTime = minTime.toMilliseconds
+    maxIdleStateRetentionTime = maxTime.toMilliseconds
+    this
+  }
+
+  def getMinIdleStateRetentionTime: Long = {
+    minIdleStateRetentionTime
+  }
+
+  def getMaxIdleStateRetentionTime: Long = {
+    maxIdleStateRetentionTime
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/003f81a7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
index aef2b1b..c594d4c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
@@ -81,6 +81,8 @@ abstract class StreamTableEnvironment(
   // the naming pattern for internally registered tables.
   private val internalNamePattern = "^_DataStreamTable_[0-9]+$".r
 
+  def qConf: StreamQueryConfig = new StreamQueryConfig
+
   /**
     * Checks if the chosen table name is valid.
     *
@@ -126,9 +128,20 @@ abstract class StreamTableEnvironment(
     *
     * @param table The [[Table]] to write.
     * @param sink The [[TableSink]] to write the [[Table]] to.
+    * @param qConfig The configuration for the query to generate.
     * @tparam T The expected type of the [[DataStream]] which represents the [[Table]].
     */
-  override private[flink] def writeToSink[T](table: Table, sink: TableSink[T]): Unit = {
+  override private[flink] def writeToSink[T](
+      table: Table,
+      sink: TableSink[T],
+      qConfig: QueryConfig): Unit = {
+
+    // Check query configuration
+    val sQConf = qConfig match {
+      case streamConfig: StreamQueryConfig => streamConfig
+      case _ =>
+        throw new TableException("StreamQueryConfig required to configure stream query.")
+    }
 
     sink match {
 
@@ -137,7 +150,7 @@ abstract class StreamTableEnvironment(
         val outputType = sink.getOutputType
         // translate the Table into a DataStream and provide the type that the TableSink expects.
         val result: DataStream[T] =
-          translate(table, updatesAsRetraction = true, withChangeFlag = true)(outputType)
+          translate(table, sQConf, updatesAsRetraction = true, withChangeFlag = true)(outputType)
         // Give the DataStream to the TableSink to emit it.
         retractSink.asInstanceOf[RetractStreamTableSink[Any]]
           .emitDataStream(result.asInstanceOf[DataStream[JTuple2[JBool, Any]]])
@@ -160,7 +173,11 @@ abstract class StreamTableEnvironment(
         val outputType = sink.getOutputType
         // translate the Table into a DataStream and provide the type that the TableSink expects.
         val result: DataStream[T] =
-          translate(optimizedPlan, table.getRelNode.getRowType, withChangeFlag = true)(outputType)
+          translate(
+            optimizedPlan,
+            table.getRelNode.getRowType,
+            sQConf,
+            withChangeFlag = true)(outputType)
         // Give the DataStream to the TableSink to emit it.
         upsertSink.asInstanceOf[UpsertStreamTableSink[Any]]
           .emitDataStream(result.asInstanceOf[DataStream[JTuple2[JBool, Any]]])
@@ -176,7 +193,11 @@ abstract class StreamTableEnvironment(
         val outputType = sink.getOutputType
         // translate the Table into a DataStream and provide the type that the TableSink expects.
         val result: DataStream[T] =
-          translate(optimizedPlan, table.getRelNode.getRowType, withChangeFlag = false)(outputType)
+          translate(
+            optimizedPlan,
+            table.getRelNode.getRowType,
+            sQConf,
+            withChangeFlag = false)(outputType)
         // Give the DataStream to the TableSink to emit it.
         appendSink.asInstanceOf[AppendStreamTableSink[T]].emitDataStream(result)
 
@@ -545,17 +566,21 @@ abstract class StreamTableEnvironment(
     * Table API calls and / or SQL queries and generating corresponding [[DataStream]] operators.
     *
     * @param table The root node of the relational expression tree.
+    * @param qConfig The configuration for the query to generate.
     * @param updatesAsRetraction Set to true to encode updates as retraction messages.
     * @param withChangeFlag Set to true to emit records with change flags.
     * @param tpe The [[TypeInformation]] of the resulting [[DataStream]].
     * @tparam A The type of the resulting [[DataStream]].
     * @return The [[DataStream]] that corresponds to the translated [[Table]].
     */
-  protected def translate[A](table: Table, updatesAsRetraction: Boolean, withChangeFlag: Boolean)
-      (implicit tpe: TypeInformation[A]): DataStream[A] = {
+  protected def translate[A](
+      table: Table,
+      qConfig: StreamQueryConfig,
+      updatesAsRetraction: Boolean,
+      withChangeFlag: Boolean)(implicit tpe: TypeInformation[A]): DataStream[A] = {
     val relNode = table.getRelNode
     val dataStreamPlan = optimize(relNode, updatesAsRetraction)
-    translate(dataStreamPlan, relNode.getRowType, withChangeFlag)
+    translate(dataStreamPlan, relNode.getRowType, qConfig, withChangeFlag)
   }
 
   /**
@@ -564,6 +589,7 @@ abstract class StreamTableEnvironment(
     * @param logicalPlan The root node of the relational expression tree.
     * @param logicalType The row type of the result. Since the logicalPlan can lose the
     *                    field naming during optimization we pass the row type separately.
+    * @param qConfig     The configuration for the query to generate.
     * @param withChangeFlag Set to true to emit records with change flags.
     * @param tpe         The [[TypeInformation]] of the resulting [[DataStream]].
     * @tparam A The type of the resulting [[DataStream]].
@@ -572,6 +598,7 @@ abstract class StreamTableEnvironment(
   protected def translate[A](
       logicalPlan: RelNode,
       logicalType: RelDataType,
+      qConfig: StreamQueryConfig,
       withChangeFlag: Boolean)
       (implicit tpe: TypeInformation[A]): DataStream[A] = {
 
@@ -583,7 +610,7 @@ abstract class StreamTableEnvironment(
     }
 
     // get CRow plan
-    val plan: DataStream[CRow] = translateToCRow(logicalPlan)
+    val plan: DataStream[CRow] = translateToCRow(logicalPlan, qConfig)
 
     // convert CRow to output type
     val conversion = if (withChangeFlag) {
@@ -615,14 +642,16 @@ abstract class StreamTableEnvironment(
     * Translates a logical [[RelNode]] plan into a [[DataStream]] of type [[CRow]].
     *
     * @param logicalPlan The logical plan to translate.
+    * @param qConfig The configuration for the query to generate.
     * @return The [[DataStream]] of type [[CRow]].
     */
   protected def translateToCRow(
-    logicalPlan: RelNode): DataStream[CRow] = {
+    logicalPlan: RelNode,
+    qConfig: StreamQueryConfig): DataStream[CRow] = {
 
     logicalPlan match {
       case node: DataStreamRel =>
-        node.translateToPlan(this)
+        node.translateToPlan(this, qConfig)
       case _ =>
         throw TableException("Cannot generate DataStream due to an invalid logical plan. " +
           "This is a bug and should not happen. Please file an issue.")
@@ -638,7 +667,7 @@ abstract class StreamTableEnvironment(
   def explain(table: Table): String = {
     val ast = table.getRelNode
     val optimizedPlan = optimize(ast, updatesAsRetraction = false)
-    val dataStream = translateToCRow(optimizedPlan)
+    val dataStream = translateToCRow(optimizedPlan, qConf)
 
     val env = dataStream.getExecutionEnvironment
     val jsonSqlPlan = env.getExecutionPlan

http://git-wip-us.apache.org/repos/asf/flink/blob/003f81a7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
index bf4a8e0..9f50f0c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
@@ -510,7 +510,7 @@ abstract class TableEnvironment(val config: TableConfig) {
     * @param sink The [[TableSink]] to write the [[Table]] to.
     * @tparam T The data type that the [[TableSink]] expects.
     */
-  private[flink] def writeToSink[T](table: Table, sink: TableSink[T]): Unit
+  private[flink] def writeToSink[T](table: Table, sink: TableSink[T], conf: QueryConfig): Unit
 
   /**
     * Registers a Calcite [[AbstractTable]] in the TableEnvironment's catalog.

http://git-wip-us.apache.org/repos/asf/flink/blob/003f81a7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala
index a70bcca..c3b5951 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala
@@ -150,9 +150,50 @@ class StreamTableEnvironment(
     * @return The converted [[DataStream]].
     */
   def toDataStream[T](table: Table, clazz: Class[T]): DataStream[T] = {
+    toDataStream(table, clazz, qConf)
+  }
+
+  /**
+    * Converts the given [[Table]] into an append [[DataStream]] of a specified type.
+    *
+    * The [[Table]] must only have insert (append) changes. If the [[Table]] is also modified
+    * by update or delete changes, the conversion will fail.
+    *
+    * The fields of the [[Table]] are mapped to [[DataStream]] fields as follows:
+    * - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]]
+    * types: Fields are mapped by position, field types must match.
+    * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match.
+    *
+    * @param table The [[Table]] to convert.
+    * @param typeInfo The [[TypeInformation]] that specifies the type of the [[DataStream]].
+    * @tparam T The type of the resulting [[DataStream]].
+    * @return The converted [[DataStream]].
+    */
+  def toDataStream[T](table: Table, typeInfo: TypeInformation[T]): DataStream[T] = {
+    toDataStream(table, typeInfo, qConf)
+  }
+
+  /**
+    * Converts the given [[Table]] into an append [[DataStream]] of a specified type.
+    *
+    * The [[Table]] must only have insert (append) changes. If the [[Table]] is also modified
+    * by update or delete changes, the conversion will fail.
+    *
+    * The fields of the [[Table]] are mapped to [[DataStream]] fields as follows:
+    * - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]]
+    * types: Fields are mapped by position, field types must match.
+    * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match.
+    *
+    * @param table The [[Table]] to convert.
+    * @param clazz The class of the type of the resulting [[DataStream]].
+    * @param qConfig The configuration of the query to generate.
+    * @tparam T The type of the resulting [[DataStream]].
+    * @return The converted [[DataStream]].
+    */
+  def toDataStream[T](table: Table, clazz: Class[T], qConfig: StreamQueryConfig): DataStream[T] = {
     val typeInfo = TypeExtractor.createTypeInfo(clazz)
     TableEnvironment.validateType(typeInfo)
-    translate[T](table, updatesAsRetraction = false, withChangeFlag = false)(typeInfo)
+    translate[T](table, qConfig, updatesAsRetraction = false, withChangeFlag = false)(typeInfo)
   }
 
   /**
@@ -168,12 +209,64 @@ class StreamTableEnvironment(
     *
     * @param table The [[Table]] to convert.
     * @param typeInfo The [[TypeInformation]] that specifies the type of the [[DataStream]].
+    * @param qConfig The configuration of the query to generate.
     * @tparam T The type of the resulting [[DataStream]].
     * @return The converted [[DataStream]].
     */
-  def toDataStream[T](table: Table, typeInfo: TypeInformation[T]): DataStream[T] = {
+  def toDataStream[T](
+      table: Table,
+      typeInfo: TypeInformation[T],
+      qConfig: StreamQueryConfig): DataStream[T] = {
     TableEnvironment.validateType(typeInfo)
-    translate[T](table, updatesAsRetraction = false, withChangeFlag = false)(typeInfo)
+    translate[T](table, qConfig, updatesAsRetraction = false, withChangeFlag = false)(typeInfo)
+  }
+
+  /**
+    * Converts the given [[Table]] into a [[DataStream]] of add and retract messages.
+    * The message will be encoded as [[JTuple2]]. The first field is a [[JBool]] flag,
+    * the second field holds the record of the specified type [[T]].
+    *
+    * A true [[JBool]] flag indicates an add message, a false flag indicates a retract message.
+    *
+    * The fields of the [[Table]] are mapped to the requested type as follows:
+    * - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]]
+    * types: Fields are mapped by position, field types must match.
+    * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match.
+    *
+    * @param table The [[Table]] to convert.
+    * @param clazz The class of the requested record type.
+    * @tparam T The type of the requested record type.
+    * @return The converted [[DataStream]].
+    */
+  def toRetractStream[T](
+      table: Table,
+      clazz: Class[T]): DataStream[JTuple2[JBool, T]] = {
+
+    toRetractStream(table, clazz, qConf)
+  }
+
+  /**
+    * Converts the given [[Table]] into a [[DataStream]] of add and retract messages.
+    * The message will be encoded as [[JTuple2]]. The first field is a [[JBool]] flag,
+    * the second field holds the record of the specified type [[T]].
+    *
+    * A true [[JBool]] flag indicates an add message, a false flag indicates a retract message.
+    *
+    * The fields of the [[Table]] are mapped to the requested type as follows:
+    * - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]]
+    * types: Fields are mapped by position, field types must match.
+    * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match.
+    *
+    * @param table The [[Table]] to convert.
+    * @param typeInfo The [[TypeInformation]] of the requested record type.
+    * @tparam T The type of the requested record type.
+    * @return The converted [[DataStream]].
+    */
+  def toRetractStream[T](
+      table: Table,
+      typeInfo: TypeInformation[T]): DataStream[JTuple2[JBool, T]] = {
+
+    toRetractStream(table, typeInfo, qConf)
   }
 
   /**
@@ -190,17 +283,21 @@ class StreamTableEnvironment(
     *
     * @param table The [[Table]] to convert.
     * @param clazz The class of the requested record type.
+    * @param qConfig The configuration of the query to generate.
     * @tparam T The type of the requested record type.
     * @return The converted [[DataStream]].
     */
-  def toRetractStream[T](table: Table, clazz: Class[T]):
-    DataStream[JTuple2[JBool, T]] = {
+  def toRetractStream[T](
+      table: Table,
+      clazz: Class[T],
+      qConfig: StreamQueryConfig): DataStream[JTuple2[JBool, T]] = {
 
     val typeInfo = TypeExtractor.createTypeInfo(clazz)
     TableEnvironment.validateType(typeInfo)
     val resultType = new TupleTypeInfo[JTuple2[JBool, T]](Types.BOOLEAN, typeInfo)
     translate[JTuple2[JBool, T]](
       table,
+      qConfig,
       updatesAsRetraction = true,
       withChangeFlag = true)(resultType)
   }
@@ -219,11 +316,14 @@ class StreamTableEnvironment(
     *
     * @param table The [[Table]] to convert.
     * @param typeInfo The [[TypeInformation]] of the requested record type.
+    * @param qConfig The configuration of the query to generate.
     * @tparam T The type of the requested record type.
     * @return The converted [[DataStream]].
     */
-  def toRetractStream[T](table: Table, typeInfo: TypeInformation[T]):
-    DataStream[JTuple2[JBool, T]] = {
+  def toRetractStream[T](
+      table: Table,
+      typeInfo: TypeInformation[T],
+      qConfig: StreamQueryConfig): DataStream[JTuple2[JBool, T]] = {
 
     TableEnvironment.validateType(typeInfo)
     val resultTypeInfo = new TupleTypeInfo[JTuple2[JBool, T]](
@@ -232,6 +332,7 @@ class StreamTableEnvironment(
     )
     translate[JTuple2[JBool, T]](
       table,
+      qConfig,
       updatesAsRetraction = true,
       withChangeFlag = true)(resultTypeInfo)
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/003f81a7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala
index e5ad6c2..56f7d55 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala
@@ -19,7 +19,7 @@ package org.apache.flink.table.api.scala
 
 import org.apache.flink.api.scala._
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.api.{Table, TableConfig, TableEnvironment}
+import org.apache.flink.table.api.{StreamQueryConfig, Table, TableConfig, TableEnvironment}
 import org.apache.flink.table.expressions.Expression
 import org.apache.flink.table.functions.{AggregateFunction, TableFunction}
 import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
@@ -143,8 +143,29 @@ class StreamTableEnvironment(
     * @return The converted [[DataStream]].
     */
   def toDataStream[T: TypeInformation](table: Table): DataStream[T] = {
+    toDataStream(table, qConf)
+  }
+
+  /**
+    * Converts the given [[Table]] into an append [[DataStream]] of a specified type.
+    *
+    * The [[Table]] must only have insert (append) changes. If the [[Table]] is also modified
+    * by update or delete changes, the conversion will fail.
+    *
+    * The fields of the [[Table]] are mapped to [[DataStream]] fields as follows:
+    * - [[org.apache.flink.types.Row]] and Scala Tuple types: Fields are mapped by position, field
+    * types must match.
+    * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match.
+    *
+    * @param table The [[Table]] to convert.
+    * @param qConfig The configuration of the query to generate.
+    * @tparam T The type of the resulting [[DataStream]].
+    * @return The converted [[DataStream]].
+    */
+  def toDataStream[T: TypeInformation](table: Table, qConfig: StreamQueryConfig): DataStream[T] = {
     val returnType = createTypeInformation[T]
-    asScalaStream(translate(table, updatesAsRetraction = false, withChangeFlag = false)(returnType))
+    asScalaStream(
+      translate(table, qConfig, updatesAsRetraction = false, withChangeFlag = false)(returnType))
   }
 
 /**
@@ -159,8 +180,27 @@ class StreamTableEnvironment(
   * @return The converted [[DataStream]].
   */
   def toRetractStream[T: TypeInformation](table: Table): DataStream[(Boolean, T)] = {
+    toRetractStream(table, qConf)
+  }
+
+  /**
+    * Converts the given [[Table]] into a [[DataStream]] of add and retract messages.
+    * The message will be encoded as [[Tuple2]]. The first field is a [[Boolean]] flag,
+    * the second field holds the record of the specified type [[T]].
+    *
+    * A true [[Boolean]] flag indicates an add message, a false flag indicates a retract message.
+    *
+    * @param table The [[Table]] to convert.
+    * @param qConfig The configuration of the query to generate.
+    * @tparam T The type of the requested data type.
+    * @return The converted [[DataStream]].
+    */
+  def toRetractStream[T: TypeInformation](
+      table: Table,
+      qConfig: StreamQueryConfig): DataStream[(Boolean, T)] = {
     val returnType = createTypeInformation[(Boolean, T)]
-    asScalaStream(translate(table, updatesAsRetraction = true, withChangeFlag = true)(returnType))
+    asScalaStream(
+      translate(table, qConfig, updatesAsRetraction = true, withChangeFlag = true)(returnType))
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/flink/blob/003f81a7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala
index 5efff62..966b42f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala
@@ -21,7 +21,7 @@ package org.apache.flink.table.api.scala
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.scala._
 import org.apache.flink.streaming.api.scala.DataStream
-import org.apache.flink.table.api.{Table, TableException}
+import org.apache.flink.table.api.{StreamQueryConfig, Table, TableException}
 import org.apache.flink.table.api.scala.{BatchTableEnvironment => ScalaBatchTableEnv}
 import org.apache.flink.table.api.scala.{StreamTableEnvironment => ScalaStreamTableEnv}
 
@@ -57,6 +57,21 @@ class TableConversions(table: Table) {
     }
   }
 
+  /** Converts the [[Table]] to a [[DataStream]] of the specified type.
+    *
+    * @param qConfig The configuration for the generated query.
+    */
+  def toDataStream[T: TypeInformation](qConfig: StreamQueryConfig): DataStream[T] = {
+    table.tableEnv match {
+      case tEnv: ScalaStreamTableEnv =>
+        tEnv.toDataStream(table, qConfig)
+      case _ =>
+        throw new TableException(
+          "Only tables that originate from Scala DataStreams " +
+            "can be converted to Scala DataStreams.")
+    }
+  }
+
   /** Converts the [[Table]] to a [[DataStream]] of add and retract messages.
     * The message will be encoded as [[Tuple2]]. The first field is a [[Boolean]] flag,
     * the second field holds the record of the specified type [[T]].
@@ -76,5 +91,28 @@ class TableConversions(table: Table) {
     }
   }
 
+  /** Converts the [[Table]] to a [[DataStream]] of add and retract messages.
+    * The message will be encoded as [[Tuple2]]. The first field is a [[Boolean]] flag,
+    * the second field holds the record of the specified type [[T]].
+    *
+    * A true [[Boolean]] flag indicates an add message, a false flag indicates a retract message.
+    *
+    * @param qConfig The configuration for the generated query.
+    *
+    */
+  def toRetractStream[T: TypeInformation](qConfig: StreamQueryConfig): DataStream[(Boolean, T)] = {
+
+    table.tableEnv match {
+      case tEnv: ScalaStreamTableEnv =>
+        tEnv.toRetractStream(table, qConfig)
+      case _ =>
+        throw new TableException(
+          "Only tables that originate from Scala DataStreams " +
+            "can be converted to Scala DataStreams.")
+    }
+  }
+
+
+
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/003f81a7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
index 310a75f..5a2eb1c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
@@ -763,6 +763,30 @@ class Table(
     * @tparam T The data type that the [[TableSink]] expects.
     */
   def writeToSink[T](sink: TableSink[T]): Unit = {
+
+    def qConfig = this.tableEnv match {
+      case s: StreamTableEnvironment => s.qConf
+      case b: BatchTableEnvironment => new BatchQueryConfig
+      case _ => null
+    }
+
+    writeToSink(sink, qConfig)
+  }
+
+  /**
+    * Writes the [[Table]] to a [[TableSink]]. A [[TableSink]] defines an external storage location.
+    *
+    * A batch [[Table]] can only be written to a
+    * [[org.apache.flink.table.sinks.BatchTableSink]], a streaming [[Table]] requires a
+    * [[org.apache.flink.table.sinks.AppendStreamTableSink]], a
+    * [[org.apache.flink.table.sinks.RetractStreamTableSink]], or an
+    * [[org.apache.flink.table.sinks.UpsertStreamTableSink]].
+    *
+    * @param sink The [[TableSink]] to which the [[Table]] is written.
+    * @param conf The configuration for the query that writes to the sink.
+    * @tparam T The data type that the [[TableSink]] expects.
+    */
+  def writeToSink[T](sink: TableSink[T], conf: QueryConfig): Unit = {
     // get schema information of table
     val rowType = getRelNode.getRowType
     val fieldNames: Array[String] = rowType.getFieldNames.asScala.toArray
@@ -773,7 +797,7 @@ class Table(
     val configuredSink = sink.configure(fieldNames, fieldTypes)
 
     // emit the table to the configured table sink
-    tableEnv.writeToSink(this, configuredSink)
+    tableEnv.writeToSink(this, configuredSink, conf)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/flink/blob/003f81a7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
index ce0f966..0e377b5 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
@@ -25,7 +25,7 @@ import org.apache.calcite.rel.metadata.RelMetadataQuery
 import org.apache.calcite.rel.{RelNode, RelWriter}
 import org.apache.calcite.rex.RexProgram
 import org.apache.flink.streaming.api.datastream.DataStream
-import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment}
 import org.apache.flink.table.plan.schema.RowSchema
 import org.apache.flink.table.codegen.CodeGenerator
 import org.apache.flink.table.plan.nodes.CommonCalc
@@ -83,11 +83,13 @@ class DataStreamCalc(
     estimateRowCount(calcProgram, rowCnt)
   }
 
-  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[CRow] = {
+  override def translateToPlan(
+      tableEnv: StreamTableEnvironment,
+      qConfig: StreamQueryConfig): DataStream[CRow] = {
 
     val config = tableEnv.getConfig
 
-    val inputDataStream = getInput.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
+    val inputDataStream = getInput.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, qConfig)
     val inputRowType = inputDataStream.getType.asInstanceOf[CRowTypeInfo].rowType
 
     val generator = new CodeGenerator(config, false, inputRowType)

http://git-wip-us.apache.org/repos/asf/flink/blob/003f81a7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala
index 19ad89b..cbd818a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala
@@ -23,7 +23,7 @@ import org.apache.calcite.rex.{RexCall, RexNode}
 import org.apache.calcite.sql.SemiJoinType
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.streaming.api.datastream.DataStream
-import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment}
 import org.apache.flink.table.functions.utils.TableSqlFunction
 import org.apache.flink.table.plan.nodes.CommonCorrelate
 import org.apache.flink.table.plan.nodes.logical.FlinkLogicalTableFunctionScan
@@ -82,12 +82,14 @@ class DataStreamCorrelate(
       .itemIf("condition", condition.orNull, condition.isDefined)
   }
 
-  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[CRow] = {
+  override def translateToPlan(
+      tableEnv: StreamTableEnvironment,
+      qConfig: StreamQueryConfig): DataStream[CRow] = {
 
     val config = tableEnv.getConfig
 
     // we do not need to specify input type
-    val inputDS = getInput.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
+    val inputDS = getInput.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, qConfig)
     val inputType = inputDS.getType.asInstanceOf[CRowTypeInfo]
 
     val funcRel = scan.asInstanceOf[FlinkLogicalTableFunctionScan]

http://git-wip-us.apache.org/repos/asf/flink/blob/003f81a7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
index 506c0cb..f01b24a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
@@ -21,9 +21,10 @@ import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.core.AggregateCall
 import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+import org.apache.flink.api.common.time.Time
 import org.apache.flink.api.java.functions.NullByteKeySelector
 import org.apache.flink.streaming.api.datastream.DataStream
-import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment}
 import org.apache.flink.table.codegen.CodeGenerator
 import org.apache.flink.table.runtime.aggregate._
 import org.apache.flink.table.plan.nodes.CommonAggregate
@@ -31,6 +32,7 @@ import org.apache.flink.table.plan.schema.RowSchema
 import org.apache.flink.table.plan.rules.datastream.DataStreamRetractionRules
 import org.apache.flink.table.runtime.aggregate.AggregateUtil.CalcitePair
 import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.slf4j.LoggerFactory
 
 /**
   *
@@ -59,6 +61,8 @@ class DataStreamGroupAggregate(
     with CommonAggregate
     with DataStreamRel {
 
+  private val LOG = LoggerFactory.getLogger(this.getClass)
+
   override def deriveRowType() = schema.logicalType
 
   override def needsUpdatesAsRetraction = true
@@ -100,9 +104,18 @@ class DataStreamGroupAggregate(
         inputSchema.logicalType, groupings, getRowType, namedAggregates, Nil))
   }
 
-  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[CRow] = {
+  override def translateToPlan(
+      tableEnv: StreamTableEnvironment,
+      qConfig: StreamQueryConfig): DataStream[CRow] = {
+
+    if (qConfig.getMinIdleStateRetentionTime < 0 || qConfig.getMaxIdleStateRetentionTime < 0) {
+      LOG.warn(
+        "No state retention interval configured for a query which accumulates state. " +
+        "Please provide a query configuration with valid retention interval to prevent excessive " +
+          "state size. You may specify a retention time of 0 to not clean up the state.")
+    }
 
-    val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
+    val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, qConfig)
 
     val physicalNamedAggregates = namedAggregates.map { namedAggregate =>
       new CalcitePair[AggregateCall, String](
@@ -136,6 +149,7 @@ class DataStreamGroupAggregate(
       inputSchema.logicalType,
       inputSchema.physicalFieldTypeInfo,
       groupings,
+      qConfig,
       DataStreamRetractionRules.isAccRetract(this),
       DataStreamRetractionRules.isAccRetract(getInput))
 

http://git-wip-us.apache.org/repos/asf/flink/blob/003f81a7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
index ef207b0..d2aaad0 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
@@ -26,7 +26,7 @@ import org.apache.flink.api.java.tuple.Tuple
 import org.apache.flink.streaming.api.datastream.{AllWindowedStream, DataStream, KeyedStream, WindowedStream}
 import org.apache.flink.streaming.api.windowing.assigners._
 import org.apache.flink.streaming.api.windowing.windows.{Window => DataStreamWindow}
-import org.apache.flink.table.api.{StreamTableEnvironment, TableException}
+import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment, TableException}
 import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
 import org.apache.flink.table.codegen.CodeGenerator
 import org.apache.flink.table.expressions.ExpressionUtils._
@@ -107,9 +107,11 @@ class DataStreamGroupWindowAggregate(
           namedProperties))
   }
 
-  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[CRow] = {
+  override def translateToPlan(
+      tableEnv: StreamTableEnvironment,
+      qConfig: StreamQueryConfig): DataStream[CRow] = {
 
-    val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
+    val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, qConfig)
 
     val physicalNamedAggregates = namedAggregates.map { namedAggregate =>
       new CalcitePair[AggregateCall, String](

http://git-wip-us.apache.org/repos/asf/flink/blob/003f81a7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
index 4061242..8e97884 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
@@ -26,7 +26,7 @@ import org.apache.calcite.rel.core.{AggregateCall, Window}
 import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
 import org.apache.calcite.rel.RelFieldCollation.Direction.ASCENDING
 import org.apache.flink.streaming.api.datastream.DataStream
-import org.apache.flink.table.api.{StreamTableEnvironment, TableException}
+import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment, TableException}
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.plan.nodes.OverAggregate
 import org.apache.flink.table.plan.schema.RowSchema
@@ -88,7 +88,10 @@ class DataStreamOverAggregate(
           namedAggregates))
   }
 
-  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[CRow] = {
+  override def translateToPlan(
+      tableEnv: StreamTableEnvironment,
+      qConfig: StreamQueryConfig): DataStream[CRow] = {
+
     if (logicWindow.groups.size > 1) {
       throw new TableException(
         "Unsupported use of OVER windows. All aggregates must be computed on the same window.")
@@ -109,7 +112,7 @@ class DataStreamOverAggregate(
         "Unsupported use of OVER windows. The window can only be ordered in ASCENDING mode.")
     }
 
-    val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
+    val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, qConfig)
 
     val consumeRetraction = DataStreamRetractionRules.isAccRetract(input)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/003f81a7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala
index 9754de4..6f6edf7 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala
@@ -19,7 +19,7 @@
 package org.apache.flink.table.plan.nodes.datastream
 
 import org.apache.flink.streaming.api.datastream.DataStream
-import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment}
 import org.apache.flink.table.plan.nodes.FlinkRelNode
 import org.apache.flink.table.runtime.types.CRow
 
@@ -29,9 +29,12 @@ trait DataStreamRel extends FlinkRelNode {
     * Translates the FlinkRelNode into a Flink operator.
     *
     * @param tableEnv The [[StreamTableEnvironment]] of the translated Table.
+    * @param qConfig The configuration for the query to generate.
     * @return DataStream of type [[CRow]]
     */
-  def translateToPlan(tableEnv: StreamTableEnvironment) : DataStream[CRow]
+  def translateToPlan(
+    tableEnv: StreamTableEnvironment,
+    qConfig: StreamQueryConfig): DataStream[CRow]
 
   /**
     * Whether the [[DataStreamRel]] requires that update and delete changes are sent with retraction

http://git-wip-us.apache.org/repos/asf/flink/blob/003f81a7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala
index c613646..e64bf0f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala
@@ -23,7 +23,7 @@ import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.core.TableScan
 import org.apache.flink.streaming.api.datastream.DataStream
-import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment}
 import org.apache.flink.table.plan.schema.RowSchema
 import org.apache.flink.table.plan.schema.DataStreamTable
 import org.apache.flink.table.runtime.types.CRow
@@ -54,7 +54,10 @@ class DataStreamScan(
     )
   }
 
-  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[CRow] = {
+  override def translateToPlan(
+      tableEnv: StreamTableEnvironment,
+      qConfig: StreamQueryConfig): DataStream[CRow] = {
+
     val config = tableEnv.getConfig
     val inputDataStream: DataStream[Any] = dataStreamTable.dataStream
     convertToInternalRow(schema, inputDataStream, dataStreamTable, config)

http://git-wip-us.apache.org/repos/asf/flink/blob/003f81a7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala
index 654c259..6cc7396 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala
@@ -21,7 +21,7 @@ package org.apache.flink.table.plan.nodes.datastream
 import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
 import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
 import org.apache.flink.streaming.api.datastream.DataStream
-import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment}
 import org.apache.flink.table.plan.schema.RowSchema
 import org.apache.flink.table.runtime.types.CRow
 
@@ -58,10 +58,12 @@ class DataStreamUnion(
     s"Union All(union: (${schema.logicalFieldNames.mkString(", ")}))"
   }
 
-  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[CRow] = {
+  override def translateToPlan(
+      tableEnv: StreamTableEnvironment,
+      qConfig: StreamQueryConfig): DataStream[CRow] = {
 
-    val leftDataSet = left.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
-    val rightDataSet = right.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
+    val leftDataSet = left.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, qConfig)
+    val rightDataSet = right.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, qConfig)
     leftDataSet.union(rightDataSet)
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/003f81a7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala
index 32c9aaf..ba6b025 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala
@@ -24,7 +24,7 @@ import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.core.Values
 import org.apache.calcite.rex.RexLiteral
 import org.apache.flink.streaming.api.datastream.DataStream
-import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment}
 import org.apache.flink.table.codegen.CodeGenerator
 import org.apache.flink.table.plan.schema.RowSchema
 import org.apache.flink.table.runtime.io.CRowValuesInputFormat
@@ -56,7 +56,9 @@ class DataStreamValues(
     )
   }
 
-  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[CRow] = {
+  override def translateToPlan(
+      tableEnv: StreamTableEnvironment,
+      qConfig: StreamQueryConfig): DataStream[CRow] = {
 
     val config = tableEnv.getConfig
 

http://git-wip-us.apache.org/repos/asf/flink/blob/003f81a7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
index b2d7019..225f23f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
@@ -22,7 +22,7 @@ import org.apache.calcite.plan._
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.metadata.RelMetadataQuery
 import org.apache.flink.streaming.api.datastream.DataStream
-import org.apache.flink.table.api.{StreamTableEnvironment, TableEnvironment}
+import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment, TableEnvironment}
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.plan.nodes.PhysicalTableSourceScan
 import org.apache.flink.table.plan.schema.RowSchema
@@ -98,7 +98,10 @@ class StreamTableSourceScan(
     )
   }
 
-  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[CRow] = {
+  override def translateToPlan(
+      tableEnv: StreamTableEnvironment,
+      qConfig: StreamQueryConfig): DataStream[CRow] = {
+
     val config = tableEnv.getConfig
     val inputDataStream = tableSource.getDataStream(tableEnv.execEnv).asInstanceOf[DataStream[Any]]
     convertToInternalRow(

http://git-wip-us.apache.org/repos/asf/flink/blob/003f81a7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
index 768c9cb..27392c7 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
@@ -33,7 +33,7 @@ import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.streaming.api.functions.ProcessFunction
 import org.apache.flink.streaming.api.functions.windowing.{AllWindowFunction, WindowFunction}
 import org.apache.flink.streaming.api.windowing.windows.{Window => DataStreamWindow}
-import org.apache.flink.table.api.TableException
+import org.apache.flink.table.api.{StreamQueryConfig, TableException}
 import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.codegen.CodeGenerator
@@ -155,6 +155,7 @@ object AggregateUtil {
       inputRowType: RelDataType,
       inputFieldTypes: Seq[TypeInformation[_]],
       groupings: Array[Int],
+      qConfig: StreamQueryConfig,
       generateRetraction: Boolean,
       consumeRetraction: Boolean): ProcessFunction[CRow, CRow] = {
 
@@ -190,7 +191,8 @@ object AggregateUtil {
     new GroupAggProcessFunction(
       genFunction,
       aggregationStateType,
-      generateRetraction)
+      generateRetraction,
+      qConfig)
 
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/003f81a7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
index 6ee37e6..84fee87 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
@@ -26,9 +26,9 @@ import org.apache.flink.util.Collector
 import org.apache.flink.api.common.state.ValueStateDescriptor
 import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.api.common.state.ValueState
-import org.apache.flink.table.api.Types
+import org.apache.flink.table.api.{StreamQueryConfig, Types}
 import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction}
-import org.slf4j.LoggerFactory
+import org.slf4j.{Logger, LoggerFactory}
 import org.apache.flink.table.runtime.types.CRow
 
 /**
@@ -40,13 +40,20 @@ import org.apache.flink.table.runtime.types.CRow
 class GroupAggProcessFunction(
     private val genAggregations: GeneratedAggregationsFunction,
     private val aggregationStateType: RowTypeInfo,
-    private val generateRetraction: Boolean)
+    private val generateRetraction: Boolean,
+    private val qConfig: StreamQueryConfig)
   extends ProcessFunction[CRow, CRow]
     with Compiler[GeneratedAggregations] {
 
-  val LOG = LoggerFactory.getLogger(this.getClass)
+  val LOG: Logger = LoggerFactory.getLogger(this.getClass)
   private var function: GeneratedAggregations = _
 
+  private val minRetentionTime = qConfig.getMinIdleStateRetentionTime
+  private val maxRetentionTime = qConfig.getMaxIdleStateRetentionTime
+  private val stateCleaningEnabled = minRetentionTime > 1 && maxRetentionTime > 1
+  // interval in which clean-up timers are registered
+  private val cleanupTimerInterval = maxRetentionTime - minRetentionTime
+
   private var newRow: CRow = _
   private var prevRow: CRow = _
   private var firstRow: Boolean = _
@@ -54,6 +61,8 @@ class GroupAggProcessFunction(
   private var state: ValueState[Row] = _
   // counts the number of added and retracted input records
   private var cntState: ValueState[JLong] = _
+  // holds the latest registered cleanup timer
+  private var cleanupTimeState: ValueState[JLong] = _
 
   override def open(config: Configuration) {
     LOG.debug(s"Compiling AggregateHelper: $genAggregations.name \n\n " +
@@ -74,6 +83,12 @@ class GroupAggProcessFunction(
     val inputCntDescriptor: ValueStateDescriptor[JLong] =
       new ValueStateDescriptor[JLong]("GroupAggregateInputCounter", Types.LONG)
     cntState = getRuntimeContext.getState(inputCntDescriptor)
+
+    if (stateCleaningEnabled) {
+      val inputCntDescriptor: ValueStateDescriptor[JLong] =
+        new ValueStateDescriptor[JLong]("GroupAggregateCleanupTime", Types.LONG)
+      cleanupTimeState = getRuntimeContext.getState(inputCntDescriptor)
+    }
   }
 
   override def processElement(
@@ -81,6 +96,23 @@ class GroupAggProcessFunction(
       ctx: ProcessFunction[CRow, CRow]#Context,
       out: Collector[CRow]): Unit = {
 
+    if (stateCleaningEnabled) {
+
+      val currentTime = ctx.timerService().currentProcessingTime()
+      val earliestCleanup = currentTime + minRetentionTime
+
+      // last registered timer
+      val lastCleanupTime = cleanupTimeState.value()
+
+      if (lastCleanupTime == null || earliestCleanup >= lastCleanupTime + cleanupTimerInterval) {
+        // we need to register a new timer
+        val cleanupTime = earliestCleanup + cleanupTimerInterval
+        // register timer and remember clean-up time
+        ctx.timerService().registerProcessingTimeTimer(cleanupTime)
+        cleanupTimeState.update(cleanupTime)
+      }
+    }
+
     val input = inputC.row
 
     // get accumulators and input counter
@@ -144,4 +176,18 @@ class GroupAggProcessFunction(
       cntState.clear()
     }
   }
+
+  override def onTimer(
+      timestamp: Long,
+      ctx: ProcessFunction[CRow, CRow]#OnTimerContext,
+      out: Collector[CRow]): Unit = {
+
+    if (timestamp == cleanupTimeState.value()) {
+      // clear all state
+      this.state.clear()
+      this.cntState.clear()
+      this.cleanupTimeState.clear()
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/003f81a7/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
index 8626b07..3d79e22 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
@@ -18,16 +18,17 @@
 
 package org.apache.flink.table.utils
 
-import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.tools.RuleSet
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.api.{Table, TableConfig, TableEnvironment}
+import org.apache.flink.table.api.{QueryConfig, Table, TableConfig, TableEnvironment}
 import org.apache.flink.table.sinks.TableSink
 import org.apache.flink.table.sources.TableSource
 
 class MockTableEnvironment extends TableEnvironment(new TableConfig) {
 
-  override private[flink] def writeToSink[T](table: Table, sink: TableSink[T]): Unit = ???
+  override private[flink] def writeToSink[T](
+      table: Table,
+      sink: TableSink[T],
+      qConfig: QueryConfig): Unit = ???
 
   override protected def checkValidTableName(name: String): Unit = ???