You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2022/11/06 20:19:47 UTC
[flink] 02/04: [FLINK-29379][streaming] Migrate most ExecutionConfig fields to Configuration map
This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit c319340e9728bd60565c15922add220dc233d921
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Thu Oct 13 16:30:20 2022 +0200
[FLINK-29379][streaming] Migrate most ExecutionConfig fields to Configuration map
---
.../apache/flink/api/common/ExecutionConfig.java | 269 +++++++++------------
.../pyflink/common/tests/test_execution_config.py | 2 +-
2 files changed, 116 insertions(+), 155 deletions(-)
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index 1c28d2b7091..461cdb4e107 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -22,6 +22,8 @@ import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.Public;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.DescribedEnum;
@@ -47,6 +49,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
+import static org.apache.flink.configuration.ConfigOptions.key;
import static org.apache.flink.configuration.description.TextElement.text;
import static org.apache.flink.util.Preconditions.checkArgument;
@@ -104,57 +107,40 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
private static final long DEFAULT_RESTART_DELAY = 10000L;
- // --------------------------------------------------------------------------------------------
-
- /** Defines how data exchange happens - batch or pipelined */
- private ExecutionMode executionMode = ExecutionMode.PIPELINED;
-
- private ClosureCleanerLevel closureCleanerLevel = ClosureCleanerLevel.RECURSIVE;
-
- private int parallelism = CoreOptions.DEFAULT_PARALLELISM.defaultValue();
-
/**
- * The program wide maximum parallelism used for operators which haven't specified a maximum
- * parallelism. The maximum parallelism specifies the upper limit for dynamic scaling and the
- * number of key groups used for partitioned state.
+ * Internal {@link ConfigOption}s, that are not exposed and it's not possible to configure them
+ * via config files. We are defining them here, so that we can store them in the {@link
+ * #configuration}.
+ *
+ * <p>If you decide to expose any of those {@link ConfigOption}s, please double-check if the
+ * key, type and descriptions are sensible, as the initial values are arbitrary.
*/
- private int maxParallelism = -1;
+ // --------------------------------------------------------------------------------------------
+
+ private static final ConfigOption<ExecutionMode> EXECUTION_MODE =
+ key("hidden.execution.mode")
+ .enumType(ExecutionMode.class)
+ .defaultValue(ExecutionMode.PIPELINED)
+ .withDescription("Defines how data exchange happens - batch or pipelined");
/**
- * @deprecated Should no longer be used because it is subsumed by RestartStrategyConfiguration
+ * Use {@link
+ * org.apache.flink.api.common.restartstrategy.RestartStrategies.RestartStrategyConfiguration}
*/
- @Deprecated private int numberOfExecutionRetries = -1;
-
- private boolean forceKryo = false;
-
- /** Flag to indicate whether generic types (through Kryo) are supported */
- private boolean disableGenericTypes = false;
-
- private boolean enableAutoGeneratedUids = true;
-
- private boolean objectReuse = false;
-
- private boolean autoTypeRegistrationEnabled = true;
-
- private boolean forceAvro = false;
- private long autoWatermarkInterval =
- PipelineOptions.AUTO_WATERMARK_INTERVAL.defaultValue().toMillis();
+ @Deprecated
+ private static final ConfigOption<Integer> EXECUTION_RETRIES =
+ key("hidden.execution.retries")
+ .intType()
+ .defaultValue(-1)
+ .withDescription(
+ "Should no longer be used because it is subsumed by RestartStrategyConfiguration");
+ // --------------------------------------------------------------------------------------------
- // ---------- statebackend related configurations ------------------------------
/**
- * Interval in milliseconds for sending latency tracking marks from the sources to the sinks.
+ * In the long run, this field should be somehow merged with the {@link Configuration} from
+ * StreamExecutionEnvironment.
*/
- private long latencyTrackingInterval = MetricOptions.LATENCY_INTERVAL.defaultValue();
-
- private boolean isLatencyTrackingConfigured = false;
-
- /** Interval in milliseconds to perform periodic changelog materialization. */
- private long periodicMaterializeIntervalMillis =
- StateChangelogOptions.PERIODIC_MATERIALIZATION_INTERVAL.defaultValue().toMillis();
-
- /** Max allowed number of consecutive failures for changelog materialization */
- private int materializationMaxAllowedFailures =
- StateChangelogOptions.MATERIALIZATION_MAX_FAILURES_ALLOWED.defaultValue();
+ private final Configuration configuration = new Configuration();
/**
* @deprecated Should no longer be used because it is subsumed by RestartStrategyConfiguration
@@ -174,11 +160,6 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
*/
private long taskCancellationTimeoutMillis = -1;
- /**
- * This flag defines if we use compression for the state snapshot data or not. Default: false
- */
- private boolean useSnapshotCompression = false;
-
// ------------------------------- User code values --------------------------------------------
private GlobalJobParameters globalJobParameters = new GlobalJobParameters();
@@ -212,8 +193,7 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
* be serializable because it needs to be sent to worker nodes.
*/
public ExecutionConfig enableClosureCleaner() {
- this.closureCleanerLevel = ClosureCleanerLevel.RECURSIVE;
- return this;
+ return setClosureCleanerLevel(ClosureCleanerLevel.RECURSIVE);
}
/**
@@ -222,8 +202,7 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
* @see #enableClosureCleaner()
*/
public ExecutionConfig disableClosureCleaner() {
- this.closureCleanerLevel = ClosureCleanerLevel.NONE;
- return this;
+ return setClosureCleanerLevel(ClosureCleanerLevel.NONE);
}
/**
@@ -232,7 +211,7 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
* @see #enableClosureCleaner()
*/
public boolean isClosureCleanerEnabled() {
- return !(closureCleanerLevel == ClosureCleanerLevel.NONE);
+ return !(getClosureCleanerLevel() == ClosureCleanerLevel.NONE);
}
/**
@@ -240,13 +219,13 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
* different settings.
*/
public ExecutionConfig setClosureCleanerLevel(ClosureCleanerLevel level) {
- this.closureCleanerLevel = level;
+ configuration.set(PipelineOptions.CLOSURE_CLEANER_LEVEL, level);
return this;
}
/** Returns the configured {@link ClosureCleanerLevel}. */
public ClosureCleanerLevel getClosureCleanerLevel() {
- return closureCleanerLevel;
+ return configuration.get(PipelineOptions.CLOSURE_CLEANER_LEVEL);
}
/**
@@ -261,7 +240,11 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
@PublicEvolving
public ExecutionConfig setAutoWatermarkInterval(long interval) {
Preconditions.checkArgument(interval >= 0, "Auto watermark interval must not be negative.");
- this.autoWatermarkInterval = interval;
+ return setAutoWatermarkInterval(Duration.ofMillis(interval));
+ }
+
+ private ExecutionConfig setAutoWatermarkInterval(Duration autoWatermarkInterval) {
+ configuration.set(PipelineOptions.AUTO_WATERMARK_INTERVAL, autoWatermarkInterval);
return this;
}
@@ -272,7 +255,7 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
*/
@PublicEvolving
public long getAutoWatermarkInterval() {
- return this.autoWatermarkInterval;
+ return configuration.get(PipelineOptions.AUTO_WATERMARK_INTERVAL).toMillis();
}
/**
@@ -285,8 +268,7 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
*/
@PublicEvolving
public ExecutionConfig setLatencyTrackingInterval(long interval) {
- this.latencyTrackingInterval = interval;
- this.isLatencyTrackingConfigured = true;
+ configuration.set(MetricOptions.LATENCY_INTERVAL, interval);
return this;
}
@@ -297,32 +279,38 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
*/
@PublicEvolving
public long getLatencyTrackingInterval() {
- return latencyTrackingInterval;
+ return configuration.get(MetricOptions.LATENCY_INTERVAL);
}
@Internal
public boolean isLatencyTrackingConfigured() {
- return isLatencyTrackingConfigured;
+ return configuration.getOptional(MetricOptions.LATENCY_INTERVAL).isPresent();
}
@Internal
public long getPeriodicMaterializeIntervalMillis() {
- return periodicMaterializeIntervalMillis;
+ return configuration
+ .get(StateChangelogOptions.PERIODIC_MATERIALIZATION_INTERVAL)
+ .toMillis();
}
@Internal
public void setPeriodicMaterializeIntervalMillis(Duration periodicMaterializeInterval) {
- this.periodicMaterializeIntervalMillis = periodicMaterializeInterval.toMillis();
+ configuration.set(
+ StateChangelogOptions.PERIODIC_MATERIALIZATION_INTERVAL,
+ periodicMaterializeInterval);
}
@Internal
public int getMaterializationMaxAllowedFailures() {
- return materializationMaxAllowedFailures;
+ return configuration.get(StateChangelogOptions.MATERIALIZATION_MAX_FAILURES_ALLOWED);
}
@Internal
public void setMaterializationMaxAllowedFailures(int materializationMaxAllowedFailures) {
- this.materializationMaxAllowedFailures = materializationMaxAllowedFailures;
+ configuration.set(
+ StateChangelogOptions.MATERIALIZATION_MAX_FAILURES_ALLOWED,
+ materializationMaxAllowedFailures);
}
/**
@@ -338,7 +326,7 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
* used.
*/
public int getParallelism() {
- return parallelism;
+ return configuration.get(CoreOptions.DEFAULT_PARALLELISM);
}
/**
@@ -359,7 +347,7 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
throw new IllegalArgumentException(
"Parallelism must be at least one, or ExecutionConfig.PARALLELISM_DEFAULT (use system default).");
}
- this.parallelism = parallelism;
+ configuration.set(CoreOptions.DEFAULT_PARALLELISM, parallelism);
}
return this;
}
@@ -374,7 +362,7 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
*/
@PublicEvolving
public int getMaxParallelism() {
- return maxParallelism;
+ return configuration.get(PipelineOptions.MAX_PARALLELISM);
}
/**
@@ -388,7 +376,7 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
@PublicEvolving
public void setMaxParallelism(int maxParallelism) {
checkArgument(maxParallelism > 0, "The maximum parallelism must be greater than 0.");
- this.maxParallelism = maxParallelism;
+ configuration.set(PipelineOptions.MAX_PARALLELISM, maxParallelism);
}
/**
@@ -503,7 +491,7 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
*/
@Deprecated
public int getNumberOfExecutionRetries() {
- return numberOfExecutionRetries;
+ return configuration.get(EXECUTION_RETRIES);
}
/**
@@ -535,7 +523,7 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
throw new IllegalArgumentException(
"The number of execution retries must be non-negative, or -1 (use system default)");
}
- this.numberOfExecutionRetries = numberOfExecutionRetries;
+ configuration.set(EXECUTION_RETRIES, numberOfExecutionRetries);
return this;
}
@@ -566,7 +554,7 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
* @param executionMode The execution mode to use.
*/
public void setExecutionMode(ExecutionMode executionMode) {
- this.executionMode = executionMode;
+ configuration.set(EXECUTION_MODE, executionMode);
}
/**
@@ -578,7 +566,7 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
* @return The execution mode for the program.
*/
public ExecutionMode getExecutionMode() {
- return executionMode;
+ return configuration.get(EXECUTION_MODE);
}
/**
@@ -613,16 +601,20 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
* cannot be analyzed as POJO.
*/
public void enableForceKryo() {
- forceKryo = true;
+ setForceKryo(true);
}
/** Disable use of Kryo serializer for all POJOs. */
public void disableForceKryo() {
- forceKryo = false;
+ setForceKryo(false);
+ }
+
+ private void setForceKryo(boolean forceKryo) {
+ configuration.set(PipelineOptions.FORCE_KRYO, forceKryo);
}
public boolean isForceKryoEnabled() {
- return forceKryo;
+ return configuration.get(PipelineOptions.FORCE_KRYO);
}
/**
@@ -633,7 +625,7 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
* @see #disableGenericTypes()
*/
public void enableGenericTypes() {
- disableGenericTypes = false;
+ setGenericTypes(true);
}
/**
@@ -653,7 +645,11 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
* @see #enableGenericTypes()
*/
public void disableGenericTypes() {
- disableGenericTypes = true;
+ setGenericTypes(false);
+ }
+
+ private void setGenericTypes(boolean genericTypes) {
+ configuration.set(PipelineOptions.GENERIC_TYPES, genericTypes);
}
/**
@@ -666,7 +662,7 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
* @see #disableGenericTypes()
*/
public boolean hasGenericTypesDisabled() {
- return disableGenericTypes;
+ return !configuration.get(PipelineOptions.GENERIC_TYPES);
}
/**
@@ -675,7 +671,7 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
* @see #disableAutoGeneratedUIDs()
*/
public void enableAutoGeneratedUIDs() {
- enableAutoGeneratedUids = true;
+ setAutoGeneratedUids(true);
}
/**
@@ -688,7 +684,11 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
* overtime without discarding state.
*/
public void disableAutoGeneratedUIDs() {
- enableAutoGeneratedUids = false;
+ setAutoGeneratedUids(false);
+ }
+
+ private void setAutoGeneratedUids(boolean autoGeneratedUids) {
+ configuration.set(PipelineOptions.AUTO_GENERATE_UIDS, autoGeneratedUids);
}
/**
@@ -700,7 +700,7 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
* @see #disableAutoGeneratedUIDs()
*/
public boolean hasAutoGeneratedUIDsEnabled() {
- return enableAutoGeneratedUids;
+ return configuration.get(PipelineOptions.AUTO_GENERATE_UIDS);
}
/**
@@ -709,17 +709,21 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
* <p><b>Important:</b> Make sure to include the <i>flink-avro</i> module.
*/
public void enableForceAvro() {
- forceAvro = true;
+ setForceAvro(true);
}
/** Disables the Apache Avro serializer as the forced serializer for POJOs. */
public void disableForceAvro() {
- forceAvro = false;
+ setForceAvro(false);
+ }
+
+ private void setForceAvro(boolean forceAvro) {
+ configuration.set(PipelineOptions.FORCE_AVRO, forceAvro);
}
/** Returns whether the Apache Avro is the default serializer for POJOs. */
public boolean isForceAvroEnabled() {
- return forceAvro;
+ return configuration.get(PipelineOptions.FORCE_AVRO);
}
/**
@@ -728,8 +732,7 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
* an operation is not aware of this behaviour.
*/
public ExecutionConfig enableObjectReuse() {
- objectReuse = true;
- return this;
+ return setObjectReuse(true);
}
/**
@@ -737,13 +740,17 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
* user-code functions. @see #enableObjectReuse()
*/
public ExecutionConfig disableObjectReuse() {
- objectReuse = false;
+ return setObjectReuse(false);
+ }
+
+ private ExecutionConfig setObjectReuse(boolean objectReuse) {
+ configuration.set(PipelineOptions.OBJECT_REUSE, objectReuse);
return this;
}
/** Returns whether object reuse has been enabled or disabled. @see #enableObjectReuse() */
public boolean isObjectReuseEnabled() {
- return objectReuse;
+ return configuration.get(PipelineOptions.OBJECT_REUSE);
}
public GlobalJobParameters getGlobalJobParameters() {
@@ -915,22 +922,26 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
}
public boolean isAutoTypeRegistrationDisabled() {
- return !autoTypeRegistrationEnabled;
+ return !configuration.get(PipelineOptions.AUTO_TYPE_REGISTRATION);
}
/**
* Control whether Flink is automatically registering all types in the user programs with Kryo.
*/
public void disableAutoTypeRegistration() {
- this.autoTypeRegistrationEnabled = false;
+ setAutoTypeRegistration(false);
+ }
+
+ private void setAutoTypeRegistration(Boolean autoTypeRegistration) {
+ configuration.set(PipelineOptions.AUTO_TYPE_REGISTRATION, autoTypeRegistration);
}
public boolean isUseSnapshotCompression() {
- return useSnapshotCompression;
+ return configuration.get(ExecutionOptions.SNAPSHOT_COMPRESSION);
}
public void setUseSnapshotCompression(boolean useSnapshotCompression) {
- this.useSnapshotCompression = useSnapshotCompression;
+ configuration.set(ExecutionOptions.SNAPSHOT_COMPRESSION, useSnapshotCompression);
}
@Override
@@ -939,28 +950,19 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
ExecutionConfig other = (ExecutionConfig) obj;
return other.canEqual(this)
- && Objects.equals(executionMode, other.executionMode)
- && closureCleanerLevel == other.closureCleanerLevel
- && parallelism == other.parallelism
+ && Objects.equals(configuration, other.configuration)
&& ((restartStrategyConfiguration == null
&& other.restartStrategyConfiguration == null)
|| (null != restartStrategyConfiguration
&& restartStrategyConfiguration.equals(
other.restartStrategyConfiguration)))
- && forceKryo == other.forceKryo
- && disableGenericTypes == other.disableGenericTypes
- && objectReuse == other.objectReuse
- && autoTypeRegistrationEnabled == other.autoTypeRegistrationEnabled
- && forceAvro == other.forceAvro
&& Objects.equals(globalJobParameters, other.globalJobParameters)
- && autoWatermarkInterval == other.autoWatermarkInterval
&& registeredTypesWithKryoSerializerClasses.equals(
other.registeredTypesWithKryoSerializerClasses)
&& defaultKryoSerializerClasses.equals(other.defaultKryoSerializerClasses)
&& registeredKryoTypes.equals(other.registeredKryoTypes)
&& registeredPojoTypes.equals(other.registeredPojoTypes)
&& taskCancellationIntervalMillis == other.taskCancellationIntervalMillis
- && useSnapshotCompression == other.useSnapshotCompression
&& isDynamicGraph == other.isDynamicGraph;
} else {
@@ -971,57 +973,22 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
@Override
public int hashCode() {
return Objects.hash(
- executionMode,
- closureCleanerLevel,
- parallelism,
+ configuration,
restartStrategyConfiguration,
- forceKryo,
- disableGenericTypes,
- objectReuse,
- autoTypeRegistrationEnabled,
- forceAvro,
globalJobParameters,
- autoWatermarkInterval,
registeredTypesWithKryoSerializerClasses,
defaultKryoSerializerClasses,
registeredKryoTypes,
registeredPojoTypes,
taskCancellationIntervalMillis,
- useSnapshotCompression,
isDynamicGraph);
}
@Override
public String toString() {
return "ExecutionConfig{"
- + "executionMode="
- + executionMode
- + ", closureCleanerLevel="
- + closureCleanerLevel
- + ", parallelism="
- + parallelism
- + ", maxParallelism="
- + maxParallelism
- + ", numberOfExecutionRetries="
- + numberOfExecutionRetries
- + ", forceKryo="
- + forceKryo
- + ", disableGenericTypes="
- + disableGenericTypes
- + ", enableAutoGeneratedUids="
- + enableAutoGeneratedUids
- + ", objectReuse="
- + objectReuse
- + ", autoTypeRegistrationEnabled="
- + autoTypeRegistrationEnabled
- + ", forceAvro="
- + forceAvro
- + ", autoWatermarkInterval="
- + autoWatermarkInterval
- + ", latencyTrackingInterval="
- + latencyTrackingInterval
- + ", isLatencyTrackingConfigured="
- + isLatencyTrackingConfigured
+ + "configuration="
+ + configuration
+ ", executionRetryDelay="
+ executionRetryDelay
+ ", restartStrategyConfiguration="
@@ -1030,8 +997,6 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
+ taskCancellationIntervalMillis
+ ", taskCancellationTimeoutMillis="
+ taskCancellationTimeoutMillis
- + ", useSnapshotCompression="
- + useSnapshotCompression
+ ", globalJobParameters="
+ globalJobParameters
+ ", registeredTypesWithKryoSerializers="
@@ -1145,21 +1110,19 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
public void configure(ReadableConfig configuration, ClassLoader classLoader) {
configuration
.getOptional(PipelineOptions.AUTO_TYPE_REGISTRATION)
- .ifPresent(b -> this.autoTypeRegistrationEnabled = b);
+ .ifPresent(this::setAutoTypeRegistration);
configuration
.getOptional(PipelineOptions.AUTO_GENERATE_UIDS)
- .ifPresent(b -> this.enableAutoGeneratedUids = b);
+ .ifPresent(this::setAutoGeneratedUids);
configuration
.getOptional(PipelineOptions.AUTO_WATERMARK_INTERVAL)
- .ifPresent(i -> this.setAutoWatermarkInterval(i.toMillis()));
+ .ifPresent(this::setAutoWatermarkInterval);
configuration
.getOptional(PipelineOptions.CLOSURE_CLEANER_LEVEL)
.ifPresent(this::setClosureCleanerLevel);
- configuration.getOptional(PipelineOptions.FORCE_AVRO).ifPresent(b -> this.forceAvro = b);
- configuration
- .getOptional(PipelineOptions.GENERIC_TYPES)
- .ifPresent(b -> this.disableGenericTypes = !b);
- configuration.getOptional(PipelineOptions.FORCE_KRYO).ifPresent(b -> this.forceKryo = b);
+ configuration.getOptional(PipelineOptions.FORCE_AVRO).ifPresent(this::setForceAvro);
+ configuration.getOptional(PipelineOptions.GENERIC_TYPES).ifPresent(this::setGenericTypes);
+ configuration.getOptional(PipelineOptions.FORCE_KRYO).ifPresent(this::setForceKryo);
configuration
.getOptional(PipelineOptions.GLOBAL_JOB_PARAMETERS)
.<GlobalJobParameters>map(MapBasedJobParameters::new)
@@ -1180,9 +1143,7 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
.getOptional(PipelineOptions.MAX_PARALLELISM)
.ifPresent(this::setMaxParallelism);
configuration.getOptional(CoreOptions.DEFAULT_PARALLELISM).ifPresent(this::setParallelism);
- configuration
- .getOptional(PipelineOptions.OBJECT_REUSE)
- .ifPresent(o -> this.objectReuse = o);
+ configuration.getOptional(PipelineOptions.OBJECT_REUSE).ifPresent(this::setObjectReuse);
configuration
.getOptional(TaskManagerOptions.TASK_CANCELLATION_INTERVAL)
.ifPresent(this::setTaskCancellationInterval);
diff --git a/flink-python/pyflink/common/tests/test_execution_config.py b/flink-python/pyflink/common/tests/test_execution_config.py
index c65c0377b41..5c8917f0210 100644
--- a/flink-python/pyflink/common/tests/test_execution_config.py
+++ b/flink-python/pyflink/common/tests/test_execution_config.py
@@ -267,7 +267,7 @@ class ExecutionConfigTests(PyFlinkTestCase):
self.assertNotEqual(config1, config2)
- self.assertNotEqual(hash(config1), hash(config2))
+ # it is allowed for hashes to be equal even if objects are not
config2.set_parallelism(12)