You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2022/11/06 20:19:45 UTC

[flink] branch master updated (6a76233bef3 -> 12efbb9c85c)

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


    from 6a76233bef3 [hotfix][docs] Set proper watermark and description for event-time temporal table join example
     new 48ae78b1805 [hotfix][streaming] Fix AUTO_WATERMARK_INTERVAL default value
     new c319340e972 [FLINK-29379][streaming] Migrate most ExecutionConfig fields to Configuration map
     new bf0d28aae03 [FLINK-29379][streaming] Migrate TASK_CANCELLATION options in ExecutionConfig to Configuration
     new 12efbb9c85c [FLINK-29379][streaming] Migrate isDynamicGraph in ExecutionConfig to Configuration

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../generated/pipeline_configuration.html          |   2 +-
 .../apache/flink/api/common/ExecutionConfig.java   | 332 +++++++++------------
 .../flink/configuration/PipelineOptions.java       |   2 +-
 .../pyflink/common/tests/test_execution_config.py  |   6 +-
 .../org/apache/flink/runtime/taskmanager/Task.java |  23 +-
 .../partitioner/StreamPartitionerTestUtils.java    |   9 +-
 .../plan/batch/sql/ForwardHashExchangeTest.java    |   3 +-
 .../runtime/batch/ParallelismSettingTest.java      |   3 +-
 .../batch/sql/ForwardHashExchangeITCase.java       |   3 +-
 9 files changed, 174 insertions(+), 209 deletions(-)


[flink] 03/04: [FLINK-29379][streaming] Migrate TASK_CANCELLATION options in ExecutionConfig to Configuration

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit bf0d28aae03542e5f7d1273d34c512349b85c841
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Thu Nov 3 09:31:33 2022 +0100

    [FLINK-29379][streaming] Migrate TASK_CANCELLATION options in ExecutionConfig to Configuration
---
 .../apache/flink/api/common/ExecutionConfig.java   | 32 ++++++++++------------
 .../pyflink/common/tests/test_execution_config.py  |  4 +--
 .../org/apache/flink/runtime/taskmanager/Task.java | 23 +++++++++-------
 3 files changed, 29 insertions(+), 30 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index 461cdb4e107..4d618173e58 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -152,14 +152,6 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
 
     private boolean isDynamicGraph = false;
 
-    private long taskCancellationIntervalMillis = -1;
-
-    /**
-     * Timeout after which an ongoing task cancellation will lead to a fatal TaskManager error,
-     * usually killing the JVM.
-     */
-    private long taskCancellationTimeoutMillis = -1;
-
     // ------------------------------- User code values --------------------------------------------
 
     private GlobalJobParameters globalJobParameters = new GlobalJobParameters();
@@ -383,7 +375,7 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
      * Gets the interval (in milliseconds) between consecutive attempts to cancel a running task.
      */
     public long getTaskCancellationInterval() {
-        return this.taskCancellationIntervalMillis;
+        return configuration.get(TaskManagerOptions.TASK_CANCELLATION_INTERVAL);
     }
 
     /**
@@ -393,7 +385,7 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
      * @param interval the interval (in milliseconds).
      */
     public ExecutionConfig setTaskCancellationInterval(long interval) {
-        this.taskCancellationIntervalMillis = interval;
+        configuration.set(TaskManagerOptions.TASK_CANCELLATION_INTERVAL, interval);
         return this;
     }
 
