You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2022/11/06 20:19:47 UTC

[flink] 02/04: [FLINK-29379][streaming] Migrate most ExecutionConfig fields to Configuration map

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c319340e9728bd60565c15922add220dc233d921
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Thu Oct 13 16:30:20 2022 +0200

    [FLINK-29379][streaming] Migrate most ExecutionConfig fields to Configuration map
---
 .../apache/flink/api/common/ExecutionConfig.java   | 269 +++++++++------------
 .../pyflink/common/tests/test_execution_config.py  |   2 +-
 2 files changed, 116 insertions(+), 155 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index 1c28d2b7091..461cdb4e107 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -22,6 +22,8 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ConfigurationUtils;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.DescribedEnum;
@@ -47,6 +49,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.stream.Collectors;
 
+import static org.apache.flink.configuration.ConfigOptions.key;
 import static org.apache.flink.configuration.description.TextElement.text;
 import static org.apache.flink.util.Preconditions.checkArgument;
 
@@ -104,57 +107,40 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
 
     private static final long DEFAULT_RESTART_DELAY = 10000L;
 
-    // --------------------------------------------------------------------------------------------
-
-    /** Defines how data exchange happens - batch or pipelined */
-    private ExecutionMode executionMode = ExecutionMode.PIPELINED;
-
-    private ClosureCleanerLevel closureCleanerLevel = ClosureCleanerLevel.RECURSIVE;
-
-    private int parallelism = CoreOptions.DEFAULT_PARALLELISM.defaultValue();
-
     /**
-     * The program wide maximum parallelism used for operators which haven't specified a maximum
-     * parallelism. The maximum parallelism specifies the upper limit for dynamic scaling and the
-     * number of key groups used for partitioned state.
+     * Internal {@link ConfigOption}s that are not exposed and cannot be configured via config
+     * files. They are defined here so that their values can be stored in the {@link
+     * #configuration}.
+     *
+     * <p>If you decide to expose any of those {@link ConfigOption}s, please double-check if the
+     * key, type and descriptions are sensible, as the initial values are arbitrary.
      */
-    private int maxParallelism = -1;
+    // --------------------------------------------------------------------------------------------
+
+    private static final ConfigOption<ExecutionMode> EXECUTION_MODE =
+            key("hidden.execution.mode")
+                    .enumType(ExecutionMode.class)
+                    .defaultValue(ExecutionMode.PIPELINED)
+                    .withDescription("Defines how data exchange happens - batch or pipelined");
 
     /**
-     * @deprecated Should no longer be used because it is subsumed by RestartStrategyConfiguration
+     * @deprecated Use {@link
+     * org.apache.flink.api.common.restartstrategy.RestartStrategies.RestartStrategyConfiguration}
      */
-    @Deprecated private int numberOfExecutionRetries = -1;
-
-    private boolean forceKryo = false;
-
-    /** Flag to indicate whether generic types (through Kryo) are supported */
-    private boolean disableGenericTypes = false;
-
-    private boolean enableAutoGeneratedUids = true;
-
-    private boolean objectReuse = false;
-
-    private boolean autoTypeRegistrationEnabled = true;
-
-    private boolean forceAvro = false;
-    private long autoWatermarkInterval =
-            PipelineOptions.AUTO_WATERMARK_INTERVAL.defaultValue().toMillis();
+    @Deprecated
+    private static final ConfigOption<Integer> EXECUTION_RETRIES =
+            key("hidden.execution.retries")
+                    .intType()
+                    .defaultValue(-1)
+                    .withDescription(
+                            "Should no longer be used because it is subsumed by RestartStrategyConfiguration");
+    // --------------------------------------------------------------------------------------------
 
-    // ---------- statebackend related configurations ------------------------------
     /**
-     * Interval in milliseconds for sending latency tracking marks from the sources to the sinks.
+     * In the long run, this field should be somehow merged with the {@link Configuration} from
+     * StreamExecutionEnvironment.
      */
