You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2019/07/05 13:34:34 UTC

[flink] branch master updated (f2494ce -> 593ff73)

This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from f2494ce  [FLINK-9311] [pubsub] Improvements to builders + minor improvement to PubSubSink flush logic
     new 177e3a6  [FLINK-13088][table-api] Support lazy query transformation & execution on TableEnvironment
     new 767d924  [hotfix][table-api] Remove CompositePlannerConfig
     new 593ff73  [FLINK-13088][table-api] Support instantiating the unfied TableEnvironment

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 flink-python/pyflink/table/table_environment.py    |  47 ++++++++
 .../table/tests/test_environment_completeness.py   |   3 +-
 .../table/client/gateway/local/LocalExecutor.java  |   8 +-
 .../table/api/java/BatchTableEnvironment.java      |  41 +++++++
 .../table/api/java/StreamTableEnvironment.java     |  61 ++++++++++
 .../java/internal/StreamTableEnvironmentImpl.java  |  44 +++++--
 .../apache/flink/table/api/BatchQueryConfig.java   |   3 +
 .../org/apache/flink/table/api/QueryConfig.java    |   3 +
 .../apache/flink/table/api/StreamQueryConfig.java  |  13 +++
 .../org/apache/flink/table/api/TableConfig.java    |  65 ++++++++++-
 .../apache/flink/table/api/TableEnvironment.java   |  87 +++++++-------
 .../table/api/internal/CompositePlannerConfig.java |  45 -------
 .../table/api/internal/QueryConfigProvider.java    |  41 -------
 .../table/api/internal/TableEnvironmentImpl.java   | 129 +++++++++++++++------
 .../apache/flink/table/api/internal/TableImpl.java |  11 +-
 .../table/api/scala/BatchTableEnvironment.scala    |  62 ++++++++++
 .../table/api/scala/StreamTableEnvironment.scala   |  63 ++++++++++
 .../internal/StreamTableEnvironmentImpl.scala      |  41 ++++++-
 .../table/api/internal/BatchTableEnvImpl.scala     |  37 ++----
 .../flink/table/api/internal/TableEnvImpl.scala    |  30 +----
 .../java/internal/BatchTableEnvironmentImpl.scala  |  18 ++-
 .../scala/internal/BatchTableEnvironmentImpl.scala |  12 +-
 .../apache/flink/table/planner/StreamPlanner.scala |  25 ++--
 .../batch/table/JavaTableEnvironmentITCase.java    |   2 +-
 .../sql/validation/InsertIntoValidationTest.scala  |   2 +-
 .../table/api/batch/table/CorrelateTest.scala      |   2 +-
 .../table/api/stream/table/CorrelateTest.scala     |   2 +-
 .../flink/table/plan/NormalizationRulesTest.scala  |   4 +-
 .../harness/GroupAggregateHarnessTest.scala        |  18 ++-
 .../harness/TableAggregateHarnessTest.scala        |  28 +++--
 .../flink/table/utils/MockTableEnvironment.scala   |   9 +-
 31 files changed, 673 insertions(+), 283 deletions(-)
 delete mode 100644 flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/CompositePlannerConfig.java
 delete mode 100644 flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/QueryConfigProvider.java


[flink] 01/03: [FLINK-13088][table-api] Support lazy query transformation & execution on TableEnvironment

Posted by dw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 177e3a69a701ff2dc217b039e06f008af7d75d43
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Mon Jun 17 11:44:08 2019 +0200

    [FLINK-13088][table-api] Support lazy query transformation & execution on TableEnvironment
---
 flink-python/pyflink/table/table_environment.py    | 47 +++++++++++
 .../table/tests/test_environment_completeness.py   |  3 +-
 .../table/client/gateway/local/LocalExecutor.java  |  8 +-
 .../table/api/java/BatchTableEnvironment.java      | 41 ++++++++++
 .../table/api/java/StreamTableEnvironment.java     | 61 ++++++++++++++
 .../java/internal/StreamTableEnvironmentImpl.java  | 38 ++++++---
 .../apache/flink/table/api/BatchQueryConfig.java   |  3 +
 .../org/apache/flink/table/api/QueryConfig.java    |  3 +
 .../apache/flink/table/api/StreamQueryConfig.java  | 13 +++
 .../org/apache/flink/table/api/TableConfig.java    | 58 +++++++++++++
 .../apache/flink/table/api/TableEnvironment.java   | 61 +++++---------
 .../table/api/internal/QueryConfigProvider.java    | 41 ----------
 .../table/api/internal/TableEnvironmentImpl.java   | 95 ++++++++++++++--------
 .../apache/flink/table/api/internal/TableImpl.java | 11 ++-
 .../table/api/scala/BatchTableEnvironment.scala    | 62 ++++++++++++++
 .../table/api/scala/StreamTableEnvironment.scala   | 63 ++++++++++++++
 .../internal/StreamTableEnvironmentImpl.scala      | 31 ++++++-
 .../table/api/internal/BatchTableEnvImpl.scala     | 37 +++------
 .../flink/table/api/internal/TableEnvImpl.scala    | 30 ++-----
 .../java/internal/BatchTableEnvironmentImpl.scala  | 18 +++-
 .../scala/internal/BatchTableEnvironmentImpl.scala | 12 ++-
 .../apache/flink/table/planner/StreamPlanner.scala | 25 +++---
 .../sql/validation/InsertIntoValidationTest.scala  |  2 +-
 .../harness/GroupAggregateHarnessTest.scala        | 18 ++--
 .../harness/TableAggregateHarnessTest.scala        | 28 ++++---
 .../flink/table/utils/MockTableEnvironment.scala   |  9 +-
 26 files changed, 593 insertions(+), 225 deletions(-)

diff --git a/flink-python/pyflink/table/table_environment.py b/flink-python/pyflink/table/table_environment.py
index 19b5199..3c8b44c 100644
--- a/flink-python/pyflink/table/table_environment.py
+++ b/flink-python/pyflink/table/table_environment.py
@@ -446,6 +446,25 @@ class TableEnvironment(object):
         """
         pass
 
+    def execute(self, job_name):
+        """
+        Triggers the program execution. The environment will execute all parts of
+        the program.
+
+        <p>The program execution will be logged and displayed with the provided name
+
+        <p><b>NOTE:</b>It is highly advised to set all parameters in the :class:`TableConfig`
+        on the very beginning of the program. It is undefined what configurations values will
+        be used for the execution if queries are mixed with config changes. It depends on
+        the characteristic of the particular parameter. For some of them the value from the
+        point in time of query construction (e.g. the currentCatalog) will be used. On the
+        other hand some values might be evaluated according to the state from the time when
+        this method is called (e.g. timeZone).
+
+        :param job_name Desired name of the job
+        """
+        self._j_tenv.execute(job_name)
+
     def from_elements(self, elements, schema=None, verify_schema=True):
         """
         Creates a table from a collection of elements.
@@ -623,6 +642,20 @@ class StreamTableEnvironment(TableEnvironment):
         return StreamTableDescriptor(
             self._j_tenv.connect(connector_descriptor._j_connector_descriptor))
 
+    def execute(self, job_name):
+        """
+        Triggers the program execution. The environment will execute all parts of
+        the program.
+
+        The program execution will be logged and displayed with the provided name
+
+        It calls the StreamExecutionEnvironment#execute on the underlying
+        :class:`StreamExecutionEnvironment`. This environment translates queries eagerly.
+
+        :param job_name Desired name of the job
+        """
+        self._j_tenv.execute(job_name)
+
     @staticmethod
     def create(stream_execution_environment, table_config=None):
         """
@@ -710,6 +743,20 @@ class BatchTableEnvironment(TableEnvironment):
         return BatchTableDescriptor(
             self._j_tenv.connect(connector_descriptor._j_connector_descriptor))
 
+    def execute(self, job_name):
+        """
+        Triggers the program execution. The environment will execute all parts of
+        the program.
+
+        The program execution will be logged and displayed with the provided name
+
+        It calls the ExecutionEnvironment#execute on the underlying
+        :class:`ExecutionEnvironment`. This environment translates queries eagerly.
+
+        :param job_name Desired name of the job
+        """
+        self._j_tenv.execute(job_name)
+
     @staticmethod
     def create(execution_environment, table_config=None):
         """
diff --git a/flink-python/pyflink/table/tests/test_environment_completeness.py b/flink-python/pyflink/table/tests/test_environment_completeness.py
index a8fc991..1f82445 100644
--- a/flink-python/pyflink/table/tests/test_environment_completeness.py
+++ b/flink-python/pyflink/table/tests/test_environment_completeness.py
@@ -41,9 +41,10 @@ class EnvironmentAPICompletenessTests(PythonAPICompletenessTestCase, unittest.Te
         # registerExternalCatalog, getRegisteredExternalCatalog, registerCatalog, getCatalog and
         # listTables should be supported when catalog supported in python.
         # getCompletionHints has been deprecated. It will be removed in the next release.
+        # TODO add TableEnvironment#create method with EnvironmentSettings as a parameter
         return {'registerExternalCatalog', 'getRegisteredExternalCatalog', 'registerCatalog',
                 'getCatalog', 'registerFunction', 'listUserDefinedFunctions', 'listTables',
-                'getCompletionHints'}
+                'getCompletionHints', 'create'}
 
 
 if __name__ == '__main__':
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
index 5eabbfe..af128ef 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
@@ -35,10 +35,12 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.plugin.PluginUtils;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.table.api.QueryConfig;
+import org.apache.flink.table.api.StreamQueryConfig;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.internal.TableEnvImpl;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
 import org.apache.flink.table.calcite.FlinkTypeFactory;
 import org.apache.flink.table.client.SqlClientException;
 import org.apache.flink.table.client.config.Environment;