@@ -406,7 +398,7 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
      */
     @PublicEvolving
     public long getTaskCancellationTimeout() {
-        return this.taskCancellationTimeoutMillis;
+        return configuration.get(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT);
     }
 
     /**
@@ -424,7 +416,7 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
     @PublicEvolving
     public ExecutionConfig setTaskCancellationTimeout(long timeout) {
         checkArgument(timeout >= 0, "Timeout needs to be >= 0.");
-        this.taskCancellationTimeoutMillis = timeout;
+        configuration.set(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, timeout);
         return this;
     }
 
@@ -962,7 +954,6 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
                     && defaultKryoSerializerClasses.equals(other.defaultKryoSerializerClasses)
                     && registeredKryoTypes.equals(other.registeredKryoTypes)
                     && registeredPojoTypes.equals(other.registeredPojoTypes)
-                    && taskCancellationIntervalMillis == other.taskCancellationIntervalMillis
                     && isDynamicGraph == other.isDynamicGraph;
 
         } else {
@@ -980,7 +971,6 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
                 defaultKryoSerializerClasses,
                 registeredKryoTypes,
                 registeredPojoTypes,
-                taskCancellationIntervalMillis,
                 isDynamicGraph);
     }
 
@@ -993,10 +983,6 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
                 + executionRetryDelay
                 + ", restartStrategyConfiguration="
                 + restartStrategyConfiguration
-                + ", taskCancellationIntervalMillis="
-                + taskCancellationIntervalMillis
-                + ", taskCancellationTimeoutMillis="
-                + taskCancellationTimeoutMillis
                 + ", globalJobParameters="
                 + globalJobParameters
                 + ", registeredTypesWithKryoSerializers="
@@ -1178,6 +1164,16 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
                                                 == JobManagerOptions.SchedulerType.AdaptiveBatch));
     }
 
+    /**
+     * @return A copy of internal {@link #configuration}. Note it is missing all options that are
+     *     stored as plain java fields in {@link ExecutionConfig}, for example {@link
+     *     #registeredKryoTypes} or {@link #globalJobParameters}.
+     */
+    @Internal
+    public Configuration toConfiguration() {
+        return new Configuration(configuration);
+    }
+
     private LinkedHashSet<Class<?>> loadClasses(
             List<String> classNames, ClassLoader classLoader, String errorMessage) {
         return classNames.stream()
diff --git a/flink-python/pyflink/common/tests/test_execution_config.py b/flink-python/pyflink/common/tests/test_execution_config.py
index 5c8917f0210..851413f26ff 100644
--- a/flink-python/pyflink/common/tests/test_execution_config.py
+++ b/flink-python/pyflink/common/tests/test_execution_config.py
@@ -77,7 +77,7 @@ class ExecutionConfigTests(PyFlinkTestCase):
 
     def test_get_set_task_cancellation_interval(self):
 
-        self.assertEqual(self.execution_config.get_task_cancellation_interval(), -1)
+        self.assertEqual(self.execution_config.get_task_cancellation_interval(), 30000)
 
         self.execution_config.set_task_cancellation_interval(1000)
 
@@ -85,7 +85,7 @@ class ExecutionConfigTests(PyFlinkTestCase):
 
     def test_get_set_task_cancellation_timeout(self):
 
-        self.assertEqual(self.execution_config.get_task_cancellation_timeout(), -1)
+        self.assertEqual(self.execution_config.get_task_cancellation_timeout(), 180000)
 
         self.execution_config.set_task_cancellation_timeout(3000)
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 360290c31ed..2f65781984f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -612,16 +612,19 @@ public class Task
             userCodeClassLoader = createUserCodeClassloader();
             final ExecutionConfig executionConfig =
                     serializedExecutionConfig.deserializeValue(userCodeClassLoader.asClassLoader());
-
-            if (executionConfig.getTaskCancellationInterval() >= 0) {
-                // override task cancellation interval from Flink config if set in ExecutionConfig
-                taskCancellationInterval = executionConfig.getTaskCancellationInterval();
-            }
-
-            if (executionConfig.getTaskCancellationTimeout() >= 0) {
-                // override task cancellation timeout from Flink config if set in ExecutionConfig
-                taskCancellationTimeout = executionConfig.getTaskCancellationTimeout();
-            }
+            Configuration executionConfigConfiguration = executionConfig.toConfiguration();
+
+            // override task cancellation interval from Flink config if set in ExecutionConfig
+            taskCancellationInterval =
+                    executionConfigConfiguration
+                            .getOptional(TaskManagerOptions.TASK_CANCELLATION_INTERVAL)
+                            .orElse(taskCancellationInterval);
+
+            // override task cancellation timeout from Flink config if set in ExecutionConfig
+            taskCancellationTimeout =
+                    executionConfigConfiguration
+                            .getOptional(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT)
+                            .orElse(taskCancellationTimeout);
 
             if (isCanceledOrFailed()) {
                 throw new CancelTaskException();


[flink] 04/04: [FLINK-29379][streaming] Migrate isDynamicGraph in ExecutionConfig to Configuration

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 12efbb9c85c605db55a2aea2a7bbea62fa930e1d
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Thu Nov 3 10:43:14 2022 +0100

    [FLINK-29379][streaming] Migrate isDynamicGraph in ExecutionConfig to Configuration
---
 .../apache/flink/api/common/ExecutionConfig.java   | 32 ++++++++++------------
 .../partitioner/StreamPartitionerTestUtils.java    |  9 ++++--
 .../plan/batch/sql/ForwardHashExchangeTest.java    |  3 +-
 .../runtime/batch/ParallelismSettingTest.java      |  3 +-
 .../batch/sql/ForwardHashExchangeITCase.java       |  3 +-
 5 files changed, 27 insertions(+), 23 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index 4d618173e58..1e75154edc3 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -21,6 +21,7 @@ package org.apache.flink.api.common;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
@@ -29,6 +30,7 @@ import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.DescribedEnum;
 import org.apache.flink.configuration.ExecutionOptions;
 import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.JobManagerOptions.SchedulerType;
 import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.configuration.ReadableConfig;
@@ -150,8 +152,6 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
     private RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration =
             new RestartStrategies.FallbackRestartStrategyConfiguration();
 
-    private boolean isDynamicGraph = false;
-
     // ------------------------------- User code values --------------------------------------------
 
     private GlobalJobParameters globalJobParameters = new GlobalJobParameters();
@@ -464,14 +464,20 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
         }
     }
 
+    /**
+     * TODO: this shouldn't exist and shouldn't pollute public API. Tests should change this via
+     * Configuration
+     */
+    @VisibleForTesting
     @Internal