-    private long latencyTrackingInterval = MetricOptions.LATENCY_INTERVAL.defaultValue();
-
-    private boolean isLatencyTrackingConfigured = false;
-
-    /** Interval in milliseconds to perform periodic changelog materialization. */
-    private long periodicMaterializeIntervalMillis =
-            StateChangelogOptions.PERIODIC_MATERIALIZATION_INTERVAL.defaultValue().toMillis();
-
-    /** Max allowed number of consecutive failures for changelog materialization */
-    private int materializationMaxAllowedFailures =
-            StateChangelogOptions.MATERIALIZATION_MAX_FAILURES_ALLOWED.defaultValue();
+    private final Configuration configuration = new Configuration();
 
     /**
      * @deprecated Should no longer be used because it is subsumed by RestartStrategyConfiguration
@@ -174,11 +160,6 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
      */
     private long taskCancellationTimeoutMillis = -1;
 
-    /**
-     * This flag defines if we use compression for the state snapshot data or not. Default: false
-     */
-    private boolean useSnapshotCompression = false;
-
     // ------------------------------- User code values --------------------------------------------
 
     private GlobalJobParameters globalJobParameters = new GlobalJobParameters();
@@ -212,8 +193,7 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
      * be serializable because it needs to be sent to worker nodes.
      */
     public ExecutionConfig enableClosureCleaner() {
-        this.closureCleanerLevel = ClosureCleanerLevel.RECURSIVE;
-        return this;
+        return setClosureCleanerLevel(ClosureCleanerLevel.RECURSIVE);
     }
 
     /**
@@ -222,8 +202,7 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
      * @see #enableClosureCleaner()
      */
     public ExecutionConfig disableClosureCleaner() {
-        this.closureCleanerLevel = ClosureCleanerLevel.NONE;
-        return this;
+        return setClosureCleanerLevel(ClosureCleanerLevel.NONE);
     }
 
     /**
@@ -232,7 +211,7 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
      * @see #enableClosureCleaner()
      */
     public boolean isClosureCleanerEnabled() {
-        return !(closureCleanerLevel == ClosureCleanerLevel.NONE);
+        return !(getClosureCleanerLevel() == ClosureCleanerLevel.NONE);
     }
 
     /**
@@ -240,13 +219,13 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
      * different settings.
      */
     public ExecutionConfig setClosureCleanerLevel(ClosureCleanerLevel level) {
-        this.closureCleanerLevel = level;
+        configuration.set(PipelineOptions.CLOSURE_CLEANER_LEVEL, level);
         return this;
     }
 
     /** Returns the configured {@link ClosureCleanerLevel}. */
     public ClosureCleanerLevel getClosureCleanerLevel() {
-        return closureCleanerLevel;
+        return configuration.get(PipelineOptions.CLOSURE_CLEANER_LEVEL);
     }
 
     /**
@@ -261,7 +240,11 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
     @PublicEvolving
     public ExecutionConfig setAutoWatermarkInterval(long interval) {
         Preconditions.checkArgument(interval >= 0, "Auto watermark interval must not be negative.");
-        this.autoWatermarkInterval = interval;
+        return setAutoWatermarkInterval(Duration.ofMillis(interval));
+    }
+
+    private ExecutionConfig setAutoWatermarkInterval(Duration autoWatermarkInterval) {
+        configuration.set(PipelineOptions.AUTO_WATERMARK_INTERVAL, autoWatermarkInterval);
         return this;
     }
 
@@ -272,7 +255,7 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
      */
     @PublicEvolving
     public long getAutoWatermarkInterval() {
-        return this.autoWatermarkInterval;
+        return configuration.get(PipelineOptions.AUTO_WATERMARK_INTERVAL).toMillis();
     }
 
     /**
@@ -285,8 +268,7 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
      */
     @PublicEvolving
     public ExecutionConfig setLatencyTrackingInterval(long interval) {
-        this.latencyTrackingInterval = interval;
-        this.isLatencyTrackingConfigured = true;
+        configuration.set(MetricOptions.LATENCY_INTERVAL, interval);
         return this;
     }
 
