You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2020/06/16 13:52:43 UTC

[flink] branch master updated (ab7b8c4 -> 94ea5e6)

This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from ab7b8c4  [FLINK-18311] Make StreamingKafkaITCase more resilient
     new 02cdfe8  [hotfix] Code cleanup: remove useless parameter from Environment#enrich method
     new 471a3df  [FLINK-18161][sql-client] Fix state retention config does not work in sql client
     new 94ea5e6  [FLINK-18161][sql-client] Fix configurations from flink-conf.yaml overwrite sql-client's properties

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../flink/table/client/config/Environment.java     | 18 ++---
 .../client/gateway/local/ExecutionContext.java     | 79 ++++++++++++++++++----
 .../table/client/gateway/local/LocalExecutor.java  |  2 +-
 .../client/gateway/local/ExecutionContextTest.java | 63 ++++++++---------
 .../client/gateway/local/LocalExecutorITCase.java  |  4 +-
 .../test/resources/test-sql-client-defaults.yaml   |  4 +-
 6 files changed, 108 insertions(+), 62 deletions(-)


[flink] 03/03: [FLINK-18161][sql-client] Fix configurations from flink-conf.yaml overwrite sql-client's properties

Posted by dw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 94ea5e6ca9a1a0c6dda6b832473e40578207e78b
Author: godfreyhe <go...@163.com>
AuthorDate: Sun Jun 14 10:51:05 2020 +0800

    [FLINK-18161][sql-client] Fix configurations from flink-conf.yaml overwrite sql-client's properties
    
    This closes #12643
---
 .../client/gateway/local/ExecutionContext.java     | 64 +++++++++++++++++-----
 .../client/gateway/local/ExecutionContextTest.java | 37 ++++++++-----
 .../client/gateway/local/LocalExecutorITCase.java  |  4 +-
 3 files changed, 76 insertions(+), 29 deletions(-)

diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
index 6198440..0a444eb2 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
@@ -19,6 +19,11 @@
 package org.apache.flink.table.client.gateway.local;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies.FailureRateRestartStrategyConfiguration;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies.FallbackRestartStrategyConfiguration;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies.FixedDelayRestartStrategyConfiguration;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies.NoRestartStrategyConfiguration;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies.RestartStrategyConfiguration;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.dag.Pipeline;
 import org.apache.flink.api.java.ExecutionEnvironment;
@@ -32,8 +37,12 @@ import org.apache.flink.client.deployment.ClusterClientServiceLoader;
 import org.apache.flink.client.deployment.ClusterDescriptor;
 import org.apache.flink.client.deployment.ClusterSpecification;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.PipelineOptions;