-    public void setDynamicGraph(boolean dynamicGraph) {
-        isDynamicGraph = dynamicGraph;
+    public ExecutionConfig setScheduler(SchedulerType schedulerType) {
+        configuration.set(JobManagerOptions.SCHEDULER, schedulerType);
+        return this;
     }
 
     @Internal
     public boolean isDynamicGraph() {
-        return isDynamicGraph;
+        return configuration.get(JobManagerOptions.SCHEDULER) == SchedulerType.AdaptiveBatch;
     }
 
     /**
@@ -953,8 +959,7 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
                             other.registeredTypesWithKryoSerializerClasses)
                     && defaultKryoSerializerClasses.equals(other.defaultKryoSerializerClasses)
                     && registeredKryoTypes.equals(other.registeredKryoTypes)
-                    && registeredPojoTypes.equals(other.registeredPojoTypes)
-                    && isDynamicGraph == other.isDynamicGraph;
+                    && registeredPojoTypes.equals(other.registeredPojoTypes);
 
         } else {
             return false;
@@ -970,8 +975,7 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
                 registeredTypesWithKryoSerializerClasses,
                 defaultKryoSerializerClasses,
                 registeredKryoTypes,
-                registeredPojoTypes,
-                isDynamicGraph);
+                registeredPojoTypes);
     }
 
     @Override
@@ -997,8 +1001,6 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
                 + registeredKryoTypes
                 + ", registeredPojoTypes="
                 + registeredPojoTypes
-                + ", isDynamicGraph="
-                + isDynamicGraph
                 + '}';
     }
 
@@ -1155,13 +1157,7 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
                 .map(c -> loadClasses(c, classLoader, "Could not load kryo type to be registered."))
                 .ifPresent(c -> this.registeredKryoTypes = c);
 
-        configuration
-                .getOptional(JobManagerOptions.SCHEDULER)
-                .ifPresent(
-                        schedulerType ->
-                                this.setDynamicGraph(
-                                        schedulerType
-                                                == JobManagerOptions.SchedulerType.AdaptiveBatch));
+        configuration.getOptional(JobManagerOptions.SCHEDULER).ifPresent(this::setScheduler);
     }
 
     /**
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitionerTestUtils.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitionerTestUtils.java
index 12e9d54aa6e..5ac50c654a0 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitionerTestUtils.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitionerTestUtils.java
@@ -18,6 +18,9 @@
 package org.apache.flink.streaming.runtime.partitioner;
 
 import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.JobManagerOptions.SchedulerType;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -45,9 +48,11 @@ public class StreamPartitionerTestUtils {
             StreamPartitioner<Long> streamPartitioner,
             StreamExchangeMode exchangeMode) {
 
-        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        Configuration configuration = new Configuration();
+        configuration.set(JobManagerOptions.SCHEDULER, SchedulerType.AdaptiveBatch);
+        final StreamExecutionEnvironment env =
+                StreamExecutionEnvironment.getExecutionEnvironment(configuration);
         env.setRuntimeMode(RuntimeExecutionMode.BATCH);
-        env.getConfig().setDynamicGraph(true);
 
         final DataStream<Long> source =
                 env.fromSequence(0, 99).slotSharingGroup(sourceSlotSharingGroup).name("source");
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/ForwardHashExchangeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/ForwardHashExchangeTest.java
index b5edb3632f9..c9d3e42edae 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/ForwardHashExchangeTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/ForwardHashExchangeTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.table.planner.plan.batch.sql;
 
+import org.apache.flink.configuration.JobManagerOptions.SchedulerType;
 import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.api.config.OptimizerConfigOptions;
@@ -35,7 +36,7 @@ public class ForwardHashExchangeTest extends TableTestBase {
     public void before() {
         util = batchTestUtil(TableConfig.getDefault());
 
-        util.getStreamEnv().getConfig().setDynamicGraph(true);
+        util.getStreamEnv().getConfig().setScheduler(SchedulerType.AdaptiveBatch);
         util.tableEnv()
                 .getConfig()
                 .getConfiguration()
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/ParallelismSettingTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/ParallelismSettingTest.java
index fa3f1c248df..7fed38f5831 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/ParallelismSettingTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/ParallelismSettingTest.java
@@ -18,6 +18,7 @@
 package org.apache.flink.table.planner.runtime.batch;
 
 import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.configuration.JobManagerOptions.SchedulerType;
 import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.operations.ModifyOperation;
 import org.apache.flink.table.operations.Operation;
@@ -41,7 +42,7 @@ public class ParallelismSettingTest extends TableTestBase {
     public void before() {
         util = batchTestUtil(TableConfig.getDefault());
 
-        util.getStreamEnv().getConfig().setDynamicGraph(true);
+        util.getStreamEnv().getConfig().setScheduler(SchedulerType.AdaptiveBatch);
         util.tableEnv()
                 .executeSql(
                         "CREATE TABLE MyTable (\n"
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/ForwardHashExchangeITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/ForwardHashExchangeITCase.java
index 522eddc2eaa..f98ffce0af5 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/ForwardHashExchangeITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/ForwardHashExchangeITCase.java
@@ -19,6 +19,7 @@ package org.apache.flink.table.planner.runtime.batch.sql;
 
 import org.apache.flink.api.common.BatchShuffleMode;
 import org.apache.flink.configuration.ExecutionOptions;
+import org.apache.flink.configuration.JobManagerOptions.SchedulerType;
 import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.planner.factories.TestValuesTableFactory;
 import org.apache.flink.table.planner.runtime.utils.BatchTestBase;
@@ -37,7 +38,7 @@ public class ForwardHashExchangeITCase extends BatchTestBase {
     @Before
     public void before() throws Exception {
         super.before();
-        env().getConfig().setDynamicGraph(true);
+        env().getConfig().setScheduler(SchedulerType.AdaptiveBatch);
         env().disableOperatorChaining();
         tEnv().getConfig()
                 .set(ExecutionOptions.BATCH_SHUFFLE_MODE, BatchShuffleMode.ALL_EXCHANGES_BLOCKING);


[flink] 01/04: [hotfix][streaming] Fix AUTO_WATERMARK_INTERVAL default value

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 48ae78b1805c92beb346bb3f2a59371ff9dd522a
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Fri Oct 14 09:06:35 2022 +0200

    [hotfix][streaming] Fix AUTO_WATERMARK_INTERVAL default value
    
    The actual default value was defined in the ExecutionConfig as 200ms anyway.
    This fix changes only documentation to reflect the actual status.
---
 docs/layouts/shortcodes/generated/pipeline_configuration.html          | 2 +-
 .../src/main/java/org/apache/flink/api/common/ExecutionConfig.java     | 3 ++-
 .../src/main/java/org/apache/flink/configuration/PipelineOptions.java  | 2 +-
 3 files changed, 4 insertions(+), 3 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/pipeline_configuration.html b/docs/layouts/shortcodes/generated/pipeline_configuration.html
index c2bcf9f45d9..be15e843090 100644
--- a/docs/layouts/shortcodes/generated/pipeline_configuration.html
+++ b/docs/layouts/shortcodes/generated/pipeline_configuration.html
@@ -22,7 +22,7 @@
         </tr>
         <tr>
             <td><h5>pipeline.auto-watermark-interval</h5></td>
-            <td style="word-wrap: break-word;">0 ms</td>
+            <td style="word-wrap: break-word;">200 ms</td>
             <td>Duration</td>
             <td>The interval of the automatic watermark emission. Watermarks are used throughout the streaming system to keep track of the progress of time. They are used, for example, for time based windowing.</td>
         </tr>
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index 24e01033de5..1c28d2b7091 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -137,7 +137,8 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
     private boolean autoTypeRegistrationEnabled = true;
 
     private boolean forceAvro = false;
-    private long autoWatermarkInterval = 200;
+    private long autoWatermarkInterval =
+            PipelineOptions.AUTO_WATERMARK_INTERVAL.defaultValue().toMillis();
 
     // ---------- statebackend related configurations ------------------------------
     /**
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/PipelineOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/PipelineOptions.java
index 81c4769ba10..842fe7f2310 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/PipelineOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/PipelineOptions.java
@@ -95,7 +95,7 @@ public class PipelineOptions {
     public static final ConfigOption<Duration> AUTO_WATERMARK_INTERVAL =
             key("pipeline.auto-watermark-interval")
                     .durationType()
-                    .defaultValue(Duration.ZERO)
+                    .defaultValue(Duration.ofMillis(200))
                     .withDescription(
                             "The interval of the automatic watermark emission. Watermarks are used throughout"
                                     + " the streaming system to keep track of the progress of time. They are used, for example,"


[flink] 02/04: [FLINK-29379][streaming] Migrate most ExecutionConfig fields to Configuration map

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c319340e9728bd60565c15922add220dc233d921
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Thu Oct 13 16:30:20 2022 +0200

    [FLINK-29379][streaming] Migrate most ExecutionConfig fields to Configuration map
---
 .../apache/flink/api/common/ExecutionConfig.java   | 269 +++++++++------------
 .../pyflink/common/tests/test_execution_config.py  |   2 +-
 2 files changed, 116 insertions(+), 155 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index 1c28d2b7091..461cdb4e107 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -22,6 +22,8 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ConfigurationUtils;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.DescribedEnum;
@@ -47,6 +49,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.stream.Collectors;
 
+import static org.apache.flink.configuration.ConfigOptions.key;
 import static org.apache.flink.configuration.description.TextElement.text;
 import static org.apache.flink.util.Preconditions.checkArgument;
 
@@ -104,57 +107,40 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
 
     private static final long DEFAULT_RESTART_DELAY = 10000L;
 
-    // --------------------------------------------------------------------------------------------
-
-    /** Defines how data exchange happens - batch or pipelined */
-    private ExecutionMode executionMode = ExecutionMode.PIPELINED;
-
-    private ClosureCleanerLevel closureCleanerLevel = ClosureCleanerLevel.RECURSIVE;
-
-    private int parallelism = CoreOptions.DEFAULT_PARALLELISM.defaultValue();
-
     /**
-     * The program wide maximum parallelism used for operators which haven't specified a maximum
-     * parallelism. The maximum parallelism specifies the upper limit for dynamic scaling and the
-     * number of key groups used for partitioned state.
+     * Internal {@link ConfigOption}s, that are not exposed and it's not possible to configure them
+     * via config files. We are defining them here, so that we can store them in the {@link
+     * #configuration}.
+     *
+     * <p>If you decide to expose any of those {@link ConfigOption}s, please double-check if the
+     * key, type and descriptions are sensible, as the initial values are arbitrary.
      */
-    private int maxParallelism = -1;
+    // --------------------------------------------------------------------------------------------
+
+    private static final ConfigOption<ExecutionMode> EXECUTION_MODE =
+            key("hidden.execution.mode")
+                    .enumType(ExecutionMode.class)
+                    .defaultValue(ExecutionMode.PIPELINED)
+                    .withDescription("Defines how data exchange happens - batch or pipelined");
 
     /**
-     * @deprecated Should no longer be used because it is subsumed by RestartStrategyConfiguration
+     * Use {@link
+     * org.apache.flink.api.common.restartstrategy.RestartStrategies.RestartStrategyConfiguration}
      */
-    @Deprecated private int numberOfExecutionRetries = -1;
-
-    private boolean forceKryo = false;
-
-    /** Flag to indicate whether generic types (through Kryo) are supported */
-    private boolean disableGenericTypes = false;
-
-    private boolean enableAutoGeneratedUids = true;
-
-    private boolean objectReuse = false;
-
-    private boolean autoTypeRegistrationEnabled = true;
-
-    private boolean forceAvro = false;
-    private long autoWatermarkInterval =
-            PipelineOptions.AUTO_WATERMARK_INTERVAL.defaultValue().toMillis();
+    @Deprecated
+    private static final ConfigOption<Integer> EXECUTION_RETRIES =
+            key("hidden.execution.retries")
+                    .intType()
+                    .defaultValue(-1)
+                    .withDescription(
+                            "Should no longer be used because it is subsumed by RestartStrategyConfiguration");
+    // --------------------------------------------------------------------------------------------
 
-    // ---------- statebackend related configurations ------------------------------
     /**
-     * Interval in milliseconds for sending latency tracking marks from the sources to the sinks.
+     * In the long run, this field should be somehow merged with the {@link Configuration} from
+     * StreamExecutionEnvironment.
      */
-    private long latencyTrackingInterval = MetricOptions.LATENCY_INTERVAL.defaultValue();
-
-    private boolean isLatencyTrackingConfigured = false;
-
-    /** Interval in milliseconds to perform periodic changelog materialization. */
-    private long periodicMaterializeIntervalMillis =
-            StateChangelogOptions.PERIODIC_MATERIALIZATION_INTERVAL.defaultValue().toMillis();
-
-    /** Max allowed number of consecutive failures for changelog materialization */
-    private int materializationMaxAllowedFailures =
-            StateChangelogOptions.MATERIALIZATION_MAX_FAILURES_ALLOWED.defaultValue();
+    private final Configuration configuration = new Configuration();
 
     /**
      * @deprecated Should no longer be used because it is subsumed by RestartStrategyConfiguration
@@ -174,11 +160,6 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
      */
     private long taskCancellationTimeoutMillis = -1;
 
-    /**
-     * This flag defines if we use compression for the state snapshot data or not. Default: false
-     */
-    private boolean useSnapshotCompression = false;
-
     // ------------------------------- User code values --------------------------------------------
 
     private GlobalJobParameters globalJobParameters = new GlobalJobParameters();
@@ -212,8 +193,7 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
      * be serializable because it needs to be sent to worker nodes.
      */
     public ExecutionConfig enableClosureCleaner() {
-        this.closureCleanerLevel = ClosureCleanerLevel.RECURSIVE;
-        return this;
+        return setClosureCleanerLevel(ClosureCleanerLevel.RECURSIVE);
     }
 
     /**
@@ -222,8 +202,7 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
      * @see #enableClosureCleaner()
      */
     public ExecutionConfig disableClosureCleaner() {
-        this.closureCleanerLevel = ClosureCleanerLevel.NONE;
-        return this;
+        return setClosureCleanerLevel(ClosureCleanerLevel.NONE);
     }
 
     /**
@@ -232,7 +211,7 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
      * @see #enableClosureCleaner()
      */
     public boolean isClosureCleanerEnabled() {
-        return !(closureCleanerLevel == ClosureCleanerLevel.NONE);
+        return !(getClosureCleanerLevel() == ClosureCleanerLevel.NONE);
     }
 
     /**
@@ -240,13 +219,13 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
      * different settings.
      */
     public ExecutionConfig setClosureCleanerLevel(ClosureCleanerLevel level) {
-        this.closureCleanerLevel = level;
+        configuration.set(PipelineOptions.CLOSURE_CLEANER_LEVEL, level);
         return this;
     }
 
     /** Returns the configured {@link ClosureCleanerLevel}. */
     public ClosureCleanerLevel getClosureCleanerLevel() {
-        return closureCleanerLevel;
+        return configuration.get(PipelineOptions.CLOSURE_CLEANER_LEVEL);
     }
 
     /**
@@ -261,7 +240,11 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
     @PublicEvolving
     public ExecutionConfig setAutoWatermarkInterval(long interval) {
         Preconditions.checkArgument(interval >= 0, "Auto watermark interval must not be negative.");
-        this.autoWatermarkInterval = interval;
+        return setAutoWatermarkInterval(Duration.ofMillis(interval));
+    }
+
+    private ExecutionConfig setAutoWatermarkInterval(Duration autoWatermarkInterval) {
+        configuration.set(PipelineOptions.AUTO_WATERMARK_INTERVAL, autoWatermarkInterval);
         return this;
     }
 
@@ -272,7 +255,7 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
      */
     @PublicEvolving
     public long getAutoWatermarkInterval() {
-        return this.autoWatermarkInterval;
+        return configuration.get(PipelineOptions.AUTO_WATERMARK_INTERVAL).toMillis();
     }
 
     /**
@@ -285,8 +268,7 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
      */
     @PublicEvolving
     public ExecutionConfig setLatencyTrackingInterval(long interval) {
-        this.latencyTrackingInterval = interval;
-        this.isLatencyTrackingConfigured = true;
+        configuration.set(MetricOptions.LATENCY_INTERVAL, interval);
         return this;
     }
 
@@ -297,32 +279,38 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
      */
     @PublicEvolving
     public long getLatencyTrackingInterval() {
-        return latencyTrackingInterval;
+        return configuration.get(MetricOptions.LATENCY_INTERVAL);
     }
 
     @Internal
     public boolean isLatencyTrackingConfigured() {
-        return isLatencyTrackingConfigured;
+        return configuration.getOptional(MetricOptions.LATENCY_INTERVAL).isPresent();
     }
 
     @Internal
     public long getPeriodicMaterializeIntervalMillis() {
-        return periodicMaterializeIntervalMillis;
+        return configuration
+                .get(StateChangelogOptions.PERIODIC_MATERIALIZATION_INTERVAL)
+                .toMillis();
     }
 
     @Internal
     public void setPeriodicMaterializeIntervalMillis(Duration periodicMaterializeInterval) {
-        this.periodicMaterializeIntervalMillis = periodicMaterializeInterval.toMillis();
+        configuration.set(
+                StateChangelogOptions.PERIODIC_MATERIALIZATION_INTERVAL,
+                periodicMaterializeInterval);
     }
 
     @Internal
     public int getMaterializationMaxAllowedFailures() {
-        return materializationMaxAllowedFailures;
+        return configuration.get(StateChangelogOptions.MATERIALIZATION_MAX_FAILURES_ALLOWED);
     }
 
     @Internal
     public void setMaterializationMaxAllowedFailures(int materializationMaxAllowedFailures) {
-        this.materializationMaxAllowedFailures = materializationMaxAllowedFailures;
+        configuration.set(
+                StateChangelogOptions.MATERIALIZATION_MAX_FAILURES_ALLOWED,
+                materializationMaxAllowedFailures);
     }
 
     /**
@@ -338,7 +326,7 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
      *     used.
      */
     public int getParallelism() {
-        return parallelism;
+        return configuration.get(CoreOptions.DEFAULT_PARALLELISM);
     }
 
     /**
@@ -359,7 +347,7 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
                 throw new IllegalArgumentException(
                         "Parallelism must be at least one, or ExecutionConfig.PARALLELISM_DEFAULT (use system default).");
             }
-            this.parallelism = parallelism;
+            configuration.set(CoreOptions.DEFAULT_PARALLELISM, parallelism);
         }
         return this;
     }
@@ -374,7 +362,7 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
      */
     @PublicEvolving
     public int getMaxParallelism() {
-        return maxParallelism;
+        return configuration.get(PipelineOptions.MAX_PARALLELISM);
     }
 
     /**
@@ -388,7 +376,7 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
     @PublicEvolving
     public void setMaxParallelism(int maxParallelism) {
         checkArgument(maxParallelism > 0, "The maximum parallelism must be greater than 0.");
-        this.maxParallelism = maxParallelism;
+        configuration.set(PipelineOptions.MAX_PARALLELISM, maxParallelism);
     }
 
     /**
@@ -503,7 +491,7 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
      */
     @Deprecated
     public int getNumberOfExecutionRetries() {
-        return numberOfExecutionRetries;
+        return configuration.get(EXECUTION_RETRIES);
     }
 
     /**
@@ -535,7 +523,7 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
             throw new IllegalArgumentException(
                     "The number of execution retries must be non-negative, or -1 (use system default)");
         }
-        this.numberOfExecutionRetries = numberOfExecutionRetries;
+        configuration.set(EXECUTION_RETRIES, numberOfExecutionRetries);
         return this;
     }
 
@@ -566,7 +554,7 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
      * @param executionMode The execution mode to use.
      */
     public void setExecutionMode(ExecutionMode executionMode) {
-        this.executionMode = executionMode;
+        configuration.set(EXECUTION_MODE, executionMode);
     }
 
     /**
@@ -578,7 +566,7 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
      * @return The execution mode for the program.
      */
     public ExecutionMode getExecutionMode() {
-        return executionMode;
+        return configuration.get(EXECUTION_MODE);
     }
 
     /**
@@ -613,16 +601,20 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
      * cannot be analyzed as POJO.
      */
     public void enableForceKryo() {
-        forceKryo = true;
+        setForceKryo(true);
     }
 
     /** Disable use of Kryo serializer for all POJOs. */
     public void disableForceKryo() {
-        forceKryo = false;
+        setForceKryo(false);
+    }
+
+    private void setForceKryo(boolean forceKryo) {
+        configuration.set(PipelineOptions.FORCE_KRYO, forceKryo);
     }
 
     public boolean isForceKryoEnabled() {
-        return forceKryo;
+        return configuration.get(PipelineOptions.FORCE_KRYO);
     }
 
     /**
@@ -633,7 +625,7 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
      * @see #disableGenericTypes()
      */
     public void enableGenericTypes() {
-        disableGenericTypes = false;
+        setGenericTypes(true);
     }
 
     /**
@@ -653,7 +645,11 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
      * @see #enableGenericTypes()
      */
     public void disableGenericTypes() {
-        disableGenericTypes = true;
+        setGenericTypes(false);
+    }
+
+    private void setGenericTypes(boolean genericTypes) {
+        configuration.set(PipelineOptions.GENERIC_TYPES, genericTypes);
     }
 
     /**
@@ -666,7 +662,7 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
      * @see #disableGenericTypes()
      */
     public boolean hasGenericTypesDisabled() {
-        return disableGenericTypes;
+        return !configuration.get(PipelineOptions.GENERIC_TYPES);
     }
 
     /**
@@ -675,7 +671,7 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
      * @see #disableAutoGeneratedUIDs()
      */
     public void enableAutoGeneratedUIDs() {
-        enableAutoGeneratedUids = true;
+        setAutoGeneratedUids(true);
     }
 
     /**
@@ -688,7 +684,11 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
      * overtime without discarding state.
      */
     public void disableAutoGeneratedUIDs() {
-        enableAutoGeneratedUids = false;
+        setAutoGeneratedUids(false);
+    }
+
+    private void setAutoGeneratedUids(boolean autoGeneratedUids) {
+        configuration.set(PipelineOptions.AUTO_GENERATE_UIDS, autoGeneratedUids);
     }
 
     /**
@@ -700,7 +700,7 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
      * @see #disableAutoGeneratedUIDs()
      */
     public boolean hasAutoGeneratedUIDsEnabled() {
-        return enableAutoGeneratedUids;
+        return configuration.get(PipelineOptions.AUTO_GENERATE_UIDS);
     }
 
     /**
@@ -709,17 +709,21 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
      * <p><b>Important:</b> Make sure to include the <i>flink-avro</i> module.
      */
     public void enableForceAvro() {
-        forceAvro = true;
+        setForceAvro(true);
     }
 
     /** Disables the Apache Avro serializer as the forced serializer for POJOs. */
     public void disableForceAvro() {
-        forceAvro = false;
+        setForceAvro(false);
+    }
+
+    private void setForceAvro(boolean forceAvro) {
+        configuration.set(PipelineOptions.FORCE_AVRO, forceAvro);
     }
 
     /** Returns whether the Apache Avro is the default serializer for POJOs. */
     public boolean isForceAvroEnabled() {
-        return forceAvro;
+        return configuration.get(PipelineOptions.FORCE_AVRO);
     }
 
     /**
@@ -728,8 +732,7 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
      * an operation is not aware of this behaviour.
      */
     public ExecutionConfig enableObjectReuse() {
-        objectReuse = true;
-        return this;
+        return setObjectReuse(true);
     }
 
     /**
@@ -737,13 +740,17 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
      * user-code functions. @see #enableObjectReuse()
      */
     public ExecutionConfig disableObjectReuse() {
-        objectReuse = false;
+        return setObjectReuse(false);
+    }
+
+    private ExecutionConfig setObjectReuse(boolean objectReuse) {
+        configuration.set(PipelineOptions.OBJECT_REUSE, objectReuse);
         return this;
     }
 
     /** Returns whether object reuse has been enabled or disabled. @see #enableObjectReuse() */
     public boolean isObjectReuseEnabled() {
-        return objectReuse;
+        return configuration.get(PipelineOptions.OBJECT_REUSE);
     }
 
     public GlobalJobParameters getGlobalJobParameters() {
@@ -915,22 +922,26 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
     }
 
     public boolean isAutoTypeRegistrationDisabled() {
-        return !autoTypeRegistrationEnabled;
+        return !configuration.get(PipelineOptions.AUTO_TYPE_REGISTRATION);
     }
 
     /**
      * Control whether Flink is automatically registering all types in the user programs with Kryo.
      */
     public void disableAutoTypeRegistration() {
-        this.autoTypeRegistrationEnabled = false;
+        setAutoTypeRegistration(false);
+    }
+
+    private void setAutoTypeRegistration(Boolean autoTypeRegistration) {
+        configuration.set(PipelineOptions.AUTO_TYPE_REGISTRATION, autoTypeRegistration);
     }
 
     public boolean isUseSnapshotCompression() {
-        return useSnapshotCompression;
+        return configuration.get(ExecutionOptions.SNAPSHOT_COMPRESSION);
     }
 
     public void setUseSnapshotCompression(boolean useSnapshotCompression) {
-        this.useSnapshotCompression = useSnapshotCompression;
+        configuration.set(ExecutionOptions.SNAPSHOT_COMPRESSION, useSnapshotCompression);
     }
 
     @Override
@@ -939,28 +950,19 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
             ExecutionConfig other = (ExecutionConfig) obj;
 
             return other.canEqual(this)
-                    && Objects.equals(executionMode, other.executionMode)
-                    && closureCleanerLevel == other.closureCleanerLevel
-                    && parallelism == other.parallelism
+                    && Objects.equals(configuration, other.configuration)
                     && ((restartStrategyConfiguration == null
                                     && other.restartStrategyConfiguration == null)
                             || (null != restartStrategyConfiguration
                                     && restartStrategyConfiguration.equals(
                                             other.restartStrategyConfiguration)))
-                    && forceKryo == other.forceKryo
-                    && disableGenericTypes == other.disableGenericTypes
-                    && objectReuse == other.objectReuse
-                    && autoTypeRegistrationEnabled == other.autoTypeRegistrationEnabled
-                    && forceAvro == other.forceAvro
                     && Objects.equals(globalJobParameters, other.globalJobParameters)
-                    && autoWatermarkInterval == other.autoWatermarkInterval
                     && registeredTypesWithKryoSerializerClasses.equals(
                             other.registeredTypesWithKryoSerializerClasses)
                     && defaultKryoSerializerClasses.equals(other.defaultKryoSerializerClasses)
                     && registeredKryoTypes.equals(other.registeredKryoTypes)
                     && registeredPojoTypes.equals(other.registeredPojoTypes)
                     && taskCancellationIntervalMillis == other.taskCancellationIntervalMillis
-                    && useSnapshotCompression == other.useSnapshotCompression
                     && isDynamicGraph == other.isDynamicGraph;
 
         } else {
@@ -971,57 +973,22 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
     @Override
     public int hashCode() {
         return Objects.hash(
-                executionMode,
-                closureCleanerLevel,
-                parallelism,
+                configuration,
                 restartStrategyConfiguration,
-                forceKryo,
-                disableGenericTypes,
-                objectReuse,
-                autoTypeRegistrationEnabled,
-                forceAvro,
                 globalJobParameters,
-                autoWatermarkInterval,
                 registeredTypesWithKryoSerializerClasses,
                 defaultKryoSerializerClasses,
                 registeredKryoTypes,
                 registeredPojoTypes,
                 taskCancellationIntervalMillis,
-                useSnapshotCompression,
                 isDynamicGraph);
     }
 
     @Override
     public String toString() {
         return "ExecutionConfig{"
-                + "executionMode="
-                + executionMode
-                + ", closureCleanerLevel="
-                + closureCleanerLevel
-                + ", parallelism="
-                + parallelism
-                + ", maxParallelism="
-                + maxParallelism
-                + ", numberOfExecutionRetries="
-                + numberOfExecutionRetries
-                + ", forceKryo="
-                + forceKryo
-                + ", disableGenericTypes="
-                + disableGenericTypes
-                + ", enableAutoGeneratedUids="
-                + enableAutoGeneratedUids
-                + ", objectReuse="
-                + objectReuse
-                + ", autoTypeRegistrationEnabled="
-                + autoTypeRegistrationEnabled
-                + ", forceAvro="
-                + forceAvro
-                + ", autoWatermarkInterval="
-                + autoWatermarkInterval
-                + ", latencyTrackingInterval="
-                + latencyTrackingInterval
-                + ", isLatencyTrackingConfigured="
-                + isLatencyTrackingConfigured
+                + "configuration="
+                + configuration
                 + ", executionRetryDelay="
                 + executionRetryDelay
                 + ", restartStrategyConfiguration="
@@ -1030,8 +997,6 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
                 + taskCancellationIntervalMillis
                 + ", taskCancellationTimeoutMillis="
                 + taskCancellationTimeoutMillis
-                + ", useSnapshotCompression="
-                + useSnapshotCompression
                 + ", globalJobParameters="
                 + globalJobParameters
                 + ", registeredTypesWithKryoSerializers="
@@ -1145,21 +1110,19 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
     public void configure(ReadableConfig configuration, ClassLoader classLoader) {
         configuration
                 .getOptional(PipelineOptions.AUTO_TYPE_REGISTRATION)
-                .ifPresent(b -> this.autoTypeRegistrationEnabled = b);
+                .ifPresent(this::setAutoTypeRegistration);
         configuration
                 .getOptional(PipelineOptions.AUTO_GENERATE_UIDS)
-                .ifPresent(b -> this.enableAutoGeneratedUids = b);
+                .ifPresent(this::setAutoGeneratedUids);
         configuration
                 .getOptional(PipelineOptions.AUTO_WATERMARK_INTERVAL)
-                .ifPresent(i -> this.setAutoWatermarkInterval(i.toMillis()));
+                .ifPresent(this::setAutoWatermarkInterval);
         configuration
                 .getOptional(PipelineOptions.CLOSURE_CLEANER_LEVEL)
                 .ifPresent(this::setClosureCleanerLevel);
-        configuration.getOptional(PipelineOptions.FORCE_AVRO).ifPresent(b -> this.forceAvro = b);
-        configuration
-                .getOptional(PipelineOptions.GENERIC_TYPES)
-                .ifPresent(b -> this.disableGenericTypes = !b);
-        configuration.getOptional(PipelineOptions.FORCE_KRYO).ifPresent(b -> this.forceKryo = b);
+        configuration.getOptional(PipelineOptions.FORCE_AVRO).ifPresent(this::setForceAvro);
+        configuration.getOptional(PipelineOptions.GENERIC_TYPES).ifPresent(this::setGenericTypes);
+        configuration.getOptional(PipelineOptions.FORCE_KRYO).ifPresent(this::setForceKryo);
         configuration
                 .getOptional(PipelineOptions.GLOBAL_JOB_PARAMETERS)
                 .<GlobalJobParameters>map(MapBasedJobParameters::new)
@@ -1180,9 +1143,7 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
                 .getOptional(PipelineOptions.MAX_PARALLELISM)
                 .ifPresent(this::setMaxParallelism);
         configuration.getOptional(CoreOptions.DEFAULT_PARALLELISM).ifPresent(this::setParallelism);
-        configuration
-                .getOptional(PipelineOptions.OBJECT_REUSE)
-                .ifPresent(o -> this.objectReuse = o);
+        configuration.getOptional(PipelineOptions.OBJECT_REUSE).ifPresent(this::setObjectReuse);
         configuration
                 .getOptional(TaskManagerOptions.TASK_CANCELLATION_INTERVAL)
                 .ifPresent(this::setTaskCancellationInterval);
diff --git a/flink-python/pyflink/common/tests/test_execution_config.py b/flink-python/pyflink/common/tests/test_execution_config.py
index c65c0377b41..5c8917f0210 100644
--- a/flink-python/pyflink/common/tests/test_execution_config.py
+++ b/flink-python/pyflink/common/tests/test_execution_config.py
@@ -267,7 +267,7 @@ class ExecutionConfigTests(PyFlinkTestCase):
 
         self.assertNotEqual(config1, config2)
 
-        self.assertNotEqual(hash(config1), hash(config2))
+        # it is allowed for hashes to be equal even if objects are not
 
         config2.set_parallelism(12)