@@ -297,32 +279,38 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
      */
     @PublicEvolving
     public long getLatencyTrackingInterval() {
-        return latencyTrackingInterval;
+        return configuration.get(MetricOptions.LATENCY_INTERVAL);
     }
 
     @Internal
     public boolean isLatencyTrackingConfigured() {
-        return isLatencyTrackingConfigured;
+        return configuration.getOptional(MetricOptions.LATENCY_INTERVAL).isPresent();
     }
 
     @Internal
     public long getPeriodicMaterializeIntervalMillis() {
-        return periodicMaterializeIntervalMillis;
+        return configuration
+                .get(StateChangelogOptions.PERIODIC_MATERIALIZATION_INTERVAL)
+                .toMillis();
     }
 
     @Internal
     public void setPeriodicMaterializeIntervalMillis(Duration periodicMaterializeInterval) {
-        this.periodicMaterializeIntervalMillis = periodicMaterializeInterval.toMillis();
+        configuration.set(
+                StateChangelogOptions.PERIODIC_MATERIALIZATION_INTERVAL,
+                periodicMaterializeInterval);
     }
 
     @Internal
     public int getMaterializationMaxAllowedFailures() {
-        return materializationMaxAllowedFailures;
+        return configuration.get(StateChangelogOptions.MATERIALIZATION_MAX_FAILURES_ALLOWED);
     }
 
     @Internal
     public void setMaterializationMaxAllowedFailures(int materializationMaxAllowedFailures) {
-        this.materializationMaxAllowedFailures = materializationMaxAllowedFailures;
+        configuration.set(
+                StateChangelogOptions.MATERIALIZATION_MAX_FAILURES_ALLOWED,
+                materializationMaxAllowedFailures);
     }
 
     /**
@@ -338,7 +326,7 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
      *     used.
      */
     public int getParallelism() {
-        return parallelism;
+        return configuration.get(CoreOptions.DEFAULT_PARALLELISM);
     }
 
     /**
@@ -359,7 +347,7 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
                 throw new IllegalArgumentException(
                         "Parallelism must be at least one, or ExecutionConfig.PARALLELISM_DEFAULT (use system default).");
             }
-            this.parallelism = parallelism;
+            configuration.set(CoreOptions.DEFAULT_PARALLELISM, parallelism);
         }
         return this;
     }
@@ -374,7 +362,7 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
      */
     @PublicEvolving
     public int getMaxParallelism() {
-        return maxParallelism;
+        return configuration.get(PipelineOptions.MAX_PARALLELISM);
     }
 
     /**
@@ -388,7 +376,7 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
     @PublicEvolving
     public void setMaxParallelism(int maxParallelism) {
         checkArgument(maxParallelism > 0, "The maximum parallelism must be greater than 0.");
-        this.maxParallelism = maxParallelism;
+        configuration.set(PipelineOptions.MAX_PARALLELISM, maxParallelism);
     }
 
     /**
@@ -503,7 +491,7 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
      */
     @Deprecated
     public int getNumberOfExecutionRetries() {
-        return numberOfExecutionRetries;
+        return configuration.get(EXECUTION_RETRIES);
     }
 
     /**
@@ -535,7 +523,7 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
             throw new IllegalArgumentException(
                     "The number of execution retries must be non-negative, or -1 (use system default)");
         }
-        this.numberOfExecutionRetries = numberOfExecutionRetries;
+        configuration.set(EXECUTION_RETRIES, numberOfExecutionRetries);
         return this;
     }
 
@@ -566,7 +554,7 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
      * @param executionMode The execution mode to use.
      */
     public void setExecutionMode(ExecutionMode executionMode) {
-        this.executionMode = executionMode;
+        configuration.set(EXECUTION_MODE, executionMode);
     }
 
     /**
@@ -578,7 +566,7 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
      * @return The execution mode for the program.
      */
     public ExecutionMode getExecutionMode() {
-        return executionMode;
+        return configuration.get(EXECUTION_MODE);
     }
 
     /**
@@ -613,16 +601,20 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
      * cannot be analyzed as POJO.
      */
     public void enableForceKryo() {
-        forceKryo = true;
+        setForceKryo(true);
     }
 
     /** Disable use of Kryo serializer for all POJOs. */
     public void disableForceKryo() {
-        forceKryo = false;
+        setForceKryo(false);
+    }
+
+    private void setForceKryo(boolean forceKryo) {
+        configuration.set(PipelineOptions.FORCE_KRYO, forceKryo);
     }
 
     public boolean isForceKryoEnabled() {
-        return forceKryo;
+        return configuration.get(PipelineOptions.FORCE_KRYO);
     }
 
     /**
@@ -633,7 +625,7 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
      * @see #disableGenericTypes()
      */
     public void enableGenericTypes() {
-        disableGenericTypes = false;
+        setGenericTypes(true);
     }
 
     /**
@@ -653,7 +645,11 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
      * @see #enableGenericTypes()
      */
     public void disableGenericTypes() {
-        disableGenericTypes = true;
+        setGenericTypes(false);
+    }
+
+    private void setGenericTypes(boolean genericTypes) {
+        configuration.set(PipelineOptions.GENERIC_TYPES, genericTypes);
     }
 
     /**
@@ -666,7 +662,7 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
      * @see #disableGenericTypes()
      */
     public boolean hasGenericTypesDisabled() {
-        return disableGenericTypes;
+        return !configuration.get(PipelineOptions.GENERIC_TYPES);
     }
 
     /**
@@ -675,7 +671,7 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
      * @see #disableAutoGeneratedUIDs()
      */
     public void enableAutoGeneratedUIDs() {
-        enableAutoGeneratedUids = true;
+        setAutoGeneratedUids(true);
     }
 
     /**
@@ -688,7 +684,11 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
      * overtime without discarding state.
      */
     public void disableAutoGeneratedUIDs() {
-        enableAutoGeneratedUids = false;
+        setAutoGeneratedUids(false);
+    }
+
+    private void setAutoGeneratedUids(boolean autoGeneratedUids) {
+        configuration.set(PipelineOptions.AUTO_GENERATE_UIDS, autoGeneratedUids);
     }
 
     /**
@@ -700,7 +700,7 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
      * @see #disableAutoGeneratedUIDs()
      */
     public boolean hasAutoGeneratedUIDsEnabled() {
-        return enableAutoGeneratedUids;
+        return configuration.get(PipelineOptions.AUTO_GENERATE_UIDS);
     }
 
     /**
@@ -709,17 +709,21 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
      * <p><b>Important:</b> Make sure to include the <i>flink-avro</i> module.
      */
     public void enableForceAvro() {
-        forceAvro = true;
+        setForceAvro(true);
     }
 
     /** Disables the Apache Avro serializer as the forced serializer for POJOs. */
     public void disableForceAvro() {
-        forceAvro = false;
+        setForceAvro(false);
+    }
+
+    private void setForceAvro(boolean forceAvro) {
+        configuration.set(PipelineOptions.FORCE_AVRO, forceAvro);
     }
 
     /** Returns whether the Apache Avro is the default serializer for POJOs. */
     public boolean isForceAvroEnabled() {
-        return forceAvro;
+        return configuration.get(PipelineOptions.FORCE_AVRO);
     }
 
     /**
@@ -728,8 +732,7 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
      * an operation is not aware of this behaviour.
      */
     public ExecutionConfig enableObjectReuse() {
-        objectReuse = true;
-        return this;
+        return setObjectReuse(true);
     }
 
     /**
@@ -737,13 +740,17 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
      * user-code functions. @see #enableObjectReuse()
      */
     public ExecutionConfig disableObjectReuse() {
-        objectReuse = false;
+        return setObjectReuse(false);
+    }
+
+    private ExecutionConfig setObjectReuse(boolean objectReuse) {
+        configuration.set(PipelineOptions.OBJECT_REUSE, objectReuse);
         return this;
     }
 
     /** Returns whether object reuse has been enabled or disabled. @see #enableObjectReuse() */
     public boolean isObjectReuseEnabled() {
-        return objectReuse;
+        return configuration.get(PipelineOptions.OBJECT_REUSE);
     }
 
     public GlobalJobParameters getGlobalJobParameters() {
@@ -915,22 +922,26 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
     }
 
     public boolean isAutoTypeRegistrationDisabled() {
-        return !autoTypeRegistrationEnabled;
+        return !configuration.get(PipelineOptions.AUTO_TYPE_REGISTRATION);
     }
 
     /**
      * Control whether Flink is automatically registering all types in the user programs with Kryo.
      */
     public void disableAutoTypeRegistration() {
-        this.autoTypeRegistrationEnabled = false;
+        setAutoTypeRegistration(false);
+    }
+
+    private void setAutoTypeRegistration(Boolean autoTypeRegistration) {
+        configuration.set(PipelineOptions.AUTO_TYPE_REGISTRATION, autoTypeRegistration);
     }
 
     public boolean isUseSnapshotCompression() {
-        return useSnapshotCompression;
+        return configuration.get(ExecutionOptions.SNAPSHOT_COMPRESSION);
     }
 
     public void setUseSnapshotCompression(boolean useSnapshotCompression) {
-        this.useSnapshotCompression = useSnapshotCompression;
+        configuration.set(ExecutionOptions.SNAPSHOT_COMPRESSION, useSnapshotCompression);
     }
 
     @Override