+import org.apache.flink.configuration.RestartStrategyOptions;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.environment.StreamPipelineOptions;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableConfig;
@@ -101,6 +110,7 @@ import java.lang.reflect.Method;
 import java.net.URISyntaxException;
 import java.net.URL;
 import java.nio.file.Paths;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -524,15 +534,52 @@ public class ExecutionContext<ClusterID> {
 	private TableConfig createTableConfig() {
 		final TableConfig config = new TableConfig();
 		config.addConfiguration(flinkConfig);
-		environment.getConfiguration().asMap().forEach((k, v) ->
-				config.getConfiguration().setString(k, v));
+		Configuration conf = config.getConfiguration();
+		environment.getConfiguration().asMap().forEach(conf::setString);
 		ExecutionEntry execution = environment.getExecution();
 		config.setIdleStateRetentionTime(
 				Time.milliseconds(execution.getMinStateRetention()),
 				Time.milliseconds(execution.getMaxStateRetention()));
+
+		conf.set(CoreOptions.DEFAULT_PARALLELISM, execution.getParallelism());
+		conf.set(PipelineOptions.MAX_PARALLELISM, execution.getMaxParallelism());
+		conf.set(StreamPipelineOptions.TIME_CHARACTERISTIC, execution.getTimeCharacteristic());
+		if (execution.getTimeCharacteristic() == TimeCharacteristic.EventTime) {
+			conf.set(PipelineOptions.AUTO_WATERMARK_INTERVAL,
+					Duration.ofMillis(execution.getPeriodicWatermarksInterval()));
+		}
+
+		setRestartStrategy(conf);
 		return config;
 	}
 
+	private void setRestartStrategy(Configuration conf) {
+		RestartStrategyConfiguration restartStrategy = environment.getExecution().getRestartStrategy();
+		if (restartStrategy instanceof NoRestartStrategyConfiguration) {
+			conf.set(RestartStrategyOptions.RESTART_STRATEGY, "none");
+		} else if (restartStrategy instanceof FixedDelayRestartStrategyConfiguration) {
+			conf.set(RestartStrategyOptions.RESTART_STRATEGY, "fixed-delay");
+			FixedDelayRestartStrategyConfiguration fixedDelay = ((FixedDelayRestartStrategyConfiguration) restartStrategy);
+			conf.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS,
+					fixedDelay.getRestartAttempts());
+			conf.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_DELAY,
+					Duration.ofMillis(fixedDelay.getDelayBetweenAttemptsInterval().toMilliseconds()));
+		} else if (restartStrategy instanceof FailureRateRestartStrategyConfiguration) {
+			conf.set(RestartStrategyOptions.RESTART_STRATEGY, "failure-rate");
+			FailureRateRestartStrategyConfiguration failureRate = (FailureRateRestartStrategyConfiguration) restartStrategy;
+			conf.set(RestartStrategyOptions.RESTART_STRATEGY_FAILURE_RATE_MAX_FAILURES_PER_INTERVAL,
+					failureRate.getMaxFailureRate());
+			conf.set(RestartStrategyOptions.RESTART_STRATEGY_FAILURE_RATE_FAILURE_RATE_INTERVAL,
+					Duration.ofMillis(failureRate.getFailureInterval().toMilliseconds()));
+			conf.set(RestartStrategyOptions.RESTART_STRATEGY_FAILURE_RATE_DELAY,
+					Duration.ofMillis(failureRate.getDelayBetweenAttemptsInterval().toMilliseconds()));
+		} else if (restartStrategy instanceof FallbackRestartStrategyConfiguration) {
+			// default is FallbackRestartStrategyConfiguration
+			// see ExecutionConfig.restartStrategyConfiguration
+			conf.removeConfig(RestartStrategyOptions.RESTART_STRATEGY);
+		}
+	}
+
 	private void createTableEnvironment(
 			EnvironmentSettings settings,
 			TableConfig config,
@@ -555,7 +602,7 @@ public class ExecutionContext<ClusterID> {
 					functionCatalog);
 		} else if (environment.getExecution().isBatchPlanner()) {
 			streamExecEnv = null;
-			execEnv = createExecutionEnvironment();
+			execEnv = ExecutionEnvironment.getExecutionEnvironment();
 			executor = null;
 			tableEnv = new BatchTableEnvironmentImpl(
 					execEnv,
@@ -630,18 +677,9 @@ public class ExecutionContext<ClusterID> {
 		database.ifPresent(tableEnv::useDatabase);
 	}
 
-	private ExecutionEnvironment createExecutionEnvironment() {
-		final ExecutionEnvironment execEnv = ExecutionEnvironment.getExecutionEnvironment();
-		execEnv.setRestartStrategy(environment.getExecution().getRestartStrategy());
-		execEnv.setParallelism(environment.getExecution().getParallelism());
-		return execEnv;
-	}
-
 	private StreamExecutionEnvironment createStreamExecutionEnvironment() {
 		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.setRestartStrategy(environment.getExecution().getRestartStrategy());
-		env.setParallelism(environment.getExecution().getParallelism());
-		env.setMaxParallelism(environment.getExecution().getMaxParallelism());
+		// for TimeCharacteristic validation in StreamTableEnvironmentImpl
 		env.setStreamTimeCharacteristic(environment.getExecution().getTimeCharacteristic());
 		if (env.getStreamTimeCharacteristic() == TimeCharacteristic.EventTime) {
 			env.getConfig().setAutoWatermarkInterval(environment.getExecution().getPeriodicWatermarksInterval());
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java
index 7beee2a..d737746 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java
@@ -18,13 +18,17 @@
 
 package org.apache.flink.table.client.gateway.local;
 
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.client.cli.DefaultCLI;
 import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
 import org.apache.flink.client.python.PythonFunctionFactory;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.PipelineOptions;
+import org.apache.flink.configuration.RestartStrategyOptions;
 import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.environment.StreamPipelineOptions;
+import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
@@ -49,6 +53,7 @@ import org.apache.commons.cli.Options;
 import org.junit.Test;
 
 import java.net.URL;
+import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -79,21 +84,25 @@ public class ExecutionContextTest {
 	@Test
 	public void testExecutionConfig() throws Exception {
 		final ExecutionContext<?> context = createDefaultExecutionContext();
-		final ExecutionConfig config = context.getExecutionConfig();
+		final TableEnvironment tableEnv = context.getTableEnvironment();
+		final TableConfig tableConfig = tableEnv.getConfig();
 
-		assertEquals(99, config.getAutoWatermarkInterval());
+		assertEquals(1_000, tableConfig.getMinIdleStateRetentionTime());
+		assertEquals(600_000, tableConfig.getMaxIdleStateRetentionTime());
+		Configuration conf = tableConfig.getConfiguration();
 
-		final RestartStrategies.RestartStrategyConfiguration restartConfig = config.getRestartStrategy();
-		assertTrue(restartConfig instanceof RestartStrategies.FailureRateRestartStrategyConfiguration);
-		final RestartStrategies.FailureRateRestartStrategyConfiguration failureRateStrategy =
-			(RestartStrategies.FailureRateRestartStrategyConfiguration) restartConfig;
-		assertEquals(10, failureRateStrategy.getMaxFailureRate());
-		assertEquals(99_000, failureRateStrategy.getFailureInterval().toMilliseconds());
-		assertEquals(1_000, failureRateStrategy.getDelayBetweenAttemptsInterval().toMilliseconds());
+		assertEquals(1, conf.getInteger(CoreOptions.DEFAULT_PARALLELISM));
+		assertEquals(16, conf.getInteger(PipelineOptions.MAX_PARALLELISM));
 
-		final TableEnvironment tableEnv = context.getTableEnvironment();
-		assertEquals(1_000, tableEnv.getConfig().getMinIdleStateRetentionTime());
-		assertEquals(600_000, tableEnv.getConfig().getMaxIdleStateRetentionTime());
+		assertEquals(TimeCharacteristic.EventTime, conf.get(StreamPipelineOptions.TIME_CHARACTERISTIC));
+		assertEquals(Duration.ofMillis(99), conf.get(PipelineOptions.AUTO_WATERMARK_INTERVAL));
+
+		assertEquals("failure-rate", conf.getString(RestartStrategyOptions.RESTART_STRATEGY));
+		assertEquals(10, conf.getInteger(
+				RestartStrategyOptions.RESTART_STRATEGY_FAILURE_RATE_MAX_FAILURES_PER_INTERVAL));
+		assertEquals(Duration.ofMillis(99_000), conf.get(
+				RestartStrategyOptions.RESTART_STRATEGY_FAILURE_RATE_FAILURE_RATE_INTERVAL));
+		assertEquals(Duration.ofMillis(1_000), conf.get(RestartStrategyOptions.RESTART_STRATEGY_FAILURE_RATE_DELAY));
 	}
 
 	@Test
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
index d4c7fac..6a7f9ca 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
@@ -404,8 +404,8 @@ public class LocalExecutorITCase extends TestLogger {
 			expectedProperties.put("execution.periodic-watermarks-interval", "99");
 			expectedProperties.put("execution.parallelism", "1");
 			expectedProperties.put("execution.max-parallelism", "16");
-			expectedProperties.put("execution.max-idle-state-retention", "0");
-			expectedProperties.put("execution.min-idle-state-retention", "0");
+			expectedProperties.put("execution.max-idle-state-retention", "600000");
+			expectedProperties.put("execution.min-idle-state-retention", "1000");
 			expectedProperties.put("execution.result-mode", "table");
 			expectedProperties.put("execution.max-table-result-rows", "100");
 			expectedProperties.put("execution.restart-strategy.type", "failure-rate");


[flink] 01/03: [hotfix] Code cleanup: remove useless parameter from Environment#enrich method

Posted by dw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 02cdfe88d5e686bedb7461e8e0b5f60bf807e095
Author: godfreyhe <go...@163.com>
AuthorDate: Sat Jun 13 17:44:10 2020 +0800

    [hotfix] Code cleanup: remove useless parameter from Environment#enrich method
---
 .../apache/flink/table/client/config/Environment.java  | 18 +++++++-----------
 .../table/client/gateway/local/LocalExecutor.java      |  2 +-
 2 files changed, 8 insertions(+), 12 deletions(-)

diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java
index cb72b70..7584552 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java
@@ -26,7 +26,6 @@ import org.apache.flink.table.client.config.entries.ExecutionEntry;
 import org.apache.flink.table.client.config.entries.FunctionEntry;
 import org.apache.flink.table.client.config.entries.ModuleEntry;
 import org.apache.flink.table.client.config.entries.TableEntry;
-import org.apache.flink.table.client.config.entries.ViewEntry;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonMappingException;
 
@@ -267,28 +266,25 @@ public class Environment {
 	}
 
 	public Environment clone() {
-		return enrich(this, Collections.emptyMap(), Collections.emptyMap());
+		return enrich(this, Collections.emptyMap());
 	}
 
 	/**
-	 * Enriches an environment with new/modified properties or views and returns the new instance.
+	 * Enriches an environment with new/modified properties and returns the new instance.
 	 */
-	public static Environment enrich(
-			Environment env,
-			Map<String, String> properties,
-			Map<String, ViewEntry> views) {
+	public static Environment enrich(Environment env, Map<String, String> properties) {
 		final Environment enrichedEnv = new Environment();
 
+		// copy modules
 		enrichedEnv.modules = new LinkedHashMap<>(env.getModules());
 
-		// merge catalogs
+		// copy catalogs
 		enrichedEnv.catalogs = new LinkedHashMap<>(env.getCatalogs());
 
-		// merge tables
+		// copy tables
 		enrichedEnv.tables = new LinkedHashMap<>(env.getTables());
-		enrichedEnv.tables.putAll(views);
 
-		// merge functions
+		// copy functions
 		enrichedEnv.functions = new HashMap<>(env.getFunctions());
 
 		// enrich execution properties
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
index f778522..dbed581 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
@@ -286,7 +286,7 @@ public class LocalExecutor implements Executor {
 		Environment env = context.getEnvironment();
 		Environment newEnv;
 		try {
-			newEnv = Environment.enrich(env, Collections.singletonMap(key, value), Collections.emptyMap());
+			newEnv = Environment.enrich(env, Collections.singletonMap(key, value));
 		} catch (Throwable t) {
 			throw new SqlExecutionException("Could not set session property.", t);
 		}


[flink] 02/03: [FLINK-18161][sql-client] Fix state retention config does not work in sql client

Posted by dw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 471a3dfc601f105055c19fd7647110488ccdff70
Author: godfreyhe <go...@163.com>
AuthorDate: Sat Jun 13 21:08:35 2020 +0800

    [FLINK-18161][sql-client] Fix state retention config does not work in sql client
---
 .../client/gateway/local/ExecutionContext.java     | 19 ++++++++++---
 .../client/gateway/local/ExecutionContextTest.java | 32 ++++++++--------------
 .../test/resources/test-sql-client-defaults.yaml   |  4 +--
 3 files changed, 29 insertions(+), 26 deletions(-)

diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
index afe9a06..6198440 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.client.gateway.local;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.dag.Pipeline;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.client.ClientUtils;
@@ -52,6 +53,7 @@ import org.apache.flink.table.catalog.GenericInMemoryCatalog;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.client.config.Environment;
 import org.apache.flink.table.client.config.entries.DeploymentEntry;
+import org.apache.flink.table.client.config.entries.ExecutionEntry;
 import org.apache.flink.table.client.config.entries.SinkTableEntry;
 import org.apache.flink.table.client.config.entries.SourceSinkTableEntry;
 import org.apache.flink.table.client.config.entries.SourceTableEntry;
@@ -452,10 +454,7 @@ public class ExecutionContext<ClusterID> {
 		final EnvironmentSettings settings = environment.getExecution().getEnvironmentSettings();
 		final boolean noInheritedState = sessionState == null;
 		// Step 0.0 Initialize the table configuration.
-		final TableConfig config = new TableConfig();
-		config.addConfiguration(flinkConfig);
-		environment.getConfiguration().asMap().forEach((k, v) ->
-				config.getConfiguration().setString(k, v));
+		final TableConfig config = createTableConfig();
 
 		if (noInheritedState) {
 			//--------------------------------------------------------------------------------------------------------------
@@ -522,6 +521,18 @@ public class ExecutionContext<ClusterID> {
 		}
 	}
 
+	private TableConfig createTableConfig() {
+		final TableConfig config = new TableConfig();
+		config.addConfiguration(flinkConfig);
+		environment.getConfiguration().asMap().forEach((k, v) ->
+				config.getConfiguration().setString(k, v));
+		ExecutionEntry execution = environment.getExecution();
+		config.setIdleStateRetentionTime(
+				Time.milliseconds(execution.getMinStateRetention()),
+				Time.milliseconds(execution.getMaxStateRetention()));
+		return config;
+	}
+
 	private void createTableEnvironment(
 			EnvironmentSettings settings,
 			TableConfig config,
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java
index a5cc8d1..7beee2a 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java
@@ -90,6 +90,10 @@ public class ExecutionContextTest {
 		assertEquals(10, failureRateStrategy.getMaxFailureRate());
 		assertEquals(99_000, failureRateStrategy.getFailureInterval().toMilliseconds());
 		assertEquals(1_000, failureRateStrategy.getDelayBetweenAttemptsInterval().toMilliseconds());
+
+		final TableEnvironment tableEnv = context.getTableEnvironment();
+		assertEquals(1_000, tableEnv.getConfig().getMinIdleStateRetentionTime());
+		assertEquals(600_000, tableEnv.getConfig().getMaxIdleStateRetentionTime());
 	}
 
 	@Test
@@ -255,35 +259,23 @@ public class ExecutionContextTest {
 		final ExecutionContext<?> context = createConfigurationExecutionContext();
 		final TableEnvironment tableEnv = context.getTableEnvironment();
 
-		assertEquals(
-			100,
-			tableEnv.getConfig().getConfiguration().getInteger(
-				ExecutionConfigOptions.TABLE_EXEC_SORT_DEFAULT_LIMIT));
-		assertTrue(
-			tableEnv.getConfig().getConfiguration().getBoolean(
-				ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED));
-		assertEquals(
-			"128kb",
-			tableEnv.getConfig().getConfiguration().getString(
-				ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE));
+		Configuration conf = tableEnv.getConfig().getConfiguration();
+		assertEquals(100, conf.getInteger(ExecutionConfigOptions.TABLE_EXEC_SORT_DEFAULT_LIMIT));
+		assertTrue(conf.getBoolean(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED));
+		assertEquals("128kb", conf.getString(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE));
 
-		assertTrue(
-			tableEnv.getConfig().getConfiguration().getBoolean(
-				OptimizerConfigOptions.TABLE_OPTIMIZER_JOIN_REORDER_ENABLED));
+		assertTrue(conf.getBoolean(OptimizerConfigOptions.TABLE_OPTIMIZER_JOIN_REORDER_ENABLED));
 
 		// these options are not modified and should be equal to their default value
 		assertEquals(
 			ExecutionConfigOptions.TABLE_EXEC_SORT_ASYNC_MERGE_ENABLED.defaultValue(),
-			tableEnv.getConfig().getConfiguration().getBoolean(
-				ExecutionConfigOptions.TABLE_EXEC_SORT_ASYNC_MERGE_ENABLED));
+			conf.getBoolean(ExecutionConfigOptions.TABLE_EXEC_SORT_ASYNC_MERGE_ENABLED));
 		assertEquals(
 			ExecutionConfigOptions.TABLE_EXEC_SHUFFLE_MODE.defaultValue(),
-			tableEnv.getConfig().getConfiguration().getString(
-				ExecutionConfigOptions.TABLE_EXEC_SHUFFLE_MODE));
+			conf.getString(ExecutionConfigOptions.TABLE_EXEC_SHUFFLE_MODE));
 		assertEquals(
 			OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD.defaultValue().longValue(),
-			tableEnv.getConfig().getConfiguration().getLong(
-				OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD));
+			conf.getLong(OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD));
 	}
 
 	@Test
diff --git a/flink-table/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml b/flink-table/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml
index df53d19..a730f4c 100644
--- a/flink-table/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml
+++ b/flink-table/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml
@@ -141,8 +141,8 @@ execution:
   periodic-watermarks-interval: 99
   parallelism: 1
   max-parallelism: 16
-  min-idle-state-retention: 0
-  max-idle-state-retention: 0
+  min-idle-state-retention: 1000
+  max-idle-state-retention: 600000
   result-mode: "$VAR_RESULT_MODE"
   max-table-result-rows: "$VAR_MAX_ROWS"
   restart-strategy: