You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2019/07/05 13:34:35 UTC

[flink] 01/03: [FLINK-13088][table-api] Support lazy query transformation & execution on TableEnvironment

This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 177e3a69a701ff2dc217b039e06f008af7d75d43
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Mon Jun 17 11:44:08 2019 +0200

    [FLINK-13088][table-api] Support lazy query transformation & execution on TableEnvironment
---
 flink-python/pyflink/table/table_environment.py    | 47 +++++++++++
 .../table/tests/test_environment_completeness.py   |  3 +-
 .../table/client/gateway/local/LocalExecutor.java  |  8 +-
 .../table/api/java/BatchTableEnvironment.java      | 41 ++++++++++
 .../table/api/java/StreamTableEnvironment.java     | 61 ++++++++++++++
 .../java/internal/StreamTableEnvironmentImpl.java  | 38 ++++++---
 .../apache/flink/table/api/BatchQueryConfig.java   |  3 +
 .../org/apache/flink/table/api/QueryConfig.java    |  3 +
 .../apache/flink/table/api/StreamQueryConfig.java  | 13 +++
 .../org/apache/flink/table/api/TableConfig.java    | 58 +++++++++++++
 .../apache/flink/table/api/TableEnvironment.java   | 61 +++++---------
 .../table/api/internal/QueryConfigProvider.java    | 41 ----------
 .../table/api/internal/TableEnvironmentImpl.java   | 95 ++++++++++++++--------
 .../apache/flink/table/api/internal/TableImpl.java | 11 ++-
 .../table/api/scala/BatchTableEnvironment.scala    | 62 ++++++++++++++
 .../table/api/scala/StreamTableEnvironment.scala   | 63 ++++++++++++++
 .../internal/StreamTableEnvironmentImpl.scala      | 31 ++++++-
 .../table/api/internal/BatchTableEnvImpl.scala     | 37 +++------
 .../flink/table/api/internal/TableEnvImpl.scala    | 30 ++-----
 .../java/internal/BatchTableEnvironmentImpl.scala  | 18 +++-
 .../scala/internal/BatchTableEnvironmentImpl.scala | 12 ++-
 .../apache/flink/table/planner/StreamPlanner.scala | 25 +++---
 .../sql/validation/InsertIntoValidationTest.scala  |  2 +-
 .../harness/GroupAggregateHarnessTest.scala        | 18 ++--
 .../harness/TableAggregateHarnessTest.scala        | 28 ++++---
 .../flink/table/utils/MockTableEnvironment.scala   |  9 +-
 26 files changed, 593 insertions(+), 225 deletions(-)

diff --git a/flink-python/pyflink/table/table_environment.py b/flink-python/pyflink/table/table_environment.py
index 19b5199..3c8b44c 100644
--- a/flink-python/pyflink/table/table_environment.py
+++ b/flink-python/pyflink/table/table_environment.py
@@ -446,6 +446,25 @@ class TableEnvironment(object):
         """
         pass
 
+    def execute(self, job_name):
+        """
+        Triggers the program execution. The environment will execute all parts of
+        the program.
+
+        <p>The program execution will be logged and displayed with the provided name
+
+        <p><b>NOTE:</b>It is highly advised to set all parameters in the :class:`TableConfig`
+        on the very beginning of the program. It is undefined what configurations values will
+        be used for the execution if queries are mixed with config changes. It depends on
+        the characteristic of the particular parameter. For some of them the value from the
+        point in time of query construction (e.g. the currentCatalog) will be used. On the
+        other hand some values might be evaluated according to the state from the time when
+        this method is called (e.g. timeZone).
+
+        :param job_name Desired name of the job
+        """
+        self._j_tenv.execute(job_name)
+
     def from_elements(self, elements, schema=None, verify_schema=True):
         """
         Creates a table from a collection of elements.
@@ -623,6 +642,20 @@ class StreamTableEnvironment(TableEnvironment):
         return StreamTableDescriptor(
             self._j_tenv.connect(connector_descriptor._j_connector_descriptor))
 
+    def execute(self, job_name):
+        """
+        Triggers the program execution. The environment will execute all parts of
+        the program.
+
+        The program execution will be logged and displayed with the provided name
+
+        It calls the StreamExecutionEnvironment#execute on the underlying
+        :class:`StreamExecutionEnvironment`. This environment translates queries eagerly.
+
+        :param job_name Desired name of the job
+        """
+        self._j_tenv.execute(job_name)
+
     @staticmethod
     def create(stream_execution_environment, table_config=None):
         """
@@ -710,6 +743,20 @@ class BatchTableEnvironment(TableEnvironment):
         return BatchTableDescriptor(
             self._j_tenv.connect(connector_descriptor._j_connector_descriptor))
 
+    def execute(self, job_name):
+        """
+        Triggers the program execution. The environment will execute all parts of
+        the program.
+
+        The program execution will be logged and displayed with the provided name
+
+        It calls the ExecutionEnvironment#execute on the underlying
+        :class:`ExecutionEnvironment`. This environment translates queries eagerly.
+
+        :param job_name Desired name of the job
+        """
+        self._j_tenv.execute(job_name)
+
     @staticmethod
     def create(execution_environment, table_config=None):
         """
diff --git a/flink-python/pyflink/table/tests/test_environment_completeness.py b/flink-python/pyflink/table/tests/test_environment_completeness.py
index a8fc991..1f82445 100644
--- a/flink-python/pyflink/table/tests/test_environment_completeness.py
+++ b/flink-python/pyflink/table/tests/test_environment_completeness.py
@@ -41,9 +41,10 @@ class EnvironmentAPICompletenessTests(PythonAPICompletenessTestCase, unittest.Te
         # registerExternalCatalog, getRegisteredExternalCatalog, registerCatalog, getCatalog and
         # listTables should be supported when catalog supported in python.
         # getCompletionHints has been deprecated. It will be removed in the next release.
+        # TODO add TableEnvironment#create method with EnvironmentSettings as a parameter
         return {'registerExternalCatalog', 'getRegisteredExternalCatalog', 'registerCatalog',
                 'getCatalog', 'registerFunction', 'listUserDefinedFunctions', 'listTables',
-                'getCompletionHints'}
+                'getCompletionHints', 'create'}
 
 
 if __name__ == '__main__':
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
index 5eabbfe..af128ef 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
@@ -35,10 +35,12 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.plugin.PluginUtils;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.table.api.QueryConfig;
+import org.apache.flink.table.api.StreamQueryConfig;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.internal.TableEnvImpl;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
 import org.apache.flink.table.calcite.FlinkTypeFactory;
 import org.apache.flink.table.client.SqlClientException;
 import org.apache.flink.table.client.config.Environment;