@@ -939,28 +950,19 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
             ExecutionConfig other = (ExecutionConfig) obj;
 
             return other.canEqual(this)
-                    && Objects.equals(executionMode, other.executionMode)
-                    && closureCleanerLevel == other.closureCleanerLevel
-                    && parallelism == other.parallelism
+                    && Objects.equals(configuration, other.configuration)
                     && ((restartStrategyConfiguration == null
                                     && other.restartStrategyConfiguration == null)
                             || (null != restartStrategyConfiguration
                                     && restartStrategyConfiguration.equals(
                                             other.restartStrategyConfiguration)))
-                    && forceKryo == other.forceKryo
-                    && disableGenericTypes == other.disableGenericTypes
-                    && objectReuse == other.objectReuse
-                    && autoTypeRegistrationEnabled == other.autoTypeRegistrationEnabled
-                    && forceAvro == other.forceAvro
                     && Objects.equals(globalJobParameters, other.globalJobParameters)
-                    && autoWatermarkInterval == other.autoWatermarkInterval
                     && registeredTypesWithKryoSerializerClasses.equals(
                             other.registeredTypesWithKryoSerializerClasses)
                     && defaultKryoSerializerClasses.equals(other.defaultKryoSerializerClasses)
                     && registeredKryoTypes.equals(other.registeredKryoTypes)
                     && registeredPojoTypes.equals(other.registeredPojoTypes)
                     && taskCancellationIntervalMillis == other.taskCancellationIntervalMillis