@@ -509,7 +511,11 @@ public class LocalExecutor implements Executor {
 		// parse and validate statement
 		try {
 			context.wrapClassLoader(() -> {
-				tableEnv.sqlUpdate(updateStatement, queryConfig);
+				if (tableEnv instanceof StreamTableEnvironment) {
+					((StreamTableEnvironment) tableEnv).sqlUpdate(updateStatement, (StreamQueryConfig) queryConfig);
+				} else {
+					tableEnv.sqlUpdate(updateStatement);
+				}
 				return null;
 			});
 		} catch (Throwable t) {
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/BatchTableEnvironment.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/BatchTableEnvironment.java
index 16f63af..7b7692d 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/BatchTableEnvironment.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/BatchTableEnvironment.java
@@ -33,6 +33,7 @@ import org.apache.flink.table.descriptors.BatchTableDescriptor;
 import org.apache.flink.table.descriptors.ConnectorDescriptor;
 import org.apache.flink.table.functions.AggregateFunction;
 import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.sinks.TableSink;
 
 import java.lang.reflect.Constructor;
 
@@ -211,6 +212,46 @@ public interface BatchTableEnvironment extends TableEnvironment {
 	<T> DataSet<T> toDataSet(Table table, TypeInformation<T> typeInfo, BatchQueryConfig queryConfig);
 
 	/**
+	 * Evaluates a SQL statement such as INSERT, UPDATE or DELETE; or a DDL statement;
+	 * NOTE: Currently only SQL INSERT statements are supported.
+	 *
+	 * <p>All tables referenced by the query must be registered in the TableEnvironment.
+	 * A {@link Table} is automatically registered when its {@link Table#toString()} method is
+	 * called, for example when it is embedded into a String.
+	 * Hence, SQL queries can directly reference a {@link Table} as follows:
+	 *
+	 * <pre>
+	 * {@code
+	 *   // register the configured table sink into which the result is inserted.
+	 *   tEnv.registerTableSink("sinkTable", configuredSink);
+	 *   Table sourceTable = ...
+	 *   String tableName = sourceTable.toString();
+	 *   // sourceTable is not registered to the table environment
+	 *   tEnv.sqlUpdate(s"INSERT INTO sinkTable SELECT * FROM tableName", config);
+	 * }
+	 * </pre>
+	 *
+	 * @param stmt The SQL statement to evaluate.
+	 * @param config The {@link BatchQueryConfig} to use.
+	 */
+	void sqlUpdate(String stmt, BatchQueryConfig config);
+
+	/**
+	 * Writes the {@link Table} to a {@link TableSink} that was registered under the specified name.
+	 *
+	 * <p>See the documentation of {@link TableEnvironment#useDatabase(String)} or
+	 * {@link TableEnvironment#useCatalog(String)} for the rules on the path resolution.
+	 *
+	 * @param table The Table to write to the sink.
+	 * @param queryConfig The {@link BatchQueryConfig} to use.
+	 * @param sinkPath The first part of the path of the registered {@link TableSink} to which the {@link Table} is
+	 *        written. This is to ensure at least the name of the {@link TableSink} is provided.
+	 * @param sinkPathContinued The remaining part of the path of the registered {@link TableSink} to which the
+	 *        {@link Table} is written.
+	 */
+	void insertInto(Table table, BatchQueryConfig queryConfig, String sinkPath, String... sinkPathContinued);
+
+	/**
 	 * Creates a table source and/or table sink from a descriptor.
 	 *
 	 * <p>Descriptors allow for declaring the communication to external systems in an
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/StreamTableEnvironment.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/StreamTableEnvironment.java
index 6859780..a18f3d4 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/StreamTableEnvironment.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/StreamTableEnvironment.java
@@ -19,11 +19,13 @@
 package org.apache.flink.table.api.java;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.QueryConfig;
 import org.apache.flink.table.api.StreamQueryConfig;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableConfig;
@@ -34,6 +36,7 @@ import org.apache.flink.table.descriptors.StreamTableDescriptor;
 import org.apache.flink.table.functions.AggregateFunction;
 import org.apache.flink.table.functions.TableAggregateFunction;
 import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.sinks.TableSink;
 
 /**
  * This table environment is the entry point and central context for creating Table & SQL
@@ -448,4 +451,62 @@ public interface StreamTableEnvironment extends TableEnvironment {
 	 */
 	@Override
 	StreamTableDescriptor connect(ConnectorDescriptor connectorDescriptor);
+
+	/**
+	 * Evaluates a SQL statement such as INSERT, UPDATE or DELETE; or a DDL statement;
+	 * NOTE: Currently only SQL INSERT statements are supported.
+	 *
+	 * <p>All tables referenced by the query must be registered in the TableEnvironment.
+	 * A {@link Table} is automatically registered when its {@link Table#toString()} method is
+	 * called, for example when it is embedded into a String.
+	 * Hence, SQL queries can directly reference a {@link Table} as follows:
+	 *
+	 * <pre>
+	 * {@code
+	 *   // register the configured table sink into which the result is inserted.
+	 *   tEnv.registerTableSink("sinkTable", configuredSink);
+	 *   Table sourceTable = ...
+	 *   String tableName = sourceTable.toString();
+	 *   // sourceTable is not registered to the table environment
+	 *   tEnv.sqlUpdate(s"INSERT INTO sinkTable SELECT * FROM tableName", config);
+	 * }
+	 * </pre>
+	 *
+	 * @param stmt The SQL statement to evaluate.
+	 * @param config The {@link QueryConfig} to use.
+	 */
+	void sqlUpdate(String stmt, StreamQueryConfig config);
+
+	/**
+	 * Writes the {@link Table} to a {@link TableSink} that was registered under the specified name.
+	 *
+	 * <p>See the documentation of {@link TableEnvironment#useDatabase(String)} or
+	 * {@link TableEnvironment#useCatalog(String)} for the rules on the path resolution.
+	 *
+	 * @param table The Table to write to the sink.
+	 * @param queryConfig The {@link StreamQueryConfig} to use.
+	 * @param sinkPath The first part of the path of the registered {@link TableSink} to which the {@link Table} is
+	 *        written. This is to ensure at least the name of the {@link TableSink} is provided.
+	 * @param sinkPathContinued The remaining part of the path of the registered {@link TableSink} to which the
+	 *        {@link Table} is written.
+	 */
+	void insertInto(Table table, StreamQueryConfig queryConfig, String sinkPath, String... sinkPathContinued);
+
+	/**
+	 * Triggers the program execution. The environment will execute all parts of
+	 * the program.
+	 *
+	 * <p>The program execution will be logged and displayed with the provided name
+	 *
+	 * <p>It calls the {@link StreamExecutionEnvironment#execute(String)} on the underlying
+	 * {@link StreamExecutionEnvironment}. In contrast to the {@link TableEnvironment} this
+	 * environment translates queries eagerly. Therefore the values in {@link QueryConfig}
+	 * parameter are ignored.
+	 *
+	 * @param jobName Desired name of the job
+	 * @return The result of the job execution, containing elapsed time and accumulators.
+	 * @throws Exception which occurs during job execution.
+	 */
+	@Override
+	JobExecutionResult execute(String jobName) throws Exception;
 }
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java
index 05815dd..4dde3d6 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.api.java.internal;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -91,14 +92,6 @@ public final class StreamTableEnvironmentImpl extends TableEnvironmentImpl imple
 		this.executionEnvironment = executionEnvironment;
 	}
 
-	/**
-	 * Creates an instance of a {@link StreamTableEnvironment}. It uses the {@link StreamExecutionEnvironment} for
-	 * executing queries. This is also the {@link StreamExecutionEnvironment} that will be used when converting
-	 * from/to {@link DataStream}.
-	 *
-	 * @param tableConfig The configuration of the TableEnvironment.
-	 * @param executionEnvironment The {@link StreamExecutionEnvironment} of the TableEnvironment.
-	 */
 	public static StreamTableEnvironment create(
 			StreamExecutionEnvironment executionEnvironment,
 			EnvironmentSettings settings,
@@ -241,7 +234,9 @@ public final class StreamTableEnvironmentImpl extends TableEnvironmentImpl imple
 			table.getQueryOperation(),
 			TypeConversions.fromLegacyInfoToDataType(typeInfo),
 			OutputConversionModifyOperation.UpdateMode.APPEND);
-		queryConfigProvider.setConfig(queryConfig);
+		tableConfig.setIdleStateRetentionTime(
+			Time.milliseconds(queryConfig.getMinIdleStateRetentionTime()),
+			Time.milliseconds(queryConfig.getMaxIdleStateRetentionTime()));
 		return toDataStream(table, modifyOperation);
 	}
 
@@ -273,7 +268,9 @@ public final class StreamTableEnvironmentImpl extends TableEnvironmentImpl imple
 			table.getQueryOperation(),
 			wrapWithChangeFlag(typeInfo),
 			OutputConversionModifyOperation.UpdateMode.RETRACT);
-		queryConfigProvider.setConfig(queryConfig);
+		tableConfig.setIdleStateRetentionTime(
+			Time.milliseconds(queryConfig.getMinIdleStateRetentionTime()),
+			Time.milliseconds(queryConfig.getMaxIdleStateRetentionTime()));
 		return toDataStream(table, modifyOperation);
 	}
 
@@ -282,6 +279,22 @@ public final class StreamTableEnvironmentImpl extends TableEnvironmentImpl imple
 		return (StreamTableDescriptor) super.connect(connectorDescriptor);
 	}
 
+	@Override
+	public void sqlUpdate(String stmt, StreamQueryConfig config) {
+		tableConfig.setIdleStateRetentionTime(
+			Time.milliseconds(config.getMinIdleStateRetentionTime()),
+			Time.milliseconds(config.getMaxIdleStateRetentionTime()));
+		sqlUpdate(stmt);
+	}
+
+	@Override
+	public void insertInto(Table table, StreamQueryConfig queryConfig, String sinkPath, String... sinkPathContinued) {
+		tableConfig.setIdleStateRetentionTime(
+			Time.milliseconds(queryConfig.getMinIdleStateRetentionTime()),
+			Time.milliseconds(queryConfig.getMaxIdleStateRetentionTime()));
+		insertInto(table, sinkPath, sinkPathContinued);
+	}
+
 	/**
 	 * This is a temporary workaround for Python API. Python API should not use StreamExecutionEnvironment at all.
 	 */
@@ -305,6 +318,11 @@ public final class StreamTableEnvironmentImpl extends TableEnvironmentImpl imple
 		validateTimeCharacteristic(TableSourceValidation.hasRowtimeAttribute(tableSource));
 	}
 