@@ -509,7 +511,11 @@ public class LocalExecutor implements Executor {
 		// parse and validate statement
 		try {
 			context.wrapClassLoader(() -> {
-				tableEnv.sqlUpdate(updateStatement, queryConfig);
+				if (tableEnv instanceof StreamTableEnvironment) {
+					((StreamTableEnvironment) tableEnv).sqlUpdate(updateStatement, (StreamQueryConfig) queryConfig);
+				} else {
+					tableEnv.sqlUpdate(updateStatement);
+				}
 				return null;
 			});
 		} catch (Throwable t) {
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/BatchTableEnvironment.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/BatchTableEnvironment.java
index 16f63af..7b7692d 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/BatchTableEnvironment.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/BatchTableEnvironment.java
@@ -33,6 +33,7 @@ import org.apache.flink.table.descriptors.BatchTableDescriptor;
 import org.apache.flink.table.descriptors.ConnectorDescriptor;
 import org.apache.flink.table.functions.AggregateFunction;
 import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.sinks.TableSink;
 
 import java.lang.reflect.Constructor;
 
@@ -211,6 +212,46 @@ public interface BatchTableEnvironment extends TableEnvironment {
 	<T> DataSet<T> toDataSet(Table table, TypeInformation<T> typeInfo, BatchQueryConfig queryConfig);
 
 	/**
+	 * Evaluates a SQL statement such as INSERT, UPDATE or DELETE; or a DDL statement;
+	 * NOTE: Currently only SQL INSERT statements are supported.
+	 *
+	 * <p>All tables referenced by the query must be registered in the TableEnvironment.
+	 * A {@link Table} is automatically registered when its {@link Table#toString()} method is
+	 * called, for example when it is embedded into a String.
+	 * Hence, SQL queries can directly reference a {@link Table} as follows:
+	 *
+	 * <pre>
+	 * {@code
+	 *   // register the configured table sink into which the result is inserted.
+	 *   tEnv.registerTableSink("sinkTable", configuredSink);
+	 *   Table sourceTable = ...
+	 *   String tableName = sourceTable.toString();
+	 *   // sourceTable is not registered to the table environment
+	 *   tEnv.sqlUpdate(s"INSERT INTO sinkTable SELECT * FROM tableName", config);
+	 * }
+	 * </pre>
+	 *
+	 * @param stmt The SQL statement to evaluate.
+	 * @param config The {@link BatchQueryConfig} to use.
+	 */
+	void sqlUpdate(String stmt, BatchQueryConfig config);
+
+	/**
+	 * Writes the {@link Table} to a {@link TableSink} that was registered under the specified name.
+	 *
+	 * <p>See the documentation of {@link TableEnvironment#useDatabase(String)} or
+	 * {@link TableEnvironment#useCatalog(String)} for the rules on the path resolution.
+	 *
+	 * @param table The Table to write to the sink.
+	 * @param queryConfig The {@link BatchQueryConfig} to use.
+	 * @param sinkPath The first part of the path of the registered {@link TableSink} to which the {@link Table} is
+	 *        written. This is to ensure at least the name of the {@link TableSink} is provided.
+	 * @param sinkPathContinued The remaining part of the path of the registered {@link TableSink} to which the
+	 *        {@link Table} is written.
+	 */
+	void insertInto(Table table, BatchQueryConfig queryConfig, String sinkPath, String... sinkPathContinued);
+
+	/**
 	 * Creates a table source and/or table sink from a descriptor.
 	 *
 	 * <p>Descriptors allow for declaring the communication to external systems in an
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/StreamTableEnvironment.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/StreamTableEnvironment.java
index 6859780..a18f3d4 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/StreamTableEnvironment.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/StreamTableEnvironment.java
@@ -19,11 +19,13 @@
 package org.apache.flink.table.api.java;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.QueryConfig;
 import org.apache.flink.table.api.StreamQueryConfig;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableConfig;
@@ -34,6 +36,7 @@ import org.apache.flink.table.descriptors.StreamTableDescriptor;
 import org.apache.flink.table.functions.AggregateFunction;
 import org.apache.flink.table.functions.TableAggregateFunction;
 import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.sinks.TableSink;
 
 /**
  * This table environment is the entry point and central context for creating Table & SQL
@@ -448,4 +451,62 @@ public interface StreamTableEnvironment extends TableEnvironment {
 	 */
 	@Override
 	StreamTableDescriptor connect(ConnectorDescriptor connectorDescriptor);
+
+	/**
+	 * Evaluates a SQL statement such as INSERT, UPDATE or DELETE; or a DDL statement;
+	 * NOTE: Currently only SQL INSERT statements are supported.
+	 *
+	 * <p>All tables referenced by the query must be registered in the TableEnvironment.
+	 * A {@link Table} is automatically registered when its {@link Table#toString()} method is
+	 * called, for example when it is embedded into a String.
+	 * Hence, SQL queries can directly reference a {@link Table} as follows:
+	 *
+	 * <pre>
+	 * {@code
+	 *   // register the configured table sink into which the result is inserted.
+	 *   tEnv.registerTableSink("sinkTable", configuredSink);
+	 *   Table sourceTable = ...
+	 *   String tableName = sourceTable.toString();
+	 *   // sourceTable is not registered to the table environment
+	 *   tEnv.sqlUpdate(s"INSERT INTO sinkTable SELECT * FROM tableName", config);
+	 * }
+	 * </pre>
+	 *
+	 * @param stmt The SQL statement to evaluate.
+	 * @param config The {@link QueryConfig} to use.
+	 */
+	void sqlUpdate(String stmt, StreamQueryConfig config);
+
+	/**
+	 * Writes the {@link Table} to a {@link TableSink} that was registered under the specified name.
+	 *
+	 * <p>See the documentation of {@link TableEnvironment#useDatabase(String)} or
+	 * {@link TableEnvironment#useCatalog(String)} for the rules on the path resolution.
+	 *
+	 * @param table The Table to write to the sink.
+	 * @param queryConfig The {@link StreamQueryConfig} to use.
+	 * @param sinkPath The first part of the path of the registered {@link TableSink} to which the {@link Table} is
+	 *        written. This is to ensure at least the name of the {@link TableSink} is provided.
+	 * @param sinkPathContinued The remaining part of the path of the registered {@link TableSink} to which the
+	 *        {@link Table} is written.
+	 */
+	void insertInto(Table table, StreamQueryConfig queryConfig, String sinkPath, String... sinkPathContinued);
+
+	/**
+	 * Triggers the program execution. The environment will execute all parts of
+	 * the program.
+	 *
+	 * <p>The program execution will be logged and displayed with the provided name
+	 *
+	 * <p>It calls the {@link StreamExecutionEnvironment#execute(String)} on the underlying
+	 * {@link StreamExecutionEnvironment}. In contrast to the {@link TableEnvironment} this
+	 * environment translates queries eagerly. Therefore the values in {@link QueryConfig}
+	 * parameter are ignored.
+	 *
+	 * @param jobName Desired name of the job
+	 * @return The result of the job execution, containing elapsed time and accumulators.
+	 * @throws Exception which occurs during job execution.
+	 */
+	@Override
+	JobExecutionResult execute(String jobName) throws Exception;
 }
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java
index 05815dd..4dde3d6 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.api.java.internal;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -91,14 +92,6 @@ public final class StreamTableEnvironmentImpl extends TableEnvironmentImpl imple
 		this.executionEnvironment = executionEnvironment;
 	}
 
-	/**
-	 * Creates an instance of a {@link StreamTableEnvironment}. It uses the {@link StreamExecutionEnvironment} for
-	 * executing queries. This is also the {@link StreamExecutionEnvironment} that will be used when converting
-	 * from/to {@link DataStream}.
-	 *
-	 * @param tableConfig The configuration of the TableEnvironment.
-	 * @param executionEnvironment The {@link StreamExecutionEnvironment} of the TableEnvironment.
-	 */
 	public static StreamTableEnvironment create(
 			StreamExecutionEnvironment executionEnvironment,
 			EnvironmentSettings settings,
@@ -241,7 +234,9 @@ public final class StreamTableEnvironmentImpl extends TableEnvironmentImpl imple
 			table.getQueryOperation(),
 			TypeConversions.fromLegacyInfoToDataType(typeInfo),
 			OutputConversionModifyOperation.UpdateMode.APPEND);
-		queryConfigProvider.setConfig(queryConfig);
+		tableConfig.setIdleStateRetentionTime(
+			Time.milliseconds(queryConfig.getMinIdleStateRetentionTime()),
+			Time.milliseconds(queryConfig.getMaxIdleStateRetentionTime()));
 		return toDataStream(table, modifyOperation);
 	}
 
@@ -273,7 +268,9 @@ public final class StreamTableEnvironmentImpl extends TableEnvironmentImpl imple
 			table.getQueryOperation(),
 			wrapWithChangeFlag(typeInfo),
 			OutputConversionModifyOperation.UpdateMode.RETRACT);
-		queryConfigProvider.setConfig(queryConfig);
+		tableConfig.setIdleStateRetentionTime(
+			Time.milliseconds(queryConfig.getMinIdleStateRetentionTime()),
+			Time.milliseconds(queryConfig.getMaxIdleStateRetentionTime()));
 		return toDataStream(table, modifyOperation);
 	}
 
@@ -282,6 +279,22 @@ public final class StreamTableEnvironmentImpl extends TableEnvironmentImpl imple
 		return (StreamTableDescriptor) super.connect(connectorDescriptor);
 	}
 
+	@Override
+	public void sqlUpdate(String stmt, StreamQueryConfig config) {
+		tableConfig.setIdleStateRetentionTime(
+			Time.milliseconds(config.getMinIdleStateRetentionTime()),
+			Time.milliseconds(config.getMaxIdleStateRetentionTime()));
+		sqlUpdate(stmt);
+	}
+
+	@Override
+	public void insertInto(Table table, StreamQueryConfig queryConfig, String sinkPath, String... sinkPathContinued) {
+		tableConfig.setIdleStateRetentionTime(
+			Time.milliseconds(queryConfig.getMinIdleStateRetentionTime()),
+			Time.milliseconds(queryConfig.getMaxIdleStateRetentionTime()));
+		insertInto(table, sinkPath, sinkPathContinued);
+	}
+
 	/**
 	 * This is a temporary workaround for Python API. Python API should not use StreamExecutionEnvironment at all.
 	 */
@@ -305,6 +318,11 @@ public final class StreamTableEnvironmentImpl extends TableEnvironmentImpl imple
 		validateTimeCharacteristic(TableSourceValidation.hasRowtimeAttribute(tableSource));
 	}
 