-                    && useSnapshotCompression == other.useSnapshotCompression
                     && isDynamicGraph == other.isDynamicGraph;
 
         } else {
@@ -971,57 +973,22 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
     @Override
     public int hashCode() {
         return Objects.hash(
-                executionMode,
-                closureCleanerLevel,
-                parallelism,
+                configuration,
                 restartStrategyConfiguration,
-                forceKryo,
-                disableGenericTypes,
-                objectReuse,
-                autoTypeRegistrationEnabled,
-                forceAvro,
                 globalJobParameters,
-                autoWatermarkInterval,
                 registeredTypesWithKryoSerializerClasses,
                 defaultKryoSerializerClasses,
                 registeredKryoTypes,
                 registeredPojoTypes,
                 taskCancellationIntervalMillis,
-                useSnapshotCompression,
                 isDynamicGraph);
     }
 
     @Override
     public String toString() {
         return "ExecutionConfig{"
-                + "executionMode="
-                + executionMode
-                + ", closureCleanerLevel="
-                + closureCleanerLevel
-                + ", parallelism="
-                + parallelism
-                + ", maxParallelism="
-                + maxParallelism
-                + ", numberOfExecutionRetries="
-                + numberOfExecutionRetries
-                + ", forceKryo="
-                + forceKryo
-                + ", disableGenericTypes="
-                + disableGenericTypes
-                + ", enableAutoGeneratedUids="
-                + enableAutoGeneratedUids
-                + ", objectReuse="
-                + objectReuse
-                + ", autoTypeRegistrationEnabled="
-                + autoTypeRegistrationEnabled
-                + ", forceAvro="
-                + forceAvro
-                + ", autoWatermarkInterval="
-                + autoWatermarkInterval
-                + ", latencyTrackingInterval="
-                + latencyTrackingInterval
-                + ", isLatencyTrackingConfigured="
-                + isLatencyTrackingConfigured
+                + "configuration="
+                + configuration
                 + ", executionRetryDelay="
                 + executionRetryDelay
                 + ", restartStrategyConfiguration="
@@ -1030,8 +997,6 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
                 + taskCancellationIntervalMillis
                 + ", taskCancellationTimeoutMillis="
                 + taskCancellationTimeoutMillis
-                + ", useSnapshotCompression="
-                + useSnapshotCompression
                 + ", globalJobParameters="
                 + globalJobParameters
                 + ", registeredTypesWithKryoSerializers="
@@ -1145,21 +1110,19 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
     public void configure(ReadableConfig configuration, ClassLoader classLoader) {
         configuration
                 .getOptional(PipelineOptions.AUTO_TYPE_REGISTRATION)
-                .ifPresent(b -> this.autoTypeRegistrationEnabled = b);
+                .ifPresent(this::setAutoTypeRegistration);
         configuration
                 .getOptional(PipelineOptions.AUTO_GENERATE_UIDS)
-                .ifPresent(b -> this.enableAutoGeneratedUids = b);
+                .ifPresent(this::setAutoGeneratedUids);
         configuration
                 .getOptional(PipelineOptions.AUTO_WATERMARK_INTERVAL)
-                .ifPresent(i -> this.setAutoWatermarkInterval(i.toMillis()));
+                .ifPresent(this::setAutoWatermarkInterval);
         configuration
                 .getOptional(PipelineOptions.CLOSURE_CLEANER_LEVEL)
                 .ifPresent(this::setClosureCleanerLevel);
-        configuration.getOptional(PipelineOptions.FORCE_AVRO).ifPresent(b -> this.forceAvro = b);
-        configuration
-                .getOptional(PipelineOptions.GENERIC_TYPES)
-                .ifPresent(b -> this.disableGenericTypes = !b);
-        configuration.getOptional(PipelineOptions.FORCE_KRYO).ifPresent(b -> this.forceKryo = b);
+        configuration.getOptional(PipelineOptions.FORCE_AVRO).ifPresent(this::setForceAvro);
+        configuration.getOptional(PipelineOptions.GENERIC_TYPES).ifPresent(this::setGenericTypes);
+        configuration.getOptional(PipelineOptions.FORCE_KRYO).ifPresent(this::setForceKryo);
         configuration
                 .getOptional(PipelineOptions.GLOBAL_JOB_PARAMETERS)
                 .<GlobalJobParameters>map(MapBasedJobParameters::new)
@@ -1180,9 +1143,7 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
                 .getOptional(PipelineOptions.MAX_PARALLELISM)
                 .ifPresent(this::setMaxParallelism);
         configuration.getOptional(CoreOptions.DEFAULT_PARALLELISM).ifPresent(this::setParallelism);
-        configuration
-                .getOptional(PipelineOptions.OBJECT_REUSE)
-                .ifPresent(o -> this.objectReuse = o);
+        configuration.getOptional(PipelineOptions.OBJECT_REUSE).ifPresent(this::setObjectReuse);
         configuration
                 .getOptional(TaskManagerOptions.TASK_CANCELLATION_INTERVAL)
                 .ifPresent(this::setTaskCancellationInterval);
diff --git a/flink-python/pyflink/common/tests/test_execution_config.py b/flink-python/pyflink/common/tests/test_execution_config.py
index c65c0377b41..5c8917f0210 100644
--- a/flink-python/pyflink/common/tests/test_execution_config.py
+++ b/flink-python/pyflink/common/tests/test_execution_config.py
@@ -267,7 +267,7 @@ class ExecutionConfigTests(PyFlinkTestCase):
 
         self.assertNotEqual(config1, config2)
 
-        self.assertNotEqual(hash(config1), hash(config2))
+        # Unequal objects are allowed to share a hash, so hash inequality is not asserted here.
 
         config2.set_parallelism(12)