+	@Override
+	protected boolean isEagerOperationTranslation() {
+		return true;
+	}
+
 	private <T> TypeInformation<T> extractTypeInformation(Table table, Class<T> clazz) {
 		try {
 			return TypeExtractor.createTypeInfo(clazz);
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/BatchQueryConfig.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/BatchQueryConfig.java
index a7cac52..c70d948 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/BatchQueryConfig.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/BatchQueryConfig.java
@@ -22,7 +22,10 @@ import org.apache.flink.annotation.PublicEvolving;
 
 /**
  * The {@link BatchQueryConfig} holds parameters to configure the behavior of batch queries.
+ *
+ * @deprecated Set the configuration on {@link TableConfig}.
  */
+@Deprecated
 @PublicEvolving
 public class BatchQueryConfig implements QueryConfig {
 
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/QueryConfig.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/QueryConfig.java
index 99cdb41..d330b45 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/QueryConfig.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/QueryConfig.java
@@ -24,8 +24,11 @@ import java.io.Serializable;
 
 /**
  * The {@link QueryConfig} holds parameters to configure the behavior of queries.
+ *
+ * @deprecated Set the configuration on {@link TableConfig}.
  */
 @PublicEvolving
+@Deprecated
 public interface QueryConfig extends Serializable {
 
 }
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/StreamQueryConfig.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/StreamQueryConfig.java
index 86c0c75..8c774ff 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/StreamQueryConfig.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/StreamQueryConfig.java
@@ -18,12 +18,16 @@
 
 package org.apache.flink.table.api;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.time.Time;
 
 /**
  * The {@link StreamQueryConfig} holds parameters to configure the behavior of streaming queries.
+ *
+ * @deprecated Set the configuration on {@link TableConfig}.
  */
+@Deprecated
 @PublicEvolving
 public class StreamQueryConfig implements QueryConfig {
 
@@ -39,6 +43,15 @@ public class StreamQueryConfig implements QueryConfig {
 	 */
 	private long maxIdleStateRetentionTime = 0L;
 
+	@Internal
+	public StreamQueryConfig(long minIdleStateRetentionTime, long maxIdleStateRetentionTime) {
+		this.minIdleStateRetentionTime = minIdleStateRetentionTime;
+		this.maxIdleStateRetentionTime = maxIdleStateRetentionTime;
+	}
+
+	public StreamQueryConfig() {
+	}
+
 	/**
 	 * Specifies a minimum and a maximum time interval for how long idle state, i.e., state which
 	 * was not updated, will be retained.
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java
index 325732f..8bdc461 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.api;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.table.api.internal.CompositePlannerConfig;
 import org.apache.flink.util.Preconditions;
 
@@ -59,6 +60,18 @@ public class TableConfig {
 	private Integer maxGeneratedCodeLength = 64000; // just an estimate
 
 	/**
+	 * The minimum time until state which was not updated will be retained.
+	 * State might be cleared and removed if it was not updated for the defined period of time.
+	 */
+	private long minIdleStateRetentionTime = 0L;
+
+	/**
+	 * The maximum time until state which was not updated will be retained.
+	 * State will be cleared and removed if it was not updated for the defined period of time.
+	 */
+	private long maxIdleStateRetentionTime = 0L;
+
+	/**
 	 * Returns the timezone for date/time/timestamp conversions.
 	 */
 	public TimeZone getTimeZone() {
@@ -135,6 +148,51 @@ public class TableConfig {
 		this.maxGeneratedCodeLength = Preconditions.checkNotNull(maxGeneratedCodeLength);
 	}
 
+	/**
+	 * Specifies a minimum and a maximum time interval for how long idle state, i.e., state which
+	 * was not updated, will be retained.
+	 * State will never be cleared until it was idle for less than the minimum time and will never
+	 * be kept if it was idle for more than the maximum time.
+	 *
+	 * <p>When new data arrives for previously cleaned-up state, the new data will be handled as if it
+	 * was the first data. This can result in previous results being overwritten.
+	 *
+	 * <p>Set to 0 (zero) to never clean-up the state.
+	 *
+	 * <p>NOTE: Cleaning up state requires additional bookkeeping which becomes less expensive for
+	 * larger differences of minTime and maxTime. The difference between minTime and maxTime must be
+	 * at least 5 minutes.
+	 *
+	 * @param minTime The minimum time interval for which idle state is retained. Set to 0 (zero) to
+	 *                never clean-up the state.
+	 * @param maxTime The maximum time interval for which idle state is retained. Must be at least
+	 *                5 minutes greater than minTime. Set to 0 (zero) to never clean-up the state.
+	 */
+	public void setIdleStateRetentionTime(Time minTime, Time maxTime) {
+		if (maxTime.toMilliseconds() - minTime.toMilliseconds() < 300000 &&
+			!(maxTime.toMilliseconds() == 0 && minTime.toMilliseconds() == 0)) {
+			throw new IllegalArgumentException(
+				"Difference between minTime: " + minTime.toString() + " and maxTime: " + maxTime.toString() +
+					"shoud be at least 5 minutes.");
+		}
+		minIdleStateRetentionTime = minTime.toMilliseconds();
+		maxIdleStateRetentionTime = maxTime.toMilliseconds();
+	}
+
+	/**
+	 * @return The minimum time until state which was not updated will be retained.
+	 */
+	public long getMinIdleStateRetentionTime() {
+		return minIdleStateRetentionTime;
+	}
+
+	/**
+	 * @return The maximum time until state which was not updated will be retained.
+	 */
+	public long getMaxIdleStateRetentionTime() {
+		return maxIdleStateRetentionTime;
+	}
+
 	public static TableConfig getDefault() {
 		return new TableConfig();
 	}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
index 85d20f6..686e816 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.api;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.table.catalog.Catalog;
 import org.apache.flink.table.catalog.ExternalCatalog;
@@ -188,21 +189,6 @@ public interface TableEnvironment {
 	 * {@link TableEnvironment#useCatalog(String)} for the rules on the path resolution.
 	 *
 	 * @param table The Table to write to the sink.
-	 * @param queryConfig The {@link QueryConfig} to use.
-	 * @param sinkPath The first part of the path of the registered {@link TableSink} to which the {@link Table} is
-	 *        written. This is to ensure at least the name of the {@link TableSink} is provided.
-	 * @param sinkPathContinued The remaining part of the path of the registered {@link TableSink} to which the
-	 *        {@link Table} is written.
-	 */
-	void insertInto(Table table, QueryConfig queryConfig, String sinkPath, String... sinkPathContinued);
-
-	/**
-	 * Writes the {@link Table} to a {@link TableSink} that was registered under the specified name.
-	 *
-	 * <p>See the documentation of {@link TableEnvironment#useDatabase(String)} or
-	 * {@link TableEnvironment#useCatalog(String)} for the rules on the path resolution.
-	 *
-	 * @param table The Table to write to the sink.
 	 * @param sinkPath The first part of the path of the registered {@link TableSink} to which the {@link Table} is
 	 *        written. This is to ensure at least the name of the {@link TableSink} is provided.
 	 * @param sinkPathContinued The remaining part of the path of the registered {@link TableSink} to which the
@@ -336,31 +322,6 @@ public interface TableEnvironment {
 	void sqlUpdate(String stmt);
 
 	/**
-	 * Evaluates a SQL statement such as INSERT, UPDATE or DELETE; or a DDL statement;
-	 * NOTE: Currently only SQL INSERT statements are supported.
-	 *
-	 * <p>All tables referenced by the query must be registered in the TableEnvironment.
-	 * A {@link Table} is automatically registered when its {@link Table#toString()} method is
-	 * called, for example when it is embedded into a String.
-	 * Hence, SQL queries can directly reference a {@link Table} as follows:
-	 *
-	 * <pre>
-	 * {@code
-	 *   // register the configured table sink into which the result is inserted.
-	 *   tEnv.registerTableSink("sinkTable", configuredSink);
-	 *   Table sourceTable = ...
-	 *   String tableName = sourceTable.toString();
-	 *   // sourceTable is not registered to the table environment
-	 *   tEnv.sqlUpdate(s"INSERT INTO sinkTable SELECT * FROM tableName", config);
-	 * }
-	 * </pre>
-	 *
-	 * @param stmt The SQL statement to evaluate.
-	 * @param config The {@link QueryConfig} to use.
-	 */
-	void sqlUpdate(String stmt, QueryConfig config);
-
-	/**
 	 * Gets the current default catalog name of the current session.
 	 *
 	 * @return The current default catalog name that is used for the path resolution.
@@ -496,4 +457,24 @@ public interface TableEnvironment {
 	 * Returns the table config that defines the runtime behavior of the Table API.
 	 */
 	TableConfig getConfig();
+
+	/**
+	 * Triggers the program execution. The environment will execute all parts of
+	 * the program.
+	 *
+	 * <p>The program execution will be logged and displayed with the provided name
+	 *
+	 * <p><b>NOTE:</b>It is highly advised to set all parameters in the {@link TableConfig}
+	 * on the very beginning of the program. It is undefined what configurations values will
+	 * be used for the execution if queries are mixed with config changes. It depends on
+	 * the characteristic of the particular parameter. For some of them the value from the
+	 * point in time of query construction (e.g. the currentCatalog) will be used. On the
+	 * other hand some values might be evaluated according to the state from the time when
+	 * this method is called (e.g. timeZone).
+	 *
+	 * @param jobName Desired name of the job
+	 * @return The result of the job execution, containing elapsed time and accumulators.
+	 * @throws Exception which occurs during job execution.
+	 */
+	JobExecutionResult execute(String jobName) throws Exception;
 }
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/QueryConfigProvider.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/QueryConfigProvider.java
deleted file mode 100644
index 171354f..0000000
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/QueryConfigProvider.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.internal;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.table.api.PlannerConfig;
-import org.apache.flink.table.api.StreamQueryConfig;
-import org.apache.flink.table.delegation.Planner;
-
-/**
- * An adapter to {@link PlannerConfig} that enables to pass {@link org.apache.flink.table.api.QueryConfig}
- * to {@link Planner} via {@link org.apache.flink.table.api.TableConfig}.
- */
-@Internal
-public class QueryConfigProvider implements PlannerConfig {
-	private StreamQueryConfig config;
-
-	public StreamQueryConfig getConfig() {
-		return config;
-	}
-
-	public void setConfig(StreamQueryConfig config) {
-		this.config = config;
-	}
-}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index e94c65a..b2c786a 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -20,11 +20,10 @@ package org.apache.flink.table.api.internal;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.table.api.CatalogNotExistException;
-import org.apache.flink.table.api.QueryConfig;
-import org.apache.flink.table.api.StreamQueryConfig;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.TableEnvironment;
@@ -76,12 +75,12 @@ public class TableEnvironmentImpl implements TableEnvironment {
 
 	private final String defaultCatalogName;
 	private final String defaultDatabaseName;
-	private final TableConfig tableConfig;
 	private final OperationTreeBuilder operationTreeBuilder;
+	private final List<ModifyOperation> bufferedModifyOperations = new ArrayList<>();
 
+	protected final TableConfig tableConfig;
 	protected final Executor execEnv;
 	protected final FunctionCatalog functionCatalog;
-	protected final QueryConfigProvider queryConfigProvider = new QueryConfigProvider();
 	protected final Planner planner;
 
 	protected TableEnvironmentImpl(
@@ -95,7 +94,6 @@ public class TableEnvironmentImpl implements TableEnvironment {
 		this.execEnv = executor;
 
 		this.tableConfig = tableConfig;
-		this.tableConfig.addPlannerConfig(queryConfigProvider);
 		this.defaultCatalogName = catalogManager.getCurrentCatalog();
 		this.defaultDatabaseName = catalogManager.getCurrentDatabase();
 
@@ -266,31 +264,24 @@ public class TableEnvironmentImpl implements TableEnvironment {
 	}
 
 	@Override
-	public void sqlUpdate(String stmt) {
-		sqlUpdate(stmt, new StreamQueryConfig());
-	}
-
-	@Override
-	public void insertInto(Table table, QueryConfig queryConfig, String path, String... pathContinued) {
+	public void insertInto(Table table, String path, String... pathContinued) {
 		List<String> fullPath = new ArrayList<>(Arrays.asList(pathContinued));
 		fullPath.add(0, path);
 
-		queryConfigProvider.setConfig((StreamQueryConfig) queryConfig);
-		List<Transformation<?>> translate = planner.translate(Collections.singletonList(
+		List<ModifyOperation> modifyOperations = Collections.singletonList(
 			new CatalogSinkModifyOperation(
 				fullPath,
-				table.getQueryOperation())));
-
-		execEnv.apply(translate);
-	}
+				table.getQueryOperation()));
 
-	@Override
-	public void insertInto(Table table, String path, String... pathContinued) {
-		insertInto(table, new StreamQueryConfig(), path, pathContinued);
+		if (isEagerOperationTranslation()) {
+			translate(modifyOperations);
+		} else {
+			buffer(modifyOperations);
+		}
 	}
 
 	@Override
-	public void sqlUpdate(String stmt, QueryConfig config) {
+	public void sqlUpdate(String stmt) {
 		List<Operation> operations = planner.parse(stmt);
 
 		if (operations.size() != 1) {
@@ -301,11 +292,12 @@ public class TableEnvironmentImpl implements TableEnvironment {
 		Operation operation = operations.get(0);
 
 		if (operation instanceof ModifyOperation) {
-			queryConfigProvider.setConfig((StreamQueryConfig) config);
-			List<Transformation<?>> transformations =
-				planner.translate(Collections.singletonList((ModifyOperation) operation));
-
-			execEnv.apply(transformations);
+			List<ModifyOperation> modifyOperations = Collections.singletonList((ModifyOperation) operation);
+			if (isEagerOperationTranslation()) {
+				translate(modifyOperations);
+			} else {
+				buffer(modifyOperations);
+			}
 		} else {
 			throw new TableException(
 				"Unsupported SQL query! sqlUpdate() only accepts a single SQL statements of type INSERT.");
@@ -337,6 +329,48 @@ public class TableEnvironmentImpl implements TableEnvironment {
 		return tableConfig;
 	}
 
+	@Override
+	public JobExecutionResult execute(String jobName) throws Exception {
+		translate(bufferedModifyOperations);
+		bufferedModifyOperations.clear();
+		return execEnv.execute(jobName);
+	}
+
+	/**
+	 * Defines the behavior of this {@link TableEnvironment}. If true the queries will
+	 * be translated immediately. If false the {@link ModifyOperation}s will be buffered
+	 * and translated only when {@link #execute(String)} is called.
+	 *
+	 * <p>If the {@link TableEnvironment} works in a lazy manner it is undefined what
+	 * configurations values will be used. It depends on the characteristic of the particular
+	 * parameter. Some might used values current to the time of query construction (e.g. the currentCatalog)
+	 * and some use values from the time when {@link #execute(String)} is called (e.g. timeZone).
+	 *
+	 * @return true if the queries should be translated immediately.
+	 */
+	protected boolean isEagerOperationTranslation() {
+		return false;
+	}
+
+	/**
+	 * Subclasses can override this method to add additional checks.
+	 *
+	 * @param tableSource tableSource to validate
+	 */
+	protected void validateTableSource(TableSource<?> tableSource) {
+		TableSourceValidation.validateTableSource(tableSource);
+	}
+
+	private void translate(List<ModifyOperation> modifyOperations) {
+		List<Transformation<?>> transformations = planner.translate(modifyOperations);
+
+		execEnv.apply(transformations);
+	}
+
+	private void buffer(List<ModifyOperation> modifyOperations) {
+		bufferedModifyOperations.addAll(modifyOperations);
+	}
+
 	protected void registerTableInternal(String name, CatalogBaseTable table) {
 		try {
 			checkValidTableName(name);
@@ -374,15 +408,6 @@ public class TableEnvironmentImpl implements TableEnvironment {
 		}
 	}
 
-	/**
-	 * Subclasses can override this method to add additional checks.
-	 *
-	 * @param tableSource tableSource to validate
-	 */
-	protected void validateTableSource(TableSource<?> tableSource) {
-		TableSourceValidation.validateTableSource(tableSource);
-	}
-
 	private void registerTableSourceInternal(String name, TableSource<?> tableSource) {
 		validateTableSource(tableSource);
 		Optional<CatalogBaseTable> table = getCatalogTable(defaultCatalogName, defaultDatabaseName, name);
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java
index ae08d24..41e483b 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.api.internal;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.table.api.AggregatedTable;
 import org.apache.flink.table.api.FlatAggregateTable;
 import org.apache.flink.table.api.GroupWindow;
@@ -27,6 +28,7 @@ import org.apache.flink.table.api.GroupedTable;
 import org.apache.flink.table.api.OverWindow;
 import org.apache.flink.table.api.OverWindowedTable;
 import org.apache.flink.table.api.QueryConfig;
+import org.apache.flink.table.api.StreamQueryConfig;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.TableException;
@@ -416,7 +418,14 @@ public class TableImpl implements Table {
 
 	@Override
 	public void insertInto(QueryConfig conf, String tablePath, String... tablePathContinued) {
-		tableEnvironment.insertInto(this, conf, tablePath, tablePathContinued);
+		if (conf instanceof StreamQueryConfig) {
+			StreamQueryConfig streamQueryConfig = (StreamQueryConfig) conf;
+			tableEnvironment.getConfig().setIdleStateRetentionTime(
+				Time.milliseconds(streamQueryConfig.getMinIdleStateRetentionTime()),
+				Time.milliseconds(streamQueryConfig.getMaxIdleStateRetentionTime())
+			);
+		}
+		tableEnvironment.insertInto(this, tablePath, tablePathContinued);
 	}
 
 	@Override
diff --git a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/BatchTableEnvironment.scala b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/BatchTableEnvironment.scala
index fa6b178..6b3ecbc 100644
--- a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/BatchTableEnvironment.scala
+++ b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/BatchTableEnvironment.scala
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.table.api.scala
 
+import org.apache.flink.api.common.JobExecutionResult
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
 import org.apache.flink.table.api.{TableEnvironment, _}
@@ -24,6 +25,7 @@ import org.apache.flink.table.catalog.{CatalogManager, GenericInMemoryCatalog}
 import org.apache.flink.table.descriptors.{BatchTableDescriptor, ConnectorDescriptor}
 import org.apache.flink.table.expressions.Expression
 import org.apache.flink.table.functions.{AggregateFunction, TableFunction}
+import org.apache.flink.table.sinks.TableSink
 
 /**
   * The [[TableEnvironment]] for a Scala batch [[ExecutionEnvironment]] that works
@@ -155,6 +157,66 @@ trait BatchTableEnvironment extends TableEnvironment {
     queryConfig: BatchQueryConfig): DataSet[T]
 
   /**
+    * Evaluates a SQL statement such as INSERT, UPDATE or DELETE; or a DDL statement;
+    * NOTE: Currently only SQL INSERT statements are supported.
+    *
+    * All tables referenced by the query must be registered in the TableEnvironment.
+    * A [[Table]] is automatically registered when its [[Table#toString()]] method is
+    * called, for example when it is embedded into a String.
+    * Hence, SQL queries can directly reference a [[Table]] as follows:
+    *
+    * {{{
+    *   // register the configured table sink into which the result is inserted.
+    *   tEnv.registerTableSink("sinkTable", configuredSink);
+    *   Table sourceTable = ...
+    *   String tableName = sourceTable.toString();
+    *   // sourceTable is not registered to the table environment
+    *   tEnv.sqlUpdate(s"INSERT INTO sinkTable SELECT * FROM tableName", config);
+    * }}}
+    *
+    * @param stmt   The SQL statement to evaluate.
+    * @param config The [[BatchQueryConfig]] to use.
+    */
+  def sqlUpdate(stmt: String, config: BatchQueryConfig): Unit
+
+  /**
+    * Writes the [[Table]] to a [[TableSink]] that was registered under the specified name.
+    *
+    * See the documentation of TableEnvironment#useDatabase or
+    * TableEnvironment.useCatalog(String) for the rules on the path resolution.
+    *
+    * @param table             The Table to write to the sink.
+    * @param queryConfig       The [[BatchQueryConfig]] to use.
+    * @param sinkPath          The first part of the path of the registered [[TableSink]] to
+    *                          which the [[Table]] is written. This is to ensure at least the
+    *                          name of the [[TableSink]] is provided.
+    * @param sinkPathContinued The remaining part of the path of the registered [[TableSink]] to
+    *                          which the [[Table]] is written.
+    */
+  def insertInto(
+    table: Table,
+    queryConfig: BatchQueryConfig,
+    sinkPath: String,
+    sinkPathContinued: String*): Unit
+
+  /**
+    * Triggers the program execution. The environment will execute all parts of
+    * the program.
+    *
+    * The program execution will be logged and displayed with the provided name
+    *
+    * It calls the ExecutionEnvironment#execute on the underlying
+    * [[ExecutionEnvironment]]. In contrast to the [[TableEnvironment]] this
+    * environment translates queries eagerly.
+    *
+    * @param jobName Desired name of the job
+    * @return The result of the job execution, containing elapsed time and accumulators.
+    * @throws Exception which occurs during job execution.
+    */
+  @throws[Exception]
+  override def execute(jobName: String): JobExecutionResult
+
+  /**
     * Creates a table source and/or table sink from a descriptor.
     *
     * Descriptors allow for declaring the communication to external systems in an
diff --git a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala
index 3baa5fa..db88251 100644
--- a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala
+++ b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala
@@ -18,6 +18,7 @@
 package org.apache.flink.table.api.scala
 
 import org.apache.flink.annotation.PublicEvolving
+import org.apache.flink.api.common.JobExecutionResult
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
 import org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl
@@ -25,6 +26,7 @@ import org.apache.flink.table.api.{TableEnvironment, _}
 import org.apache.flink.table.descriptors.{ConnectorDescriptor, StreamTableDescriptor}
 import org.apache.flink.table.expressions.Expression
 import org.apache.flink.table.functions.{AggregateFunction, TableAggregateFunction, TableFunction}
+import org.apache.flink.table.sinks.TableSink
 
 /**
   * This table environment is the entry point and central context for creating Table & SQL
@@ -209,6 +211,67 @@ trait StreamTableEnvironment extends TableEnvironment {
     table: Table,
     queryConfig: StreamQueryConfig): DataStream[(Boolean, T)]
 
+
+  /**
+    * Evaluates a SQL statement such as INSERT, UPDATE or DELETE; or a DDL statement;
+    * NOTE: Currently only SQL INSERT statements are supported.
+    *
+    * All tables referenced by the query must be registered in the TableEnvironment.
+    * A [[Table]] is automatically registered when its [[Table#toString()]] method is
+    * called, for example when it is embedded into a String.
+    * Hence, SQL queries can directly reference a [[Table]] as follows:
+    *
+    * {{{
+    *   // register the configured table sink into which the result is inserted.
+    *   tEnv.registerTableSink("sinkTable", configuredSink);
+    *   Table sourceTable = ...
+    *   String tableName = sourceTable.toString();
+    *   // sourceTable is not registered to the table environment
+    *   tEnv.sqlUpdate(s"INSERT INTO sinkTable SELECT * FROM tableName", config);
+    * }}}
+    *
+    * @param stmt   The SQL statement to evaluate.
+    * @param config The [[QueryConfig]] to use.
+    */
+  def sqlUpdate(stmt: String, config: StreamQueryConfig): Unit
+
+  /**
+    * Writes the [[Table]] to a [[TableSink]] that was registered under the specified name.
+    *
+    * See the documentation of TableEnvironment#useDatabase or
+    * TableEnvironment.useCatalog(String) for the rules on the path resolution.
+    *
+    * @param table             The Table to write to the sink.
+    * @param queryConfig       The [[StreamQueryConfig]] to use.
+    * @param sinkPath          The first part of the path of the registered [[TableSink]] to
+    *                          which the [[Table]] is written. This is to ensure at least the name
+    *                          of the [[TableSink]] is provided.
+    * @param sinkPathContinued The remaining part of the path of the registered [[TableSink]] to
+    *                          which the [[Table]] is written.
+    */
+  def insertInto(
+    table: Table,
+    queryConfig: StreamQueryConfig,
+    sinkPath: String,
+    sinkPathContinued: String*): Unit
+
+  /**
+    * Triggers the program execution. The environment will execute all parts of
+    * the program.
+    *
+    * The program execution will be logged and displayed with the provided name
+    *
+    * It calls the StreamExecutionEnvironment#execute on the underlying
+    * [[StreamExecutionEnvironment]]. In contrast to the [[TableEnvironment]] this
+    * environment translates queries eagerly.
+    *
+    * @param jobName Desired name of the job
+    * @return The result of the job execution, containing elapsed time and accumulators.
+    * @throws Exception which occurs during job execution.
+    */
+  @throws[Exception]
+  override def execute(jobName: String): JobExecutionResult
+
   /**
     * Creates a table source and/or table sink from a descriptor.
     *
diff --git a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImpl.scala b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImpl.scala
index 990f043..fedb317 100644
--- a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImpl.scala
+++ b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImpl.scala
@@ -18,6 +18,7 @@
 package org.apache.flink.table.api.scala.internal
 
 import org.apache.flink.annotation.Internal
+import org.apache.flink.api.common.time.Time
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.dag.Transformation
 import org.apache.flink.api.scala._
@@ -102,7 +103,9 @@ class StreamTableEnvironmentImpl (
       table.getQueryOperation,
       TypeConversions.fromLegacyInfoToDataType(returnType),
       OutputConversionModifyOperation.UpdateMode.APPEND)
-    queryConfigProvider.setConfig(queryConfig)
+    tableConfig.setIdleStateRetentionTime(
+      Time.milliseconds(queryConfig.getMinIdleStateRetentionTime),
+      Time.milliseconds(queryConfig.getMaxIdleStateRetentionTime))
     toDataStream(table, modifyOperation)
   }
 
@@ -121,7 +124,9 @@ class StreamTableEnvironmentImpl (
       TypeConversions.fromLegacyInfoToDataType(returnType),
       OutputConversionModifyOperation.UpdateMode.RETRACT)
 
-    queryConfigProvider.setConfig(queryConfig)
+    tableConfig.setIdleStateRetentionTime(
+        Time.milliseconds(queryConfig.getMinIdleStateRetentionTime),
+        Time.milliseconds(queryConfig.getMaxIdleStateRetentionTime))
     toDataStream(table, modifyOperation)
   }
 
@@ -182,6 +187,8 @@ class StreamTableEnvironmentImpl (
     }
   }
 
+  override protected def isEagerOperationTranslation(): Boolean = true
+
   private def toDataStream[T](
       table: Table,
       modifyOperation: OutputConversionModifyOperation)
@@ -233,6 +240,26 @@ class StreamTableEnvironmentImpl (
       typeInfoSchema.getIndices,
       typeInfoSchema.toTableSchema)
   }
+
+  override def sqlUpdate(stmt: String, config: StreamQueryConfig): Unit = {
+    tableConfig
+      .setIdleStateRetentionTime(
+        Time.milliseconds(config.getMinIdleStateRetentionTime),
+        Time.milliseconds(config.getMaxIdleStateRetentionTime))
+    sqlUpdate(stmt)
+  }
+
+  override def insertInto(
+      table: Table,
+      queryConfig: StreamQueryConfig,
+      sinkPath: String,
+      sinkPathContinued: String*): Unit = {
+    tableConfig
+      .setIdleStateRetentionTime(
+        Time.milliseconds(queryConfig.getMinIdleStateRetentionTime),
+        Time.milliseconds(queryConfig.getMaxIdleStateRetentionTime))
+    insertInto(table, sinkPath, sinkPathContinued: _*)
+  }
 }
 
 object StreamTableEnvironmentImpl {
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala
index b8425d7..eb0c543 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala
@@ -20,6 +20,7 @@ package org.apache.flink.table.api.internal
 
 import org.apache.calcite.plan.RelOptUtil
 import org.apache.calcite.rel.RelNode
+import org.apache.flink.api.common.JobExecutionResult
 import org.apache.flink.api.common.functions.MapFunction
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.io.DiscardingOutputFormat
@@ -65,8 +66,6 @@ abstract class BatchTableEnvImpl(
     planningConfigurationBuilder
   )
 
-  override def queryConfig: BatchQueryConfig = new BatchQueryConfig
-
   /**
     * Registers an internal [[BatchTableSource]] in this [[TableEnvImpl]]'s catalog without
     * name checking. Registered tables can be referenced in SQL queries.
@@ -103,34 +102,25 @@ abstract class BatchTableEnvImpl(
     *
     * @param table The [[Table]] to write.
     * @param sink The [[TableSink]] to write the [[Table]] to.
-    * @param queryConfig The configuration for the query to generate.
     * @tparam T The expected type of the [[DataSet]] which represents the [[Table]].
     */
   override private[flink] def writeToSink[T](
       table: Table,
-      sink: TableSink[T],
-      queryConfig: QueryConfig): Unit = {
-
-    // Check the query configuration to be a batch one.
-    val batchQueryConfig = queryConfig match {
-      case batchConfig: BatchQueryConfig => batchConfig
-      case _ =>
-        throw new TableException("BatchQueryConfig required to configure batch query.")
-    }
+      sink: TableSink[T]): Unit = {
 
     sink match {
       case batchSink: BatchTableSink[T] =>
         val outputType = fromDataTypeToLegacyInfo(sink.getConsumedDataType)
           .asInstanceOf[TypeInformation[T]]
         // translate the Table into a DataSet and provide the type that the TableSink expects.
-        val result: DataSet[T] = translate(table, batchQueryConfig)(outputType)
+        val result: DataSet[T] = translate(table)(outputType)
         // Give the DataSet to the TableSink to emit it.
         batchSink.emitDataSet(result)
       case boundedSink: OutputFormatTableSink[T] =>
         val outputType = fromDataTypeToLegacyInfo(sink.getConsumedDataType)
           .asInstanceOf[TypeInformation[T]]
         // translate the Table into a DataSet and provide the type that the TableSink expects.
-        val result: DataSet[T] = translate(table, batchQueryConfig)(outputType)
+        val result: DataSet[T] = translate(table)(outputType)
         // use the OutputFormat to consume the DataSet.
         val dataSink = result.output(boundedSink.getOutputFormat)
         dataSink.name(
@@ -188,8 +178,7 @@ abstract class BatchTableEnvImpl(
     val optimizedPlan = optimizer.optimize(ast)
     val dataSet = translate[Row](
       optimizedPlan,
-      getTableSchema(table.getQueryOperation.getTableSchema.getFieldNames, optimizedPlan),
-      queryConfig)(
+      getTableSchema(table.getQueryOperation.getTableSchema.getFieldNames, optimizedPlan))(
       new GenericTypeInfo(classOf[Row]))
     dataSet.output(new DiscardingOutputFormat[Row])
     val env = dataSet.getExecutionEnvironment
@@ -211,6 +200,8 @@ abstract class BatchTableEnvImpl(
 
   def explain(table: Table): String = explain(table: Table, extended = false)
 
+  override def execute(jobName: String): JobExecutionResult = execEnv.execute(jobName)
+
   protected def asQueryOperation[T](dataSet: DataSet[T], fields: Option[Array[Expression]])
     : DataSetQueryOperation[T] = {
     val inputType = dataSet.getType
@@ -253,21 +244,17 @@ abstract class BatchTableEnvImpl(
     * Table API calls and / or SQL queries and generating corresponding [[DataSet]] operators.
     *
     * @param table The root node of the relational expression tree.
-    * @param queryConfig The configuration for the query to generate.
     * @param tpe   The [[TypeInformation]] of the resulting [[DataSet]].
     * @tparam A The type of the resulting [[DataSet]].
     * @return The [[DataSet]] that corresponds to the translated [[Table]].
     */
-  protected def translate[A](
-      table: Table,
-      queryConfig: BatchQueryConfig)(implicit tpe: TypeInformation[A]): DataSet[A] = {
+  protected def translate[A](table: Table)(implicit tpe: TypeInformation[A]): DataSet[A] = {
     val queryOperation = table.getQueryOperation
     val relNode = getRelBuilder.tableOperation(queryOperation).build()
     val dataSetPlan = optimizer.optimize(relNode)
     translate(
       dataSetPlan,
-      getTableSchema(queryOperation.getTableSchema.getFieldNames, dataSetPlan),
-      queryConfig)
+      getTableSchema(queryOperation.getTableSchema.getFieldNames, dataSetPlan))
   }
 
   /**
@@ -276,20 +263,18 @@ abstract class BatchTableEnvImpl(
     * @param logicalPlan The root node of the relational expression tree.
     * @param logicalType The row type of the result. Since the logicalPlan can lose the
     *                    field naming during optimization we pass the row type separately.
-    * @param queryConfig The configuration for the query to generate.
     * @param tpe         The [[TypeInformation]] of the resulting [[DataSet]].
     * @tparam A The type of the resulting [[DataSet]].
     * @return The [[DataSet]] that corresponds to the translated [[Table]].
     */
   protected def translate[A](
       logicalPlan: RelNode,
-      logicalType: TableSchema,
-      queryConfig: BatchQueryConfig)(implicit tpe: TypeInformation[A]): DataSet[A] = {
+      logicalType: TableSchema)(implicit tpe: TypeInformation[A]): DataSet[A] = {
     validateInputTypeInfo(tpe)
 
     logicalPlan match {
       case node: DataSetRel =>
-        val plan = node.translateToPlan(this, queryConfig)
+        val plan = node.translateToPlan(this, new BatchQueryConfig)
         val conversion =
           getConversionMapper(
             plan.getType,
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
index 2d37173..24e52d7 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
@@ -94,11 +94,6 @@ abstract class TableEnvImpl(
     case _ => false
   }
 
-  private[flink] def queryConfig: QueryConfig = this match {
-    case _: BatchTableEnvImpl => new BatchQueryConfig
-    case _ => null
-  }
-
   override def registerExternalCatalog(name: String, externalCatalog: ExternalCatalog): Unit = {
     catalogManager.registerExternalCatalog(name, externalCatalog)
   }
@@ -403,10 +398,6 @@ abstract class TableEnvImpl(
   }
 
   override def sqlUpdate(stmt: String): Unit = {
-    sqlUpdate(stmt, this.queryConfig)
-  }
-
-  override def sqlUpdate(stmt: String, config: QueryConfig): Unit = {
     val planner = getFlinkPlanner
     // parse the sql query
     val parsed = planner.parse(stmt)
@@ -424,7 +415,7 @@ abstract class TableEnvImpl(
         val targetTablePath = insert.getTargetTable.asInstanceOf[SqlIdentifier].names
 
         // insert query result into sink table
-        insertInto(queryResult, config, targetTablePath.asScala:_*)
+        insertInto(queryResult, targetTablePath.asScala:_*)
       case _ =>
         throw new TableException(
           "Unsupported SQL query! sqlUpdate() only accepts SQL statements of type INSERT.")
@@ -444,25 +435,15 @@ abstract class TableEnvImpl(
     *
     * @param table The [[Table]] to write.
     * @param sink The [[TableSink]] to write the [[Table]] to.
-    * @param conf The [[QueryConfig]] to use.
     * @tparam T The data type that the [[TableSink]] expects.
     */
-  private[flink] def writeToSink[T](table: Table, sink: TableSink[T], conf: QueryConfig): Unit
-
-
-  override def insertInto(
-    table: Table,
-    queryConfig: QueryConfig,
-    path: String,
-    pathContinued: String*): Unit = {
-    insertInto(table, queryConfig, path +: pathContinued: _*)
-  }
+  private[flink] def writeToSink[T](table: Table, sink: TableSink[T]): Unit
 
   override def insertInto(
     table: Table,
     path: String,
     pathContinued: String*): Unit = {
-    insertInto(table, queryConfig, path, pathContinued: _*)
+    insertInto(table, path +: pathContinued: _*)
   }
 
   /**
@@ -470,9 +451,8 @@ abstract class TableEnvImpl(
     *
     * @param table The table to write to the TableSink.
     * @param sinkTablePath The name of the registered TableSink.
-    * @param conf The query configuration to use.
     */
-  private def insertInto(table: Table, conf: QueryConfig, sinkTablePath: String*): Unit = {
+  private def insertInto(table: Table, sinkTablePath: String*): Unit = {
 
     // check that sink table exists
     if (null == sinkTablePath) {
@@ -491,7 +471,7 @@ abstract class TableEnvImpl(
         // validate schema of source table and table sink
         TableSinkUtils.validateSink(table.getQueryOperation, sinkTablePath.asJava, tableSink)
         // emit the table to the configured table sink
-        writeToSink(table, tableSink, conf)
+        writeToSink(table, tableSink)
     }
   }
 
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/java/internal/BatchTableEnvironmentImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/java/internal/BatchTableEnvironmentImpl.scala
index 5ef9917..7358721 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/java/internal/BatchTableEnvironmentImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/java/internal/BatchTableEnvironmentImpl.scala
@@ -68,26 +68,26 @@ class BatchTableEnvironmentImpl(
 
   override def toDataSet[T](table: Table, clazz: Class[T]): DataSet[T] = {
     // Use the default query config.
-    translate[T](table, queryConfig)(TypeExtractor.createTypeInfo(clazz))
+    translate[T](table)(TypeExtractor.createTypeInfo(clazz))
   }
 
   override def toDataSet[T](table: Table, typeInfo: TypeInformation[T]): DataSet[T] = {
     // Use the default batch query config.
-    translate[T](table, queryConfig)(typeInfo)
+    translate[T](table)(typeInfo)
   }
 
   override def toDataSet[T](
       table: Table,
       clazz: Class[T],
       queryConfig: BatchQueryConfig): DataSet[T] = {
-    translate[T](table, queryConfig)(TypeExtractor.createTypeInfo(clazz))
+    translate[T](table)(TypeExtractor.createTypeInfo(clazz))
   }
 
   override def toDataSet[T](
       table: Table,
       typeInfo: TypeInformation[T],
       queryConfig: BatchQueryConfig): DataSet[T] = {
-    translate[T](table, queryConfig)(typeInfo)
+    translate[T](table)(typeInfo)
   }
 
   override def registerFunction[T](name: String, tf: TableFunction[T]): Unit = {
@@ -112,4 +112,14 @@ class BatchTableEnvironmentImpl(
 
     registerAggregateFunctionInternal[T, ACC](name, f)
   }
+
+  override def sqlUpdate(
+    stmt: String,
+    config: BatchQueryConfig): Unit = sqlUpdate(stmt)
+
+  override def insertInto(
+    table: Table,
+    queryConfig: BatchQueryConfig,
+    sinkPath: String,
+    sinkPathContinued: String*): Unit = insertInto(table, sinkPath, sinkPathContinued: _*)
 }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/scala/internal/BatchTableEnvironmentImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/scala/internal/BatchTableEnvironmentImpl.scala
index 6c1d155..8e13426 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/scala/internal/BatchTableEnvironmentImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/scala/internal/BatchTableEnvironmentImpl.scala
@@ -63,12 +63,12 @@ class BatchTableEnvironmentImpl(
 
   override def toDataSet[T: TypeInformation](table: Table): DataSet[T] = {
     // Use the default batch query config.
-    wrap[T](translate(table, queryConfig))(ClassTag.AnyRef.asInstanceOf[ClassTag[T]])
+    wrap[T](translate(table))(ClassTag.AnyRef.asInstanceOf[ClassTag[T]])
   }
 
   override def toDataSet[T: TypeInformation](
     table: Table, queryConfig: BatchQueryConfig): DataSet[T] = {
-    wrap[T](translate(table, queryConfig))(ClassTag.AnyRef.asInstanceOf[ClassTag[T]])
+    wrap[T](translate(table))(ClassTag.AnyRef.asInstanceOf[ClassTag[T]])
   }
 
   override def registerFunction[T: TypeInformation](name: String, tf: TableFunction[T]): Unit = {
@@ -81,5 +81,13 @@ class BatchTableEnvironmentImpl(
   : Unit = {
     registerAggregateFunctionInternal[T, ACC](name, f)
   }
+
+  override def sqlUpdate(stmt: String, config: BatchQueryConfig): Unit = sqlUpdate(stmt)
+
+  override def insertInto(
+    table: Table,
+    queryConfig: BatchQueryConfig,
+    sinkPath: String,
+    sinkPathContinued: String*): Unit = insertInto(table, sinkPath, sinkPathContinued: _*)
 }
 
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala
index 92465d8..8cd11fc 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala
@@ -17,23 +17,23 @@
  */
 
 package org.apache.flink.table.planner
-import _root_.java.lang.{Boolean => JBool}
-import _root_.java.util.{Objects, List => JList}
-import java.util
-
-import org.apache.calcite.jdbc.CalciteSchema
-import org.apache.calcite.jdbc.CalciteSchemaBuilder.asRootSchema
-import org.apache.calcite.plan.RelOptUtil
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.sql.{SqlIdentifier, SqlInsert, SqlKind, SqlNode}
 import org.apache.flink.annotation.VisibleForTesting
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.dag.Transformation
 import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
 import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSink}
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+
+import org.apache.calcite.jdbc.CalciteSchema
+import org.apache.calcite.jdbc.CalciteSchemaBuilder.asRootSchema
+import org.apache.calcite.plan.RelOptUtil
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.sql.{SqlIdentifier, SqlInsert, SqlKind, SqlNode}
+
+import _root_.java.lang.{Boolean => JBool}
+import _root_.java.util.{Objects, List => JList}
+import java.util
 import org.apache.flink.table.api._
-import org.apache.flink.table.api.internal.QueryConfigProvider
 import org.apache.flink.table.calcite.{CalciteConfig, FlinkPlannerImpl, FlinkRelBuilder, FlinkTypeFactory}
 import org.apache.flink.table.catalog.{CatalogManager, CatalogManagerCalciteSchema, CatalogTable, ConnectorCatalogTable, _}
 import org.apache.flink.table.delegation.{Executor, Planner}
@@ -187,7 +187,10 @@ class StreamPlanner(
   }
 
   private def unwrapQueryConfig = {
-    config.getPlannerConfig.unwrap(classOf[QueryConfigProvider]).get().getConfig
+    new StreamQueryConfig(
+      config.getMinIdleStateRetentionTime,
+      config.getMaxIdleStateRetentionTime
+    )
   }
 
   private def explain(tableOperation: QueryOperation, queryConfig: StreamQueryConfig) = {
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/sql/validation/InsertIntoValidationTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/sql/validation/InsertIntoValidationTest.scala
index 55c3c31..faccc9f 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/sql/validation/InsertIntoValidationTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/sql/validation/InsertIntoValidationTest.scala
@@ -72,6 +72,6 @@ class InsertIntoValidationTest extends TableTestBase {
     val sql = "INSERT INTO targetTable (d, f) SELECT a, c FROM sourceTable"
 
     // must fail because partial insert is not supported yet.
-    util.tableEnv.sqlUpdate(sql, util.tableEnv.queryConfig)
+    util.tableEnv.sqlUpdate(sql)
   }
 }
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/harness/GroupAggregateHarnessTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/harness/GroupAggregateHarnessTest.scala
index 86458b3..6d382f2 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/harness/GroupAggregateHarnessTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/harness/GroupAggregateHarnessTest.scala
@@ -19,19 +19,20 @@ package org.apache.flink.table.runtime.harness
 
 import java.lang.{Integer => JInt, Long => JLong}
 import java.util.concurrent.ConcurrentLinkedQueue
-
 import org.apache.flink.api.common.time.Time
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo
 import org.apache.flink.api.scala._
 import org.apache.flink.streaming.api.operators.KeyedProcessOperator
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.table.api.TableConfig
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.runtime.aggregate._
 import org.apache.flink.table.runtime.harness.HarnessTestBase._
 import org.apache.flink.table.runtime.types.CRow
 import org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.{MultiArgCount, MultiArgSum}
 import org.apache.flink.types.Row
+
 import org.junit.Assert.assertTrue
 import org.junit.Test
 
@@ -39,8 +40,13 @@ import scala.collection.mutable
 
 class GroupAggregateHarnessTest extends HarnessTestBase {
 
-  protected var queryConfig =
+  private val queryConfig =
     new TestStreamQueryConfig(Time.seconds(2), Time.seconds(3))
+  private val tableConfig = new TableConfig {
+    override def getMinIdleStateRetentionTime: Long = Time.seconds(2).toMilliseconds
+
+    override def getMaxIdleStateRetentionTime: Long = Time.seconds(2).toMilliseconds
+  }
 
   @Test
   def testAggregate(): Unit = {
@@ -185,7 +191,7 @@ class GroupAggregateHarnessTest extends HarnessTestBase {
   @Test
   def testDistinctAggregateWithRetract(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = StreamTableEnvironment.create(env)
+    val tEnv = StreamTableEnvironment.create(env, tableConfig)
 
     val data = new mutable.MutableList[(JLong, JInt)]
     val t = env.fromCollection(data).toTable(tEnv, 'a, 'b)
@@ -202,7 +208,7 @@ class GroupAggregateHarnessTest extends HarnessTestBase {
          |""".stripMargin)
 
     val testHarness = createHarnessTester[String, CRow, CRow](
-      sqlQuery.toRetractStream[Row](queryConfig), "groupBy")
+      sqlQuery.toRetractStream[Row], "groupBy")
 
     testHarness.setStateBackend(getStateBackend)
     testHarness.open()
@@ -273,7 +279,7 @@ class GroupAggregateHarnessTest extends HarnessTestBase {
   @Test
   def testDistinctAggregateWithDifferentArgumentOrder(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = StreamTableEnvironment.create(env)
+    val tEnv = StreamTableEnvironment.create(env, tableConfig)
 
     val data = new mutable.MutableList[(JLong, JLong, JLong)]
     val t = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c)
@@ -292,7 +298,7 @@ class GroupAggregateHarnessTest extends HarnessTestBase {
          |""".stripMargin)
 
     val testHarness = createHarnessTester[String, CRow, CRow](
-      sqlQuery.toRetractStream[Row](queryConfig), "groupBy")
+      sqlQuery.toRetractStream[Row], "groupBy")
 
     testHarness.setStateBackend(getStateBackend)
     testHarness.open()
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/harness/TableAggregateHarnessTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/harness/TableAggregateHarnessTest.scala
index 56d79d5..9519c3d 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/harness/TableAggregateHarnessTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/harness/TableAggregateHarnessTest.scala
@@ -17,33 +17,37 @@
  */
 package org.apache.flink.table.runtime.harness
 
-import java.lang.{Integer => JInt}
-import java.util.concurrent.ConcurrentLinkedQueue
-
 import org.apache.flink.api.common.time.Time
 import org.apache.flink.api.scala._
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.table.api.TableConfig
 import org.apache.flink.table.api.scala._
-import org.apache.flink.table.runtime.harness.HarnessTestBase._
 import org.apache.flink.table.runtime.types.CRow
 import org.apache.flink.table.utils.{Top3WithEmitRetractValue, Top3WithMapView}
 import org.apache.flink.types.Row
+
 import org.junit.Test
 
+import java.lang.{Integer => JInt}
+import java.util.concurrent.ConcurrentLinkedQueue
+
 import scala.collection.mutable
 
 class TableAggregateHarnessTest extends HarnessTestBase {
 
-  protected var queryConfig =
-    new TestStreamQueryConfig(Time.seconds(2), Time.seconds(3))
+  private val tableConfig = new TableConfig {
+    override def getMinIdleStateRetentionTime: Long = Time.seconds(2).toMilliseconds
+
+    override def getMaxIdleStateRetentionTime: Long = Time.seconds(2).toMilliseconds
+  }
   val data = new mutable.MutableList[(Int, Int)]
 
   @Test
   def testTableAggregate(): Unit = {
 
     val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = StreamTableEnvironment.create(env)
+    val tEnv = StreamTableEnvironment.create(env, tableConfig)
 
     val top3 = new Top3WithMapView
     tEnv.registerFunction("top3", top3)
@@ -54,7 +58,7 @@ class TableAggregateHarnessTest extends HarnessTestBase {
       .select('a, 'b1, 'b2)
 
     val testHarness = createHarnessTester[Int, CRow, CRow](
-      resultTable.toRetractStream[Row](queryConfig), "groupBy: (a)")
+      resultTable.toRetractStream[Row], "groupBy: (a)")
 
     testHarness.open()
 
@@ -107,7 +111,7 @@ class TableAggregateHarnessTest extends HarnessTestBase {
   def testTableAggregateEmitRetractValueIncrementally(): Unit = {
 
     val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = StreamTableEnvironment.create(env)
+    val tEnv = StreamTableEnvironment.create(env, tableConfig)
 
     val top3 = new Top3WithEmitRetractValue
     val source = env.fromCollection(data).toTable(tEnv, 'a, 'b)
@@ -117,7 +121,7 @@ class TableAggregateHarnessTest extends HarnessTestBase {
       .select('a, 'b1, 'b2)
 
     val testHarness = createHarnessTester[Int, CRow, CRow](
-      resultTable.toRetractStream[Row](queryConfig), "groupBy: (a)")
+      resultTable.toRetractStream[Row], "groupBy: (a)")
 
     testHarness.open()
 
@@ -160,7 +164,7 @@ class TableAggregateHarnessTest extends HarnessTestBase {
   def testTableAggregateWithRetractInput(): Unit = {
 
     val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = StreamTableEnvironment.create(env)
+    val tEnv = StreamTableEnvironment.create(env, tableConfig)
 
     val top3 = new Top3WithMapView
     tEnv.registerFunction("top3", top3)
@@ -172,7 +176,7 @@ class TableAggregateHarnessTest extends HarnessTestBase {
       .select('b1, 'b2)
 
     val testHarness = createHarnessTester[Int, CRow, CRow](
-      resultTable.toRetractStream[Row](queryConfig), "select: (Top3WithMapView")
+      resultTable.toRetractStream[Row], "select: (Top3WithMapView")
 
     testHarness.open()
 
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
index 09bbe77..8f138de 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
@@ -20,6 +20,7 @@ package org.apache.flink.table.utils
 
 import java.util.Optional
 
+import org.apache.flink.api.common.JobExecutionResult
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.table.api.{QueryConfig, Table, TableConfig, TableEnvironment}
 import org.apache.flink.table.catalog.{Catalog, ExternalCatalog}
@@ -69,8 +70,6 @@ class MockTableEnvironment extends TableEnvironment {
 
   override def sqlUpdate(stmt: String): Unit = ???
 
-  override def sqlUpdate(stmt: String, config: QueryConfig): Unit = ???
-
   override def getConfig: TableConfig = ???
 
   override def registerCatalog(
@@ -89,12 +88,8 @@ class MockTableEnvironment extends TableEnvironment {
 
   override def insertInto(
     table: Table,
-    queryConfig: QueryConfig,
     sinkPath: String,
     sinkPathContinued: String*): Unit = ???
 
-  override def insertInto(
-    table: Table,
-    sinkPath: String,
-    sinkPathContinued: String*): Unit = ???
+  override def execute(jobName: String): JobExecutionResult = ???
 }


[flink] 03/03: [FLINK-13088][table-api] Support instantiating the unfied TableEnvironment

Posted by dw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 593ff73d903f4e8147f11601e85abefef27fb62e
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Thu Jul 4 13:08:02 2019 +0200

    [FLINK-13088][table-api] Support instantiating the unfied TableEnvironment
    
    This closes #8984
---
 .../java/internal/StreamTableEnvironmentImpl.java  |  6 +++-
 .../apache/flink/table/api/TableEnvironment.java   | 26 +++++++++++++++++
 .../table/api/internal/TableEnvironmentImpl.java   | 34 ++++++++++++++++++++++
 .../internal/StreamTableEnvironmentImpl.scala      | 10 +++++--
 4 files changed, 72 insertions(+), 4 deletions(-)

diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java
index 4dde3d6..b24c087 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java
@@ -96,17 +96,21 @@ public final class StreamTableEnvironmentImpl extends TableEnvironmentImpl imple
 			StreamExecutionEnvironment executionEnvironment,
 			EnvironmentSettings settings,
 			TableConfig tableConfig) {
+
 		FunctionCatalog functionCatalog = new FunctionCatalog(
 			settings.getBuiltInCatalogName(),
 			settings.getBuiltInDatabaseName());
 		CatalogManager catalogManager = new CatalogManager(
 			settings.getBuiltInCatalogName(),
 			new GenericInMemoryCatalog(settings.getBuiltInCatalogName(), settings.getBuiltInDatabaseName()));
-		Map<String, String> plannerProperties = settings.toPlannerProperties();
+
 		Map<String, String> executorProperties = settings.toExecutorProperties();
 		Executor executor = lookupExecutor(executorProperties, executionEnvironment);
+
+		Map<String, String> plannerProperties = settings.toPlannerProperties();
 		Planner planner = ComponentFactoryService.find(PlannerFactory.class, plannerProperties)
 			.create(plannerProperties, executor, tableConfig, functionCatalog, catalogManager);
+
 		return new StreamTableEnvironmentImpl(
 			catalogManager,
 			functionCatalog,
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
index 686e816..64ae899 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.api;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.internal.TableEnvironmentImpl;
 import org.apache.flink.table.catalog.Catalog;
 import org.apache.flink.table.catalog.ExternalCatalog;
 import org.apache.flink.table.descriptors.ConnectorDescriptor;
@@ -54,6 +55,31 @@ import java.util.Optional;
 public interface TableEnvironment {
 
 	/**
+	 * Creates a table environment that is the entry point and central context for creating Table & SQL
+	 * API programs.
+	 *
+	 * <p>It is unified both on a language level for all JVM-based languages (i.e. there is no distinction
+	 * between Scala and Java API) and for bounded and unbounded data processing.
+	 *
+	 * <p>A table environment is responsible for:
+	 * <ul>
+	 *     <li>Connecting to external systems.</li>
+	 *     <li>Registering and retrieving {@link Table}s and other meta objects from a catalog.</li>
+	 *     <li>Executing SQL statements.</li>
+	 *     <li>Offering further configuration options.</li>
+	 * </ul>
+	 *
+	 * <p>Note: This environment is meant for pure table programs. If you would like to convert from or to
+	 * other Flink APIs, it might be necessary to use one of the available language-specific table environments
+	 * in the corresponding bridging modules.
+	 *
+	 * @param settings The environment settings used to instantiate the {@link TableEnvironment}.
+	 */
+	static TableEnvironment create(EnvironmentSettings settings) {
+		return TableEnvironmentImpl.create(settings);
+	}
+
+	/**
 	 * Creates a table from a table source.
 	 *
 	 * @param source table source used as table
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index b2c786a..a7c9377 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.table.api.CatalogNotExistException;
+import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.TableEnvironment;
@@ -35,15 +36,19 @@ import org.apache.flink.table.catalog.CatalogManager;
 import org.apache.flink.table.catalog.ConnectorCatalogTable;
 import org.apache.flink.table.catalog.ExternalCatalog;
 import org.apache.flink.table.catalog.FunctionCatalog;
+import org.apache.flink.table.catalog.GenericInMemoryCatalog;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.QueryOperationCatalogView;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
 import org.apache.flink.table.delegation.Executor;
+import org.apache.flink.table.delegation.ExecutorFactory;
 import org.apache.flink.table.delegation.Planner;
+import org.apache.flink.table.delegation.PlannerFactory;
 import org.apache.flink.table.descriptors.ConnectorDescriptor;
 import org.apache.flink.table.descriptors.StreamTableDescriptor;
 import org.apache.flink.table.descriptors.TableDescriptor;
 import org.apache.flink.table.expressions.TableReferenceExpression;
+import org.apache.flink.table.factories.ComponentFactoryService;
 import org.apache.flink.table.functions.ScalarFunction;
 import org.apache.flink.table.operations.CatalogQueryOperation;
 import org.apache.flink.table.operations.CatalogSinkModifyOperation;
@@ -61,6 +66,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 
 /**
@@ -109,6 +115,34 @@ public class TableEnvironmentImpl implements TableEnvironment {
 		);
 	}
 
+	public static TableEnvironmentImpl create(EnvironmentSettings settings) {
+
+		FunctionCatalog functionCatalog = new FunctionCatalog(
+			settings.getBuiltInCatalogName(),
+			settings.getBuiltInDatabaseName());
+		CatalogManager catalogManager = new CatalogManager(
+			settings.getBuiltInCatalogName(),
+			new GenericInMemoryCatalog(settings.getBuiltInCatalogName(), settings.getBuiltInDatabaseName()));
+
+		Map<String, String> executorProperties = settings.toExecutorProperties();
+		Executor executor = ComponentFactoryService.find(ExecutorFactory.class, executorProperties)
+			.create(executorProperties);
+
+		TableConfig tableConfig = new TableConfig();
+		Map<String, String> plannerProperties = settings.toPlannerProperties();
+		Planner planner = ComponentFactoryService.find(PlannerFactory.class, plannerProperties)
+			.create(plannerProperties, executor, tableConfig, functionCatalog, catalogManager);
+
+		return new TableEnvironmentImpl(
+			catalogManager,
+			tableConfig,
+			executor,
+			functionCatalog,
+			planner,
+			!settings.isBatchMode()
+		);
+	}
+
 	@VisibleForTesting
 	public Planner getPlanner() {
 		return planner;
diff --git a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImpl.scala b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImpl.scala
index fedb317..3a9b9fe 100644
--- a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImpl.scala
+++ b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImpl.scala
@@ -269,15 +269,18 @@ object StreamTableEnvironmentImpl {
       settings: EnvironmentSettings,
       tableConfig: TableConfig)
     : StreamTableEnvironmentImpl = {
-    val executorProperties = settings.toExecutorProperties
-    val plannerProperties = settings.toPlannerProperties
-    val executor = lookupExecutor(executorProperties, executionEnvironment)
+
     val functionCatalog = new FunctionCatalog(
       settings.getBuiltInCatalogName,
       settings.getBuiltInDatabaseName)
     val catalogManager = new CatalogManager(
       settings.getBuiltInCatalogName,
       new GenericInMemoryCatalog(settings.getBuiltInCatalogName, settings.getBuiltInDatabaseName))
+
+    val executorProperties = settings.toExecutorProperties
+    val executor = lookupExecutor(executorProperties, executionEnvironment)
+
+    val plannerProperties = settings.toPlannerProperties
     val planner = ComponentFactoryService.find(classOf[PlannerFactory], plannerProperties)
       .create(
         plannerProperties,
@@ -285,6 +288,7 @@ object StreamTableEnvironmentImpl {
         tableConfig,
         functionCatalog,
         catalogManager)
+
     new StreamTableEnvironmentImpl(
       catalogManager,
       functionCatalog,


[flink] 02/03: [hotfix][table-api] Remove CompositePlannerConfig

Posted by dw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 767d92489093acbf74fd29142cf3ad9eb39e2592
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Thu Jul 4 12:12:09 2019 +0200

    [hotfix][table-api] Remove CompositePlannerConfig
---
 .../org/apache/flink/table/api/TableConfig.java    |  7 ++--
 .../table/api/internal/CompositePlannerConfig.java | 45 ----------------------
 .../batch/table/JavaTableEnvironmentITCase.java    |  2 +-
 .../table/api/batch/table/CorrelateTest.scala      |  2 +-
 .../table/api/stream/table/CorrelateTest.scala     |  2 +-
 .../flink/table/plan/NormalizationRulesTest.scala  |  4 +-
 6 files changed, 8 insertions(+), 54 deletions(-)

diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java
index 8bdc461..6ee7f8d 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java
@@ -20,7 +20,6 @@ package org.apache.flink.table.api;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.table.api.internal.CompositePlannerConfig;
 import org.apache.flink.util.Preconditions;
 
 import java.math.MathContext;
@@ -45,7 +44,7 @@ public class TableConfig {
 	/**
 	 * Defines the configuration of Planner for Table API and SQL queries.
 	 */
-	private CompositePlannerConfig plannerConfig = new CompositePlannerConfig();
+	private PlannerConfig plannerConfig = PlannerConfig.EMPTY_CONFIG;
 
 	/**
 	 * Defines the default context for decimal division calculation.
@@ -110,8 +109,8 @@ public class TableConfig {
 	 * Sets the configuration of Planner for Table API and SQL queries.
 	 * Changing the configuration has no effect after the first query has been defined.
 	 */
-	public void addPlannerConfig(PlannerConfig plannerConfig) {
-		this.plannerConfig.addConfig(Preconditions.checkNotNull(plannerConfig));
+	public void setPlannerConfig(PlannerConfig plannerConfig) {
+		this.plannerConfig = Preconditions.checkNotNull(plannerConfig);
 	}
 
 	/**
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/CompositePlannerConfig.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/CompositePlannerConfig.java
deleted file mode 100644
index 9f3a4ae..0000000
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/CompositePlannerConfig.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.internal;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.table.api.PlannerConfig;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
-
-/**
- * A {@link PlannerConfig} to pass multiple different configs under a single object. It allows storing only a single
- * instance of a given class. If there is an object of the same class already, it is replaced with the new one.
- */
-@Internal
-public class CompositePlannerConfig implements PlannerConfig {
-	private final Map<Class<? extends PlannerConfig>, PlannerConfig> configs = new HashMap<>();
-
-	public void addConfig(PlannerConfig config) {
-		configs.put(config.getClass(), config);
-	}
-
-	@Override
-	@SuppressWarnings("unchecked")
-	public <T extends PlannerConfig> Optional<T> unwrap(Class<T> type) {
-		return (Optional<T>) Optional.ofNullable(configs.get(type));
-	}
-}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/batch/table/JavaTableEnvironmentITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/batch/table/JavaTableEnvironmentITCase.java
index b05608a..6f35d96 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/batch/table/JavaTableEnvironmentITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/batch/table/JavaTableEnvironmentITCase.java
@@ -562,7 +562,7 @@ public class JavaTableEnvironmentITCase extends TableProgramsCollectionTestBase
 				.replaceLogicalOptRuleSet(RuleSets.ofList())
 				.replacePhysicalOptRuleSet(RuleSets.ofList())
 				.build();
-		tableEnv.getConfig().addPlannerConfig(cc);
+		tableEnv.getConfig().setPlannerConfig(cc);
 
 		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
 		Table t = tableEnv.fromDataSet(ds);
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/CorrelateTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/CorrelateTest.scala
index 1a71389..9ec49d1 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/CorrelateTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/CorrelateTest.scala
@@ -180,7 +180,7 @@ class CorrelateTest extends TableTestBase {
       .replaceLogicalOptRuleSet(RuleSets.ofList(logicalRuleSet.toList))
       .build()
 
-    util.tableEnv.getConfig.addPlannerConfig(cc)
+    util.tableEnv.getConfig.setPlannerConfig(cc)
 
     val sourceTable = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
     val function = util.addFunction("func1", new TableFunc0)
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/CorrelateTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/CorrelateTest.scala
index e62a2f9..7f62dee 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/CorrelateTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/CorrelateTest.scala
@@ -284,7 +284,7 @@ class CorrelateTest extends TableTestBase {
       .replaceLogicalOptRuleSet(RuleSets.ofList(logicalRuleSet.toList))
       .build()
 
-    util.tableEnv.getConfig.addPlannerConfig(cc)
+    util.tableEnv.getConfig.setPlannerConfig(cc)
 
     val sourceTable = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
     val function = util.addFunction("func1", new TableFunc0)
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/plan/NormalizationRulesTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/plan/NormalizationRulesTest.scala
index f0a560c..910b719 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/plan/NormalizationRulesTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/plan/NormalizationRulesTest.scala
@@ -40,7 +40,7 @@ class NormalizationRulesTest extends TableTestBase {
         .replaceLogicalOptRuleSet(RuleSets.ofList())
         .replacePhysicalOptRuleSet(RuleSets.ofList())
         .build()
-    util.tableEnv.getConfig.addPlannerConfig(cc)
+    util.tableEnv.getConfig.setPlannerConfig(cc)
 
     val t = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
 
@@ -76,7 +76,7 @@ class NormalizationRulesTest extends TableTestBase {
         .replaceLogicalOptRuleSet(RuleSets.ofList())
         .replacePhysicalOptRuleSet(RuleSets.ofList())
         .build()
-    util.tableEnv.getConfig.addPlannerConfig(cc)
+    util.tableEnv.getConfig.setPlannerConfig(cc)
 
     val t = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)