+	@Override
+	protected boolean isEagerOperationTranslation() {
+		return true;
+	}
+
 	private <T> TypeInformation<T> extractTypeInformation(Table table, Class<T> clazz) {
 		try {
 			return TypeExtractor.createTypeInfo(clazz);
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/BatchQueryConfig.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/BatchQueryConfig.java
index a7cac52..c70d948 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/BatchQueryConfig.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/BatchQueryConfig.java
@@ -22,7 +22,10 @@ import org.apache.flink.annotation.PublicEvolving;
 
 /**
  * The {@link BatchQueryConfig} holds parameters to configure the behavior of batch queries.
+ *
+ * @deprecated Set the configuration on {@link TableConfig}.
  */
+@Deprecated
 @PublicEvolving
 public class BatchQueryConfig implements QueryConfig {
 
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/QueryConfig.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/QueryConfig.java
index 99cdb41..d330b45 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/QueryConfig.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/QueryConfig.java
@@ -24,8 +24,11 @@ import java.io.Serializable;
 
 /**
  * The {@link QueryConfig} holds parameters to configure the behavior of queries.
+ *
+ * @deprecated Set the configuration on {@link TableConfig}.
  */
 @PublicEvolving
+@Deprecated
 public interface QueryConfig extends Serializable {
 
 }
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/StreamQueryConfig.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/StreamQueryConfig.java
index 86c0c75..8c774ff 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/StreamQueryConfig.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/StreamQueryConfig.java
@@ -18,12 +18,16 @@
 
 package org.apache.flink.table.api;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.time.Time;
 
 /**
  * The {@link StreamQueryConfig} holds parameters to configure the behavior of streaming queries.
+ *
+ * @deprecated Set the configuration on {@link TableConfig}.
  */
+@Deprecated
 @PublicEvolving
 public class StreamQueryConfig implements QueryConfig {
 
@@ -39,6 +43,15 @@ public class StreamQueryConfig implements QueryConfig {
 	 */
 	private long maxIdleStateRetentionTime = 0L;
 
+	@Internal
+	public StreamQueryConfig(long minIdleStateRetentionTime, long maxIdleStateRetentionTime) {
+		this.minIdleStateRetentionTime = minIdleStateRetentionTime;
+		this.maxIdleStateRetentionTime = maxIdleStateRetentionTime;
+	}
+
+	public StreamQueryConfig() {
+	}
+
 	/**
 	 * Specifies a minimum and a maximum time interval for how long idle state, i.e., state which
 	 * was not updated, will be retained.
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java
index 325732f..8bdc461 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.api;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.table.api.internal.CompositePlannerConfig;
 import org.apache.flink.util.Preconditions;
 
@@ -59,6 +60,18 @@ public class TableConfig {
 	private Integer maxGeneratedCodeLength = 64000; // just an estimate
 
 	/**
+	 * The minimum time until state which was not updated will be retained.
+	 * State might be cleared and removed if it was not updated for the defined period of time.
+	 */
+	private long minIdleStateRetentionTime = 0L;
+
+	/**
+	 * The maximum time until state which was not updated will be retained.
+	 * State will be cleared and removed if it was not updated for the defined period of time.
+	 */
+	private long maxIdleStateRetentionTime = 0L;
+
+	/**
 	 * Returns the timezone for date/time/timestamp conversions.
 	 */
 	public TimeZone getTimeZone() {
@@ -135,6 +148,51 @@ public class TableConfig {
 		this.maxGeneratedCodeLength = Preconditions.checkNotNull(maxGeneratedCodeLength);
 	}
 
+	/**
+	 * Specifies a minimum and a maximum time interval for how long idle state, i.e., state which
+	 * was not updated, will be retained.
+	 * State will never be cleared until it was idle for less than the minimum time and will never
+	 * be kept if it was idle for more than the maximum time.
+	 *
+	 * <p>When new data arrives for previously cleaned-up state, the new data will be handled as if it
+	 * was the first data. This can result in previous results being overwritten.
+	 *
+	 * <p>Set to 0 (zero) to never clean-up the state.
+	 *
+	 * <p>NOTE: Cleaning up state requires additional bookkeeping which becomes less expensive for
+	 * larger differences of minTime and maxTime. The difference between minTime and maxTime must be
+	 * at least 5 minutes.
+	 *
+	 * @param minTime The minimum time interval for which idle state is retained. Set to 0 (zero) to
+	 *                never clean-up the state.
+	 * @param maxTime The maximum time interval for which idle state is retained. Must be at least
+	 *                5 minutes greater than minTime. Set to 0 (zero) to never clean-up the state.
+	 */
+	public void setIdleStateRetentionTime(Time minTime, Time maxTime) {
+		if (maxTime.toMilliseconds() - minTime.toMilliseconds() < 300000 &&
+			!(maxTime.toMilliseconds() == 0 && minTime.toMilliseconds() == 0)) {
+			throw new IllegalArgumentException(
+				"Difference between minTime: " + minTime.toString() + " and maxTime: " + maxTime.toString() +
+					"shoud be at least 5 minutes.");
+		}
+		minIdleStateRetentionTime = minTime.toMilliseconds();
+		maxIdleStateRetentionTime = maxTime.toMilliseconds();
+	}
+
+	/**
+	 * @return The minimum time until state which was not updated will be retained.
+	 */
+	public long getMinIdleStateRetentionTime() {
+		return minIdleStateRetentionTime;
+	}
+
+	/**
+	 * @return The maximum time until state which was not updated will be retained.
+	 */
+	public long getMaxIdleStateRetentionTime() {
+		return maxIdleStateRetentionTime;
+	}
+
 	public static TableConfig getDefault() {
 		return new TableConfig();
 	}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
index 85d20f6..686e816 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.api;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.table.catalog.Catalog;
 import org.apache.flink.table.catalog.ExternalCatalog;
@@ -188,21 +189,6 @@ public interface TableEnvironment {
 	 * {@link TableEnvironment#useCatalog(String)} for the rules on the path resolution.
 	 *
 	 * @param table The Table to write to the sink.
-	 * @param queryConfig The {@link QueryConfig} to use.
-	 * @param sinkPath The first part of the path of the registered {@link TableSink} to which the {@link Table} is
-	 *        written. This is to ensure at least the name of the {@link TableSink} is provided.
-	 * @param sinkPathContinued The remaining part of the path of the registered {@link TableSink} to which the
-	 *        {@link Table} is written.
-	 */
-	void insertInto(Table table, QueryConfig queryConfig, String sinkPath, String... sinkPathContinued);
-
-	/**
-	 * Writes the {@link Table} to a {@link TableSink} that was registered under the specified name.
-	 *
-	 * <p>See the documentation of {@link TableEnvironment#useDatabase(String)} or
-	 * {@link TableEnvironment#useCatalog(String)} for the rules on the path resolution.
-	 *
-	 * @param table The Table to write to the sink.
 	 * @param sinkPath The first part of the path of the registered {@link TableSink} to which the {@link Table} is
 	 *        written. This is to ensure at least the name of the {@link TableSink} is provided.
 	 * @param sinkPathContinued The remaining part of the path of the registered {@link TableSink} to which the
@@ -336,31 +322,6 @@ public interface TableEnvironment {
 	void sqlUpdate(String stmt);
 
 	/**
-	 * Evaluates a SQL statement such as INSERT, UPDATE or DELETE; or a DDL statement;
-	 * NOTE: Currently only SQL INSERT statements are supported.
-	 *
-	 * <p>All tables referenced by the query must be registered in the TableEnvironment.
-	 * A {@link Table} is automatically registered when its {@link Table#toString()} method is
-	 * called, for example when it is embedded into a String.
-	 * Hence, SQL queries can directly reference a {@link Table} as follows:
-	 *
-	 * <pre>
-	 * {@code
-	 *   // register the configured table sink into which the result is inserted.
-	 *   tEnv.registerTableSink("sinkTable", configuredSink);
-	 *   Table sourceTable = ...
-	 *   String tableName = sourceTable.toString();
-	 *   // sourceTable is not registered to the table environment
-	 *   tEnv.sqlUpdate(s"INSERT INTO sinkTable SELECT * FROM tableName", config);
-	 * }
-	 * </pre>
-	 *
-	 * @param stmt The SQL statement to evaluate.
-	 * @param config The {@link QueryConfig} to use.
-	 */
-	void sqlUpdate(String stmt, QueryConfig config);
-
-	/**
 	 * Gets the current default catalog name of the current session.
 	 *
 	 * @return The current default catalog name that is used for the path resolution.
@@ -496,4 +457,24 @@ public interface TableEnvironment {
 	 * Returns the table config that defines the runtime behavior of the Table API.
 	 */
 	TableConfig getConfig();
+
+	/**
+	 * Triggers the program execution. The environment will execute all parts of
+	 * the program.
+	 *
+	 * <p>The program execution will be logged and displayed with the provided name
+	 *
+	 * <p><b>NOTE:</b>It is highly advised to set all parameters in the {@link TableConfig}
+	 * on the very beginning of the program. It is undefined what configurations values will
+	 * be used for the execution if queries are mixed with config changes. It depends on
+	 * the characteristic of the particular parameter. For some of them the value from the
+	 * point in time of query construction (e.g. the currentCatalog) will be used. On the
+	 * other hand some values might be evaluated according to the state from the time when
+	 * this method is called (e.g. timeZone).
+	 *
+	 * @param jobName Desired name of the job
+	 * @return The result of the job execution, containing elapsed time and accumulators.
+	 * @throws Exception which occurs during job execution.
+	 */
+	JobExecutionResult execute(String jobName) throws Exception;
 }
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/QueryConfigProvider.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/QueryConfigProvider.java
deleted file mode 100644
index 171354f..0000000
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/QueryConfigProvider.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.internal;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.table.api.PlannerConfig;
-import org.apache.flink.table.api.StreamQueryConfig;
-import org.apache.flink.table.delegation.Planner;
-
-/**
- * An adapter to {@link PlannerConfig} that enables to pass {@link org.apache.flink.table.api.QueryConfig}
- * to {@link Planner} via {@link org.apache.flink.table.api.TableConfig}.
- */
-@Internal
-public class QueryConfigProvider implements PlannerConfig {
-	private StreamQueryConfig config;
-
-	public StreamQueryConfig getConfig() {
-		return config;
-	}
-
-	public void setConfig(StreamQueryConfig config) {
-		this.config = config;
-	}
-}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index e94c65a..b2c786a 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -20,11 +20,10 @@ package org.apache.flink.table.api.internal;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.table.api.CatalogNotExistException;
-import org.apache.flink.table.api.QueryConfig;
-import org.apache.flink.table.api.StreamQueryConfig;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.TableEnvironment;
@@ -76,12 +75,12 @@ public class TableEnvironmentImpl implements TableEnvironment {
 
 	private final String defaultCatalogName;
 	private final String defaultDatabaseName;
-	private final TableConfig tableConfig;
 	private final OperationTreeBuilder operationTreeBuilder;
+	private final List<ModifyOperation> bufferedModifyOperations = new ArrayList<>();
 
+	protected final TableConfig tableConfig;
 	protected final Executor execEnv;
 	protected final FunctionCatalog functionCatalog;
-	protected final QueryConfigProvider queryConfigProvider = new QueryConfigProvider();
 	protected final Planner planner;
 
 	protected TableEnvironmentImpl(
@@ -95,7 +94,6 @@ public class TableEnvironmentImpl implements TableEnvironment {
 		this.execEnv = executor;
 
 		this.tableConfig = tableConfig;
-		this.tableConfig.addPlannerConfig(queryConfigProvider);
 		this.defaultCatalogName = catalogManager.getCurrentCatalog();
 		this.defaultDatabaseName = catalogManager.getCurrentDatabase();
 
@@ -266,31 +264,24 @@ public class TableEnvironmentImpl implements TableEnvironment {
 	}
 
 	@Override
-	public void sqlUpdate(String stmt) {
-		sqlUpdate(stmt, new StreamQueryConfig());
-	}
-
-	@Override
-	public void insertInto(Table table, QueryConfig queryConfig, String path, String... pathContinued) {
+	public void insertInto(Table table, String path, String... pathContinued) {
 		List<String> fullPath = new ArrayList<>(Arrays.asList(pathContinued));
 		fullPath.add(0, path);
 
-		queryConfigProvider.setConfig((StreamQueryConfig) queryConfig);
-		List<Transformation<?>> translate = planner.translate(Collections.singletonList(
+		List<ModifyOperation> modifyOperations = Collections.singletonList(
 			new CatalogSinkModifyOperation(
 				fullPath,
-				table.getQueryOperation())));
-
-		execEnv.apply(translate);
-	}
+				table.getQueryOperation()));
 
-	@Override
-	public void insertInto(Table table, String path, String... pathContinued) {
-		insertInto(table, new StreamQueryConfig(), path, pathContinued);
+		if (isEagerOperationTranslation()) {
+			translate(modifyOperations);
+		} else {
+			buffer(modifyOperations);
+		}
 	}
 
 	@Override
-	public void sqlUpdate(String stmt, QueryConfig config) {
+	public void sqlUpdate(String stmt) {
 		List<Operation> operations = planner.parse(stmt);
 
 		if (operations.size() != 1) {
@@ -301,11 +292,12 @@ public class TableEnvironmentImpl implements TableEnvironment {
 		Operation operation = operations.get(0);
 
 		if (operation instanceof ModifyOperation) {
-			queryConfigProvider.setConfig((StreamQueryConfig) config);
-			List<Transformation<?>> transformations =
-				planner.translate(Collections.singletonList((ModifyOperation) operation));
-
-			execEnv.apply(transformations);
+			List<ModifyOperation> modifyOperations = Collections.singletonList((ModifyOperation) operation);
+			if (isEagerOperationTranslation()) {
+				translate(modifyOperations);
+			} else {
+				buffer(modifyOperations);
+			}
 		} else {
 			throw new TableException(
 				"Unsupported SQL query! sqlUpdate() only accepts a single SQL statements of type INSERT.");
@@ -337,6 +329,48 @@ public class TableEnvironmentImpl implements TableEnvironment {
 		return tableConfig;
 	}
 
+	@Override
+	public JobExecutionResult execute(String jobName) throws Exception {
+		translate(bufferedModifyOperations);
+		bufferedModifyOperations.clear();
+		return execEnv.execute(jobName);
+	}
+
+	/**
+	 * Defines the behavior of this {@link TableEnvironment}. If true the queries will
+	 * be translated immediately. If false the {@link ModifyOperation}s will be buffered
+	 * and translated only when {@link #execute(String)} is called.
+	 *
+	 * <p>If the {@link TableEnvironment} works in a lazy manner it is undefined what
+	 * configurations values will be used. It depends on the characteristic of the particular
+	 * parameter. Some might used values current to the time of query construction (e.g. the currentCatalog)
+	 * and some use values from the time when {@link #execute(String)} is called (e.g. timeZone).
+	 *
+	 * @return true if the queries should be translated immediately.
+	 */
+	protected boolean isEagerOperationTranslation() {
+		return false;
+	}
+
+	/**
+	 * Subclasses can override this method to add additional checks.
+	 *
+	 * @param tableSource tableSource to validate
+	 */
+	protected void validateTableSource(TableSource<?> tableSource) {
+		TableSourceValidation.validateTableSource(tableSource);
+	}
+
+	private void translate(List<ModifyOperation> modifyOperations) {
+		List<Transformation<?>> transformations = planner.translate(modifyOperations);
+
+		execEnv.apply(transformations);
+	}
+
+	private void buffer(List<ModifyOperation> modifyOperations) {
+		bufferedModifyOperations.addAll(modifyOperations);
+	}
+
 	protected void registerTableInternal(String name, CatalogBaseTable table) {
 		try {
 			checkValidTableName(name);
@@ -374,15 +408,6 @@ public class TableEnvironmentImpl implements TableEnvironment {
 		}
 	}
 
-	/**
-	 * Subclasses can override this method to add additional checks.
-	 *
-	 * @param tableSource tableSource to validate
-	 */
-	protected void validateTableSource(TableSource<?> tableSource) {
-		TableSourceValidation.validateTableSource(tableSource);
-	}
-
 	private void registerTableSourceInternal(String name, TableSource<?> tableSource) {
 		validateTableSource(tableSource);
 		Optional<CatalogBaseTable> table = getCatalogTable(defaultCatalogName, defaultDatabaseName, name);
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java
index ae08d24..41e483b 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.api.internal;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.table.api.AggregatedTable;
 import org.apache.flink.table.api.FlatAggregateTable;
 import org.apache.flink.table.api.GroupWindow;
@@ -27,6 +28,7 @@ import org.apache.flink.table.api.GroupedTable;
 import org.apache.flink.table.api.OverWindow;
 import org.apache.flink.table.api.OverWindowedTable;
 import org.apache.flink.table.api.QueryConfig;
+import org.apache.flink.table.api.StreamQueryConfig;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.TableException;
@@ -416,7 +418,14 @@ public class TableImpl implements Table {
 
 	@Override
 	public void insertInto(QueryConfig conf, String tablePath, String... tablePathContinued) {
-		tableEnvironment.insertInto(this, conf, tablePath, tablePathContinued);
+		if (conf instanceof StreamQueryConfig) {
+			StreamQueryConfig streamQueryConfig = (StreamQueryConfig) conf;
+			tableEnvironment.getConfig().setIdleStateRetentionTime(
+				Time.milliseconds(streamQueryConfig.getMinIdleStateRetentionTime()),
+				Time.milliseconds(streamQueryConfig.getMaxIdleStateRetentionTime())
+			);
+		}
+		tableEnvironment.insertInto(this, tablePath, tablePathContinued);
 	}
 
 	@Override
diff --git a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/BatchTableEnvironment.scala b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/BatchTableEnvironment.scala
index fa6b178..6b3ecbc 100644
--- a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/BatchTableEnvironment.scala
+++ b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/BatchTableEnvironment.scala
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.table.api.scala
 
+import org.apache.flink.api.common.JobExecutionResult
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
 import org.apache.flink.table.api.{TableEnvironment, _}
@@ -24,6 +25,7 @@ import org.apache.flink.table.catalog.{CatalogManager, GenericInMemoryCatalog}
 import org.apache.flink.table.descriptors.{BatchTableDescriptor, ConnectorDescriptor}
 import org.apache.flink.table.expressions.Expression
 import org.apache.flink.table.functions.{AggregateFunction, TableFunction}
+import org.apache.flink.table.sinks.TableSink
 
 /**
   * The [[TableEnvironment]] for a Scala batch [[ExecutionEnvironment]] that works
@@ -155,6 +157,66 @@ trait BatchTableEnvironment extends TableEnvironment {
     queryConfig: BatchQueryConfig): DataSet[T]
 
   /**
+    * Evaluates a SQL statement such as INSERT, UPDATE or DELETE; or a DDL statement;
+    * NOTE: Currently only SQL INSERT statements are supported.
+    *
+    * All tables referenced by the query must be registered in the TableEnvironment.
+    * A [[Table]] is automatically registered when its [[Table#toString()]] method is
+    * called, for example when it is embedded into a String.
+    * Hence, SQL queries can directly reference a [[Table]] as follows:
+    *
+    * {{{
+    *   // register the configured table sink into which the result is inserted.
+    *   tEnv.registerTableSink("sinkTable", configuredSink);
+    *   Table sourceTable = ...
+    *   String tableName = sourceTable.toString();
+    *   // sourceTable is not registered to the table environment
+    *   tEnv.sqlUpdate(s"INSERT INTO sinkTable SELECT * FROM tableName", config);
+    * }}}
+    *
+    * @param stmt   The SQL statement to evaluate.
+    * @param config The [[BatchQueryConfig]] to use.
+    */
+  def sqlUpdate(stmt: String, config: BatchQueryConfig): Unit
+
+  /**
+    * Writes the [[Table]] to a [[TableSink]] that was registered under the specified name.
+    *
+    * See the documentation of TableEnvironment#useDatabase or
+    * TableEnvironment.useCatalog(String) for the rules on the path resolution.
+    *
+    * @param table             The Table to write to the sink.
+    * @param queryConfig       The [[BatchQueryConfig]] to use.
+    * @param sinkPath          The first part of the path of the registered [[TableSink]] to
+    *                          which the [[Table]] is written. This is to ensure at least the
+    *                          name of the [[TableSink]] is provided.
+    * @param sinkPathContinued The remaining part of the path of the registered [[TableSink]] to
+    *                          which the [[Table]] is written.
+    */
+  def insertInto(
+    table: Table,
+    queryConfig: BatchQueryConfig,
+    sinkPath: String,
+    sinkPathContinued: String*): Unit
+
+  /**
+    * Triggers the program execution. The environment will execute all parts of
+    * the program.
+    *
+    * The program execution will be logged and displayed with the provided name
+    *
+    * It calls the ExecutionEnvironment#execute on the underlying
+    * [[ExecutionEnvironment]]. In contrast to the [[TableEnvironment]] this
+    * environment translates queries eagerly.
+    *
+    * @param jobName Desired name of the job
+    * @return The result of the job execution, containing elapsed time and accumulators.
+    * @throws Exception which occurs during job execution.
+    */
+  @throws[Exception]
+  override def execute(jobName: String): JobExecutionResult
+
+  /**
     * Creates a table source and/or table sink from a descriptor.
     *
     * Descriptors allow for declaring the communication to external systems in an
diff --git a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala
index 3baa5fa..db88251 100644
--- a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala
+++ b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala
@@ -18,6 +18,7 @@
 package org.apache.flink.table.api.scala
 
 import org.apache.flink.annotation.PublicEvolving
+import org.apache.flink.api.common.JobExecutionResult
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
 import org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl
@@ -25,6 +26,7 @@ import org.apache.flink.table.api.{TableEnvironment, _}
 import org.apache.flink.table.descriptors.{ConnectorDescriptor, StreamTableDescriptor}
 import org.apache.flink.table.expressions.Expression
 import org.apache.flink.table.functions.{AggregateFunction, TableAggregateFunction, TableFunction}
+import org.apache.flink.table.sinks.TableSink
 
 /**
   * This table environment is the entry point and central context for creating Table & SQL
@@ -209,6 +211,67 @@ trait StreamTableEnvironment extends TableEnvironment {
     table: Table,
     queryConfig: StreamQueryConfig): DataStream[(Boolean, T)]
 
+
+  /**
+    * Evaluates a SQL statement such as INSERT, UPDATE or DELETE; or a DDL statement;
+    * NOTE: Currently only SQL INSERT statements are supported.
+    *
+    * All tables referenced by the query must be registered in the TableEnvironment.
+    * A [[Table]] is automatically registered when its [[Table#toString()]] method is
+    * called, for example when it is embedded into a String.
+    * Hence, SQL queries can directly reference a [[Table]] as follows:
+    *
+    * {{{
+    *   // register the configured table sink into which the result is inserted.
+    *   tEnv.registerTableSink("sinkTable", configuredSink);
+    *   Table sourceTable = ...
+    *   String tableName = sourceTable.toString();
+    *   // sourceTable is not registered to the table environment
+    *   tEnv.sqlUpdate(s"INSERT INTO sinkTable SELECT * FROM tableName", config);
+    * }}}
+    *
+    * @param stmt   The SQL statement to evaluate.
+    * @param config The [[QueryConfig]] to use.
+    */
+  def sqlUpdate(stmt: String, config: StreamQueryConfig): Unit
+
+  /**
+    * Writes the [[Table]] to a [[TableSink]] that was registered under the specified name.
+    *
+    * See the documentation of TableEnvironment#useDatabase or
+    * TableEnvironment.useCatalog(String) for the rules on the path resolution.
+    *
+    * @param table             The Table to write to the sink.
+    * @param queryConfig       The [[StreamQueryConfig]] to use.
+    * @param sinkPath          The first part of the path of the registered [[TableSink]] to
+    *                          which the [[Table]] is written. This is to ensure at least the name
+    *                          of the [[TableSink]] is provided.
+    * @param sinkPathContinued The remaining part of the path of the registered [[TableSink]] to
+    *                          which the [[Table]] is written.
+    */
+  def insertInto(
+    table: Table,
+    queryConfig: StreamQueryConfig,
+    sinkPath: String,
+    sinkPathContinued: String*): Unit
+
+  /**
+    * Triggers the program execution. The environment will execute all parts of
+    * the program.
+    *
+    * The program execution will be logged and displayed with the provided name
+    *
+    * It calls the StreamExecutionEnvironment#execute on the underlying
+    * [[StreamExecutionEnvironment]]. In contrast to the [[TableEnvironment]] this
+    * environment translates queries eagerly.
+    *
+    * @param jobName Desired name of the job
+    * @return The result of the job execution, containing elapsed time and accumulators.
+    * @throws Exception which occurs during job execution.
+    */
+  @throws[Exception]
+  override def execute(jobName: String): JobExecutionResult
+
   /**
     * Creates a table source and/or table sink from a descriptor.
     *
diff --git a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImpl.scala b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImpl.scala
index 990f043..fedb317 100644
--- a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImpl.scala
+++ b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImpl.scala
@@ -18,6 +18,7 @@
 package org.apache.flink.table.api.scala.internal
 
 import org.apache.flink.annotation.Internal
+import org.apache.flink.api.common.time.Time
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.dag.Transformation
 import org.apache.flink.api.scala._
@@ -102,7 +103,9 @@ class StreamTableEnvironmentImpl (
       table.getQueryOperation,
       TypeConversions.fromLegacyInfoToDataType(returnType),
       OutputConversionModifyOperation.UpdateMode.APPEND)
-    queryConfigProvider.setConfig(queryConfig)
+    tableConfig.setIdleStateRetentionTime(
+      Time.milliseconds(queryConfig.getMinIdleStateRetentionTime),
+      Time.milliseconds(queryConfig.getMaxIdleStateRetentionTime))
     toDataStream(table, modifyOperation)
   }
 
@@ -121,7 +124,9 @@ class StreamTableEnvironmentImpl (
       TypeConversions.fromLegacyInfoToDataType(returnType),
       OutputConversionModifyOperation.UpdateMode.RETRACT)
 
-    queryConfigProvider.setConfig(queryConfig)
+    tableConfig.setIdleStateRetentionTime(
+        Time.milliseconds(queryConfig.getMinIdleStateRetentionTime),
+        Time.milliseconds(queryConfig.getMaxIdleStateRetentionTime))
     toDataStream(table, modifyOperation)
   }
 
@@ -182,6 +187,8 @@ class StreamTableEnvironmentImpl (
     }
   }
 
+  override protected def isEagerOperationTranslation(): Boolean = true
+
   private def toDataStream[T](
       table: Table,
       modifyOperation: OutputConversionModifyOperation)
@@ -233,6 +240,26 @@ class StreamTableEnvironmentImpl (
       typeInfoSchema.getIndices,
       typeInfoSchema.toTableSchema)
   }
+
+  override def sqlUpdate(stmt: String, config: StreamQueryConfig): Unit = {
+    tableConfig
+      .setIdleStateRetentionTime(
+        Time.milliseconds(config.getMinIdleStateRetentionTime),
+        Time.milliseconds(config.getMaxIdleStateRetentionTime))
+    sqlUpdate(stmt)
+  }
+
+  override def insertInto(
+      table: Table,
+      queryConfig: StreamQueryConfig,
+      sinkPath: String,
+      sinkPathContinued: String*): Unit = {
+    tableConfig
+      .setIdleStateRetentionTime(
+        Time.milliseconds(queryConfig.getMinIdleStateRetentionTime),
+        Time.milliseconds(queryConfig.getMaxIdleStateRetentionTime))
+    insertInto(table, sinkPath, sinkPathContinued: _*)
+  }
 }
 
 object StreamTableEnvironmentImpl {
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala
index b8425d7..eb0c543 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala
@@ -20,6 +20,7 @@ package org.apache.flink.table.api.internal
 
 import org.apache.calcite.plan.RelOptUtil
 import org.apache.calcite.rel.RelNode
+import org.apache.flink.api.common.JobExecutionResult
 import org.apache.flink.api.common.functions.MapFunction
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.io.DiscardingOutputFormat
@@ -65,8 +66,6 @@ abstract class BatchTableEnvImpl(
     planningConfigurationBuilder
   )
 
-  override def queryConfig: BatchQueryConfig = new BatchQueryConfig
-
   /**
     * Registers an internal [[BatchTableSource]] in this [[TableEnvImpl]]'s catalog without
     * name checking. Registered tables can be referenced in SQL queries.
@@ -103,34 +102,25 @@ abstract class BatchTableEnvImpl(
     *
     * @param table The [[Table]] to write.
     * @param sink The [[TableSink]] to write the [[Table]] to.
-    * @param queryConfig The configuration for the query to generate.
     * @tparam T The expected type of the [[DataSet]] which represents the [[Table]].
     */
   override private[flink] def writeToSink[T](
       table: Table,
-      sink: TableSink[T],
-      queryConfig: QueryConfig): Unit = {
-
-    // Check the query configuration to be a batch one.
-    val batchQueryConfig = queryConfig match {
-      case batchConfig: BatchQueryConfig => batchConfig
-      case _ =>
-        throw new TableException("BatchQueryConfig required to configure batch query.")
-    }
+      sink: TableSink[T]): Unit = {
 
     sink match {
       case batchSink: BatchTableSink[T] =>
         val outputType = fromDataTypeToLegacyInfo(sink.getConsumedDataType)
           .asInstanceOf[TypeInformation[T]]
         // translate the Table into a DataSet and provide the type that the TableSink expects.
-        val result: DataSet[T] = translate(table, batchQueryConfig)(outputType)
+        val result: DataSet[T] = translate(table)(outputType)
         // Give the DataSet to the TableSink to emit it.
         batchSink.emitDataSet(result)
       case boundedSink: OutputFormatTableSink[T] =>
         val outputType = fromDataTypeToLegacyInfo(sink.getConsumedDataType)
           .asInstanceOf[TypeInformation[T]]
         // translate the Table into a DataSet and provide the type that the TableSink expects.
-        val result: DataSet[T] = translate(table, batchQueryConfig)(outputType)
+        val result: DataSet[T] = translate(table)(outputType)
         // use the OutputFormat to consume the DataSet.
         val dataSink = result.output(boundedSink.getOutputFormat)
         dataSink.name(
@@ -188,8 +178,7 @@ abstract class BatchTableEnvImpl(
     val optimizedPlan = optimizer.optimize(ast)
     val dataSet = translate[Row](
       optimizedPlan,
-      getTableSchema(table.getQueryOperation.getTableSchema.getFieldNames, optimizedPlan),
-      queryConfig)(
+      getTableSchema(table.getQueryOperation.getTableSchema.getFieldNames, optimizedPlan))(
       new GenericTypeInfo(classOf[Row]))
     dataSet.output(new DiscardingOutputFormat[Row])
     val env = dataSet.getExecutionEnvironment
@@ -211,6 +200,8 @@ abstract class BatchTableEnvImpl(
 
   def explain(table: Table): String = explain(table: Table, extended = false)
 
+  override def execute(jobName: String): JobExecutionResult = execEnv.execute(jobName)
+
   protected def asQueryOperation[T](dataSet: DataSet[T], fields: Option[Array[Expression]])
     : DataSetQueryOperation[T] = {
     val inputType = dataSet.getType
@@ -253,21 +244,17 @@ abstract class BatchTableEnvImpl(
     * Table API calls and / or SQL queries and generating corresponding [[DataSet]] operators.
     *
     * @param table The root node of the relational expression tree.
-    * @param queryConfig The configuration for the query to generate.
     * @param tpe   The [[TypeInformation]] of the resulting [[DataSet]].
     * @tparam A The type of the resulting [[DataSet]].
     * @return The [[DataSet]] that corresponds to the translated [[Table]].
     */
-  protected def translate[A](
-      table: Table,
-      queryConfig: BatchQueryConfig)(implicit tpe: TypeInformation[A]): DataSet[A] = {
+  protected def translate[A](table: Table)(implicit tpe: TypeInformation[A]): DataSet[A] = {
     val queryOperation = table.getQueryOperation
     val relNode = getRelBuilder.tableOperation(queryOperation).build()
     val dataSetPlan = optimizer.optimize(relNode)
     translate(
       dataSetPlan,
-      getTableSchema(queryOperation.getTableSchema.getFieldNames, dataSetPlan),
-      queryConfig)
+      getTableSchema(queryOperation.getTableSchema.getFieldNames, dataSetPlan))
   }
 
   /**
@@ -276,20 +263,18 @@ abstract class BatchTableEnvImpl(
     * @param logicalPlan The root node of the relational expression tree.
     * @param logicalType The row type of the result. Since the logicalPlan can lose the
     *                    field naming during optimization we pass the row type separately.
-    * @param queryConfig The configuration for the query to generate.
     * @param tpe         The [[TypeInformation]] of the resulting [[DataSet]].
     * @tparam A The type of the resulting [[DataSet]].
     * @return The [[DataSet]] that corresponds to the translated [[Table]].
     */
   protected def translate[A](
       logicalPlan: RelNode,
-      logicalType: TableSchema,
-      queryConfig: BatchQueryConfig)(implicit tpe: TypeInformation[A]): DataSet[A] = {
+      logicalType: TableSchema)(implicit tpe: TypeInformation[A]): DataSet[A] = {
     validateInputTypeInfo(tpe)
 
     logicalPlan match {
       case node: DataSetRel =>
-        val plan = node.translateToPlan(this, queryConfig)
+        val plan = node.translateToPlan(this, new BatchQueryConfig)
         val conversion =
           getConversionMapper(
             plan.getType,
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
index 2d37173..24e52d7 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
@@ -94,11 +94,6 @@ abstract class TableEnvImpl(
     case _ => false
   }
 
-  private[flink] def queryConfig: QueryConfig = this match {
-    case _: BatchTableEnvImpl => new BatchQueryConfig
-    case _ => null
-  }
-
   override def registerExternalCatalog(name: String, externalCatalog: ExternalCatalog): Unit = {
     catalogManager.registerExternalCatalog(name, externalCatalog)
   }
@@ -403,10 +398,6 @@ abstract class TableEnvImpl(
   }
 
   override def sqlUpdate(stmt: String): Unit = {
-    sqlUpdate(stmt, this.queryConfig)
-  }
-
-  override def sqlUpdate(stmt: String, config: QueryConfig): Unit = {
     val planner = getFlinkPlanner
     // parse the sql query
     val parsed = planner.parse(stmt)
@@ -424,7 +415,7 @@ abstract class TableEnvImpl(
         val targetTablePath = insert.getTargetTable.asInstanceOf[SqlIdentifier].names
 
         // insert query result into sink table
-        insertInto(queryResult, config, targetTablePath.asScala:_*)
+        insertInto(queryResult, targetTablePath.asScala:_*)
       case _ =>
         throw new TableException(
           "Unsupported SQL query! sqlUpdate() only accepts SQL statements of type INSERT.")
@@ -444,25 +435,15 @@ abstract class TableEnvImpl(
     *
     * @param table The [[Table]] to write.
     * @param sink The [[TableSink]] to write the [[Table]] to.
-    * @param conf The [[QueryConfig]] to use.
     * @tparam T The data type that the [[TableSink]] expects.
     */
-  private[flink] def writeToSink[T](table: Table, sink: TableSink[T], conf: QueryConfig): Unit
-
-
-  override def insertInto(
-    table: Table,
-    queryConfig: QueryConfig,
-    path: String,
-    pathContinued: String*): Unit = {
-    insertInto(table, queryConfig, path +: pathContinued: _*)
-  }
+  private[flink] def writeToSink[T](table: Table, sink: TableSink[T]): Unit
 
   override def insertInto(
     table: Table,
     path: String,
     pathContinued: String*): Unit = {
-    insertInto(table, queryConfig, path, pathContinued: _*)
+    insertInto(table, path +: pathContinued: _*)
   }
 
   /**
@@ -470,9 +451,8 @@ abstract class TableEnvImpl(
     *
     * @param table The table to write to the TableSink.
     * @param sinkTablePath The name of the registered TableSink.
-    * @param conf The query configuration to use.
     */
-  private def insertInto(table: Table, conf: QueryConfig, sinkTablePath: String*): Unit = {
+  private def insertInto(table: Table, sinkTablePath: String*): Unit = {
 
     // check that sink table exists
     if (null == sinkTablePath) {
@@ -491,7 +471,7 @@ abstract class TableEnvImpl(
         // validate schema of source table and table sink
         TableSinkUtils.validateSink(table.getQueryOperation, sinkTablePath.asJava, tableSink)
         // emit the table to the configured table sink
-        writeToSink(table, tableSink, conf)
+        writeToSink(table, tableSink)
     }
   }
 
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/java/internal/BatchTableEnvironmentImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/java/internal/BatchTableEnvironmentImpl.scala
index 5ef9917..7358721 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/java/internal/BatchTableEnvironmentImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/java/internal/BatchTableEnvironmentImpl.scala
@@ -68,26 +68,26 @@ class BatchTableEnvironmentImpl(
 
   override def toDataSet[T](table: Table, clazz: Class[T]): DataSet[T] = {
     // Use the default query config.
-    translate[T](table, queryConfig)(TypeExtractor.createTypeInfo(clazz))
+    translate[T](table)(TypeExtractor.createTypeInfo(clazz))
   }
 
   override def toDataSet[T](table: Table, typeInfo: TypeInformation[T]): DataSet[T] = {
     // Use the default batch query config.
-    translate[T](table, queryConfig)(typeInfo)
+    translate[T](table)(typeInfo)
   }
 
   override def toDataSet[T](
       table: Table,
       clazz: Class[T],
       queryConfig: BatchQueryConfig): DataSet[T] = {
-    translate[T](table, queryConfig)(TypeExtractor.createTypeInfo(clazz))
+    translate[T](table)(TypeExtractor.createTypeInfo(clazz))
   }
 
   override def toDataSet[T](
       table: Table,
       typeInfo: TypeInformation[T],
       queryConfig: BatchQueryConfig): DataSet[T] = {
-    translate[T](table, queryConfig)(typeInfo)
+    translate[T](table)(typeInfo)
   }
 
   override def registerFunction[T](name: String, tf: TableFunction[T]): Unit = {
@@ -112,4 +112,14 @@ class BatchTableEnvironmentImpl(
 
     registerAggregateFunctionInternal[T, ACC](name, f)
   }
+
+  override def sqlUpdate(
+    stmt: String,
+    config: BatchQueryConfig): Unit = sqlUpdate(stmt)
+
+  override def insertInto(
+    table: Table,
+    queryConfig: BatchQueryConfig,
+    sinkPath: String,
+    sinkPathContinued: String*): Unit = insertInto(table, sinkPath, sinkPathContinued: _*)
 }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/scala/internal/BatchTableEnvironmentImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/scala/internal/BatchTableEnvironmentImpl.scala
index 6c1d155..8e13426 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/scala/internal/BatchTableEnvironmentImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/scala/internal/BatchTableEnvironmentImpl.scala
@@ -63,12 +63,12 @@ class BatchTableEnvironmentImpl(
 
   override def toDataSet[T: TypeInformation](table: Table): DataSet[T] = {
     // Use the default batch query config.
-    wrap[T](translate(table, queryConfig))(ClassTag.AnyRef.asInstanceOf[ClassTag[T]])
+    wrap[T](translate(table))(ClassTag.AnyRef.asInstanceOf[ClassTag[T]])
   }
 
   override def toDataSet[T: TypeInformation](
     table: Table, queryConfig: BatchQueryConfig): DataSet[T] = {
-    wrap[T](translate(table, queryConfig))(ClassTag.AnyRef.asInstanceOf[ClassTag[T]])
+    wrap[T](translate(table))(ClassTag.AnyRef.asInstanceOf[ClassTag[T]])
   }
 
   override def registerFunction[T: TypeInformation](name: String, tf: TableFunction[T]): Unit = {
@@ -81,5 +81,13 @@ class BatchTableEnvironmentImpl(
   : Unit = {
     registerAggregateFunctionInternal[T, ACC](name, f)
   }
+
+  override def sqlUpdate(stmt: String, config: BatchQueryConfig): Unit = sqlUpdate(stmt)
+
+  override def insertInto(
+    table: Table,
+    queryConfig: BatchQueryConfig,
+    sinkPath: String,
+    sinkPathContinued: String*): Unit = insertInto(table, sinkPath, sinkPathContinued: _*)
 }
 
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala
index 92465d8..8cd11fc 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala
@@ -17,23 +17,23 @@
  */
 
 package org.apache.flink.table.planner
-import _root_.java.lang.{Boolean => JBool}
-import _root_.java.util.{Objects, List => JList}
-import java.util
-
-import org.apache.calcite.jdbc.CalciteSchema
-import org.apache.calcite.jdbc.CalciteSchemaBuilder.asRootSchema
-import org.apache.calcite.plan.RelOptUtil
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.sql.{SqlIdentifier, SqlInsert, SqlKind, SqlNode}
 import org.apache.flink.annotation.VisibleForTesting
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.dag.Transformation
 import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
 import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSink}
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+
+import org.apache.calcite.jdbc.CalciteSchema
+import org.apache.calcite.jdbc.CalciteSchemaBuilder.asRootSchema
+import org.apache.calcite.plan.RelOptUtil
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.sql.{SqlIdentifier, SqlInsert, SqlKind, SqlNode}
+
+import _root_.java.lang.{Boolean => JBool}
+import _root_.java.util.{Objects, List => JList}
+import java.util
 import org.apache.flink.table.api._
-import org.apache.flink.table.api.internal.QueryConfigProvider
 import org.apache.flink.table.calcite.{CalciteConfig, FlinkPlannerImpl, FlinkRelBuilder, FlinkTypeFactory}
 import org.apache.flink.table.catalog.{CatalogManager, CatalogManagerCalciteSchema, CatalogTable, ConnectorCatalogTable, _}
 import org.apache.flink.table.delegation.{Executor, Planner}
@@ -187,7 +187,10 @@ class StreamPlanner(
   }
 
   private def unwrapQueryConfig = {
-    config.getPlannerConfig.unwrap(classOf[QueryConfigProvider]).get().getConfig
+    new StreamQueryConfig(
+      config.getMinIdleStateRetentionTime,
+      config.getMaxIdleStateRetentionTime
+    )
   }
 
   private def explain(tableOperation: QueryOperation, queryConfig: StreamQueryConfig) = {
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/sql/validation/InsertIntoValidationTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/sql/validation/InsertIntoValidationTest.scala
index 55c3c31..faccc9f 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/sql/validation/InsertIntoValidationTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/sql/validation/InsertIntoValidationTest.scala
@@ -72,6 +72,6 @@ class InsertIntoValidationTest extends TableTestBase {
     val sql = "INSERT INTO targetTable (d, f) SELECT a, c FROM sourceTable"
 
     // must fail because partial insert is not supported yet.
-    util.tableEnv.sqlUpdate(sql, util.tableEnv.queryConfig)
+    util.tableEnv.sqlUpdate(sql)
   }
 }
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/harness/GroupAggregateHarnessTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/harness/GroupAggregateHarnessTest.scala
index 86458b3..6d382f2 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/harness/GroupAggregateHarnessTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/harness/GroupAggregateHarnessTest.scala
@@ -19,19 +19,20 @@ package org.apache.flink.table.runtime.harness
 
 import java.lang.{Integer => JInt, Long => JLong}
 import java.util.concurrent.ConcurrentLinkedQueue
-
 import org.apache.flink.api.common.time.Time
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo
 import org.apache.flink.api.scala._
 import org.apache.flink.streaming.api.operators.KeyedProcessOperator
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.table.api.TableConfig
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.runtime.aggregate._
 import org.apache.flink.table.runtime.harness.HarnessTestBase._
 import org.apache.flink.table.runtime.types.CRow
 import org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.{MultiArgCount, MultiArgSum}
 import org.apache.flink.types.Row
+
 import org.junit.Assert.assertTrue
 import org.junit.Test
 
@@ -39,8 +40,13 @@ import scala.collection.mutable
 
 class GroupAggregateHarnessTest extends HarnessTestBase {
 
-  protected var queryConfig =
+  private val queryConfig =
     new TestStreamQueryConfig(Time.seconds(2), Time.seconds(3))
+  private val tableConfig = new TableConfig {
+    override def getMinIdleStateRetentionTime: Long = Time.seconds(2).toMilliseconds
+
+    override def getMaxIdleStateRetentionTime: Long = Time.seconds(2).toMilliseconds
+  }
 
   @Test
   def testAggregate(): Unit = {
@@ -185,7 +191,7 @@ class GroupAggregateHarnessTest extends HarnessTestBase {
   @Test
   def testDistinctAggregateWithRetract(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = StreamTableEnvironment.create(env)
+    val tEnv = StreamTableEnvironment.create(env, tableConfig)
 
     val data = new mutable.MutableList[(JLong, JInt)]
     val t = env.fromCollection(data).toTable(tEnv, 'a, 'b)
@@ -202,7 +208,7 @@ class GroupAggregateHarnessTest extends HarnessTestBase {
          |""".stripMargin)
 
     val testHarness = createHarnessTester[String, CRow, CRow](
-      sqlQuery.toRetractStream[Row](queryConfig), "groupBy")
+      sqlQuery.toRetractStream[Row], "groupBy")
 
     testHarness.setStateBackend(getStateBackend)
     testHarness.open()
@@ -273,7 +279,7 @@ class GroupAggregateHarnessTest extends HarnessTestBase {
   @Test
   def testDistinctAggregateWithDifferentArgumentOrder(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = StreamTableEnvironment.create(env)
+    val tEnv = StreamTableEnvironment.create(env, tableConfig)
 
     val data = new mutable.MutableList[(JLong, JLong, JLong)]
     val t = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c)
@@ -292,7 +298,7 @@ class GroupAggregateHarnessTest extends HarnessTestBase {
          |""".stripMargin)
 
     val testHarness = createHarnessTester[String, CRow, CRow](
-      sqlQuery.toRetractStream[Row](queryConfig), "groupBy")
+      sqlQuery.toRetractStream[Row], "groupBy")
 
     testHarness.setStateBackend(getStateBackend)
     testHarness.open()
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/harness/TableAggregateHarnessTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/harness/TableAggregateHarnessTest.scala
index 56d79d5..9519c3d 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/harness/TableAggregateHarnessTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/harness/TableAggregateHarnessTest.scala
@@ -17,33 +17,37 @@
  */
 package org.apache.flink.table.runtime.harness
 
-import java.lang.{Integer => JInt}
-import java.util.concurrent.ConcurrentLinkedQueue
-
 import org.apache.flink.api.common.time.Time
 import org.apache.flink.api.scala._
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.table.api.TableConfig
 import org.apache.flink.table.api.scala._
-import org.apache.flink.table.runtime.harness.HarnessTestBase._
 import org.apache.flink.table.runtime.types.CRow
 import org.apache.flink.table.utils.{Top3WithEmitRetractValue, Top3WithMapView}
 import org.apache.flink.types.Row
+
 import org.junit.Test
 
+import java.lang.{Integer => JInt}
+import java.util.concurrent.ConcurrentLinkedQueue
+
 import scala.collection.mutable
 
 class TableAggregateHarnessTest extends HarnessTestBase {
 
-  protected var queryConfig =
-    new TestStreamQueryConfig(Time.seconds(2), Time.seconds(3))
+  private val tableConfig = new TableConfig {
+    override def getMinIdleStateRetentionTime: Long = Time.seconds(2).toMilliseconds
+
+    override def getMaxIdleStateRetentionTime: Long = Time.seconds(2).toMilliseconds
+  }
   val data = new mutable.MutableList[(Int, Int)]
 
   @Test
   def testTableAggregate(): Unit = {
 
     val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = StreamTableEnvironment.create(env)
+    val tEnv = StreamTableEnvironment.create(env, tableConfig)
 
     val top3 = new Top3WithMapView
     tEnv.registerFunction("top3", top3)
@@ -54,7 +58,7 @@ class TableAggregateHarnessTest extends HarnessTestBase {
       .select('a, 'b1, 'b2)
 
     val testHarness = createHarnessTester[Int, CRow, CRow](
-      resultTable.toRetractStream[Row](queryConfig), "groupBy: (a)")
+      resultTable.toRetractStream[Row], "groupBy: (a)")
 
     testHarness.open()
 
@@ -107,7 +111,7 @@ class TableAggregateHarnessTest extends HarnessTestBase {
   def testTableAggregateEmitRetractValueIncrementally(): Unit = {
 
     val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = StreamTableEnvironment.create(env)
+    val tEnv = StreamTableEnvironment.create(env, tableConfig)
 
     val top3 = new Top3WithEmitRetractValue
     val source = env.fromCollection(data).toTable(tEnv, 'a, 'b)
@@ -117,7 +121,7 @@ class TableAggregateHarnessTest extends HarnessTestBase {
       .select('a, 'b1, 'b2)
 
     val testHarness = createHarnessTester[Int, CRow, CRow](
-      resultTable.toRetractStream[Row](queryConfig), "groupBy: (a)")
+      resultTable.toRetractStream[Row], "groupBy: (a)")
 
     testHarness.open()
 
@@ -160,7 +164,7 @@ class TableAggregateHarnessTest extends HarnessTestBase {
   def testTableAggregateWithRetractInput(): Unit = {
 
     val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = StreamTableEnvironment.create(env)
+    val tEnv = StreamTableEnvironment.create(env, tableConfig)
 
     val top3 = new Top3WithMapView
     tEnv.registerFunction("top3", top3)
@@ -172,7 +176,7 @@ class TableAggregateHarnessTest extends HarnessTestBase {
       .select('b1, 'b2)
 
     val testHarness = createHarnessTester[Int, CRow, CRow](
-      resultTable.toRetractStream[Row](queryConfig), "select: (Top3WithMapView")
+      resultTable.toRetractStream[Row], "select: (Top3WithMapView")
 
     testHarness.open()
 
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
index 09bbe77..8f138de 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
@@ -20,6 +20,7 @@ package org.apache.flink.table.utils
 
 import java.util.Optional
 
+import org.apache.flink.api.common.JobExecutionResult
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.table.api.{QueryConfig, Table, TableConfig, TableEnvironment}
 import org.apache.flink.table.catalog.{Catalog, ExternalCatalog}
@@ -69,8 +70,6 @@ class MockTableEnvironment extends TableEnvironment {
 
   override def sqlUpdate(stmt: String): Unit = ???
 
-  override def sqlUpdate(stmt: String, config: QueryConfig): Unit = ???
-
   override def getConfig: TableConfig = ???
 
   override def registerCatalog(
@@ -89,12 +88,8 @@ class MockTableEnvironment extends TableEnvironment {
 
   override def insertInto(
     table: Table,
-    queryConfig: QueryConfig,
     sinkPath: String,
     sinkPathContinued: String*): Unit = ???
 
-  override def insertInto(
-    table: Table,
-    sinkPath: String,
-    sinkPathContinued: String*): Unit = ???
+  override def execute(jobName: String): JobExecutionResult = ???
 }