You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2022/11/08 13:20:18 UTC

[flink] branch master updated (bb124f4ada4 -> 488db256914)

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


    from bb124f4ada4 [FLINK-29379][streaming] Drop CheckpointConfig#failOnCheckpointingErrors and use getTolerableCheckpointFailureNumber() instead
     new 69526c56a10 [hotfix][streaming] Unify method names in ConfigurationNotAllowedMessage
     new 35b2cae8cdb [FLINK-29888][streaming] Reimplement checkNotAllowedConfigurations using refactored CheckpointConfig and ExecutionConfig
     new 488db256914 [FLINK-29888][streaming] Validate CheckpointConfig#storage manualy

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../generated/deployment_configuration.html        |   2 +-
 .../client/program/StreamContextEnvironment.java   | 180 +++++++++------------
 .../program/StreamContextEnvironmentTest.java      | 123 +++++++++++---
 .../apache/flink/configuration/Configuration.java  |  14 ++
 .../configuration/DelegatingConfiguration.java     |   5 +
 .../flink/configuration/DeploymentOptions.java     |   4 +-
 .../flink/configuration/ConfigurationTest.java     |  20 +++
 .../dispatcher/ConfigurationNotAllowedMessage.java |  41 +++--
 .../storage/ExternalizedSnapshotLocation.java      |  19 +++
 .../state/storage/FileSystemCheckpointStorage.java |  20 +++
 10 files changed, 284 insertions(+), 144 deletions(-)


[flink] 03/03: [FLINK-29888][streaming] Validate CheckpointConfig#storage manualy

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 488db2569147d1bb2fbefb95f351239f930dd0d0
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Mon Nov 7 13:24:43 2022 +0100

    [FLINK-29888][streaming] Validate CheckpointConfig#storage manualy
    
    Unfortunately, CheckpointConfig#setCheckpointStorage is not backed by a
    Configuration, but it also has to be validated. For this validation we
    are implementing a one off manual check
---
 .../client/program/StreamContextEnvironment.java   |  16 +++
 .../program/StreamContextEnvironmentTest.java      | 109 +++++++++++++++++----
 .../dispatcher/ConfigurationNotAllowedMessage.java |   5 +
 .../storage/ExternalizedSnapshotLocation.java      |  19 ++++
 .../state/storage/FileSystemCheckpointStorage.java |  20 ++++
 5 files changed, 150 insertions(+), 19 deletions(-)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java
index 15f7022086a..efe5ada5912 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.core.execution.DetachedJobExecutionResult;
@@ -47,6 +48,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
@@ -294,6 +296,20 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment {
                 checkpointCfg.toConfiguration(),
                 checkpointCfg.getClass().getSimpleName(),
                 errors);
+
+        /**
+         * Unfortunately, {@link CheckpointConfig#setCheckpointStorage} is not backed by a {@link
+         * Configuration}, but it also has to be validated. For this validation we are implementing
+         * a one off manual check.
+         */
+        if (!programConfigWildcards.contains(CheckpointingOptions.CHECKPOINTS_DIRECTORY.key())
+                && !Objects.equals(
+                        checkpointCfg.getCheckpointStorage(),
+                        expectedCheckpointConfig.getCheckpointStorage())) {
+            errors.add(
+                    ConfigurationNotAllowedMessage.ofConfigurationObjectSetterUsed(
+                            checkpointCfg.getClass().getSimpleName(), "setCheckpointStorage"));
+        }
     }
 
     private void checkExecutionConfig(Configuration clusterConfigMap, List<String> errors) {
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/StreamContextEnvironmentTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/StreamContextEnvironmentTest.java
index f9432e048be..0d1b29c6358 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/StreamContextEnvironmentTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/StreamContextEnvironmentTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.client.program;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.configuration.ExecutionOptions;
@@ -27,6 +28,7 @@ import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.core.execution.PipelineExecutorFactory;
 import org.apache.flink.core.execution.PipelineExecutorServiceLoader;
 import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
+import org.apache.flink.runtime.state.storage.JobManagerCheckpointStorage;
 import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.environment.CheckpointConfig;
 import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
@@ -37,8 +39,8 @@ import org.apache.flink.util.function.ThrowingConsumer;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.MethodSource;
 
-import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.stream.Stream;
@@ -64,17 +66,8 @@ class StreamContextEnvironmentTest {
         programConfig.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH);
         programConfig.set(ExecutionOptions.SORT_INPUTS, true);
 
-        final ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
         final StreamContextEnvironment environment =
-                new StreamContextEnvironment(
-                        new MockExecutorServiceLoader(),
-                        clusterConfig,
-                        clusterConfig,
-                        classLoader,
-                        true,
-                        true,
-                        false,
-                        new ArrayList<>());
+                constructStreamContextEnvironment(clusterConfig, Collections.emptyList());
 
         // Change the CheckpointConfig
         environment.enableCheckpointing(500, CheckpointingMode.EXACTLY_ONCE);
@@ -97,6 +90,77 @@ class StreamContextEnvironmentTest {
                         PipelineOptions.MAX_PARALLELISM.key());
     }
 
+    @ParameterizedTest
+    @MethodSource("provideExecutors")
+    void testDisallowCheckpointStorage(
+            ThrowingConsumer<StreamExecutionEnvironment, Exception> executor) {
+        final Configuration clusterConfig = new Configuration();
+        clusterConfig.set(DeploymentOptions.PROGRAM_CONFIG_ENABLED, false);
+        clusterConfig.set(DeploymentOptions.TARGET, "local");
+        clusterConfig.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "file:///flink/checkpoints");
+
+        final StreamContextEnvironment environment =
+                constructStreamContextEnvironment(clusterConfig, Collections.emptyList());
+
+        String disallowedPath = "file:///flink/disallowed/modification";
+        // Change the CheckpointConfig
+        environment.getCheckpointConfig().setCheckpointStorage(disallowedPath);
+
+        environment.fromCollection(Collections.singleton(1)).addSink(new DiscardingSink<>());
+        assertThatThrownBy(() -> executor.accept(environment))
+                .isInstanceOf(MutatedConfigurationException.class)
+                .hasMessageContainingAll(
+                        CheckpointConfig.class.getSimpleName(), "setCheckpointStorage");
+
+        environment.getCheckpointConfig().setCheckpointStorage(new JobManagerCheckpointStorage());
+
+        environment.fromCollection(Collections.singleton(1)).addSink(new DiscardingSink<>());
+        assertThatThrownBy(() -> executor.accept(environment))
+                .isInstanceOf(MutatedConfigurationException.class)
+                .hasMessageContainingAll(
+                        CheckpointConfig.class.getSimpleName(), "setCheckpointStorage");
+    }
+
+    @ParameterizedTest
+    @MethodSource("provideExecutors")
+    void testAllowCheckpointStorage(
+            ThrowingConsumer<StreamExecutionEnvironment, Exception> executor) {
+        final Configuration clusterConfig = new Configuration();
+        clusterConfig.set(DeploymentOptions.PROGRAM_CONFIG_ENABLED, false);
+        clusterConfig.set(DeploymentOptions.TARGET, "local");
+        clusterConfig.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "file:///flink/checkpoints");
+
+        final StreamContextEnvironment environment =
+                constructStreamContextEnvironment(
+                        clusterConfig,
+                        Arrays.asList(CheckpointingOptions.CHECKPOINTS_DIRECTORY.key()));
+
+        String allowedPath = "file:///flink/allowed/modification";
+        // Change the CheckpointConfig
+        environment.getCheckpointConfig().setCheckpointStorage(allowedPath);
+
+        environment.fromCollection(Collections.singleton(1)).addSink(new DiscardingSink<>());
+        assertThatThrownBy(() -> executor.accept(environment))
+                .isInstanceOf(ExecutorReachedException.class);
+    }
+
+    @ParameterizedTest
+    @MethodSource("provideExecutors")
+    void testNotModifiedCheckpointStorage(
+            ThrowingConsumer<StreamExecutionEnvironment, Exception> executor) {
+        final Configuration clusterConfig = new Configuration();
+        clusterConfig.set(DeploymentOptions.PROGRAM_CONFIG_ENABLED, false);
+        clusterConfig.set(DeploymentOptions.TARGET, "local");
+        clusterConfig.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "file:///flink/checkpoints");
+
+        final StreamContextEnvironment environment =
+                constructStreamContextEnvironment(clusterConfig, Collections.emptyList());
+
+        environment.fromCollection(Collections.singleton(1)).addSink(new DiscardingSink<>());
+        assertThatThrownBy(() -> executor.accept(environment))
+                .isInstanceOf(ExecutorReachedException.class);
+    }
+
     @ParameterizedTest
     @MethodSource("provideExecutors")
     void testAllowProgramConfigurationWildcards(
@@ -114,16 +178,9 @@ class StreamContextEnvironmentTest {
                 PipelineOptions.GLOBAL_JOB_PARAMETERS,
                 Collections.singletonMap("my-other-param", "my-other-value"));
 
-        final ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
         final StreamContextEnvironment environment =
-                new StreamContextEnvironment(
-                        new MockExecutorServiceLoader(),
-                        clusterConfig,
+                constructStreamContextEnvironment(
                         clusterConfig,
-                        classLoader,
-                        true,
-                        true,
-                        false,
                         Arrays.asList(
                                 PipelineOptions.GLOBAL_JOB_PARAMETERS.key(),
                                 PipelineOptions.MAX_PARALLELISM.key()));
@@ -139,6 +196,20 @@ class StreamContextEnvironmentTest {
                 .containsOnlyKeys("my-other-param");
     }
 
+    private static StreamContextEnvironment constructStreamContextEnvironment(
+            Configuration clusterConfig, Collection<String> programConfigWildcards) {
+        final ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+        return new StreamContextEnvironment(
+                new MockExecutorServiceLoader(),
+                clusterConfig,
+                clusterConfig,
+                classLoader,
+                true,
+                true,
+                false,
+                programConfigWildcards);
+    }
+
     private static List<ThrowingConsumer<StreamExecutionEnvironment, Exception>>
             provideExecutors() {
         return Arrays.asList(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/ConfigurationNotAllowedMessage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/ConfigurationNotAllowedMessage.java
index 23299bc059e..0d5e36a16f7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/ConfigurationNotAllowedMessage.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/ConfigurationNotAllowedMessage.java
@@ -66,4 +66,9 @@ public class ConfigurationNotAllowedMessage {
                 "Configuration %s:%s was removed from the configuration object %s.",
                 configKey, configValue, configurationObject);
     }
+
+    public static String ofConfigurationObjectSetterUsed(
+            String configurationObject, String setter) {
+        return String.format("Setter %s#%s has been used", configurationObject, setter);
+    }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/storage/ExternalizedSnapshotLocation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/storage/ExternalizedSnapshotLocation.java
index 1dcabd78dec..ddb1c1fc097 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/storage/ExternalizedSnapshotLocation.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/storage/ExternalizedSnapshotLocation.java
@@ -28,6 +28,7 @@ import org.apache.flink.core.fs.Path;
 import javax.annotation.Nullable;
 
 import java.io.Serializable;
+import java.util.Objects;
 import java.util.Optional;
 
 /**
@@ -154,4 +155,22 @@ class ExternalizedSnapshotLocation implements Serializable {
                             }
                         });
     }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(baseCheckpointPath, baseSavepointPath);
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        if (this == other) {
+            return true;
+        }
+        if (other == null || getClass() != other.getClass()) {
+            return false;
+        }
+        ExternalizedSnapshotLocation that = (ExternalizedSnapshotLocation) other;
+        return Objects.equals(baseCheckpointPath, that.baseCheckpointPath)
+                && Objects.equals(baseSavepointPath, that.baseSavepointPath);
+    }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/storage/FileSystemCheckpointStorage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/storage/FileSystemCheckpointStorage.java
index 4f0e0246d42..486744c11cb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/storage/FileSystemCheckpointStorage.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/storage/FileSystemCheckpointStorage.java
@@ -40,6 +40,7 @@ import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.net.URI;
+import java.util.Objects;
 
 import static org.apache.flink.configuration.CheckpointingOptions.FS_SMALL_FILE_THRESHOLD;
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -371,4 +372,23 @@ public class FileSystemCheckpointStorage
                 ? writeBufferSize
                 : CheckpointingOptions.FS_WRITE_BUFFER_SIZE.defaultValue();
     }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(location, fileStateThreshold, writeBufferSize);
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        if (this == other) {
+            return true;
+        }
+        if (other == null || getClass() != other.getClass()) {
+            return false;
+        }
+        FileSystemCheckpointStorage that = (FileSystemCheckpointStorage) other;
+        return Objects.equals(location, that.location)
+                && Objects.equals(fileStateThreshold, that.fileStateThreshold)
+                && Objects.equals(writeBufferSize, that.writeBufferSize);
+    }
 }


[flink] 01/03: [hotfix][streaming] Unify method names in ConfigurationNotAllowedMessage

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 69526c56a10b836e2ca28443398a8e5ea1411855
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Thu Nov 3 16:38:47 2022 +0100

    [hotfix][streaming] Unify method names in ConfigurationNotAllowedMessage
---
 .../flink/client/program/StreamContextEnvironment.java     |  5 ++---
 .../runtime/dispatcher/ConfigurationNotAllowedMessage.java | 14 +++++++-------
 2 files changed, 9 insertions(+), 10 deletions(-)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java
index 262b4821f05..587e6c6d6dd 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java
@@ -295,8 +295,7 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment {
                 .forEach(
                         (k, v) ->
                                 errors.add(
-                                        ConfigurationNotAllowedMessage.ofConfigurationKeyAndValue(
-                                                k, v)));
+                                        ConfigurationNotAllowedMessage.ofConfigurationAdded(k, v)));
         diff.entriesOnlyOnLeft()
                 .forEach(
                         (k, v) ->
@@ -307,7 +306,7 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment {
                 .forEach(
                         (k, v) ->
                                 errors.add(
-                                        ConfigurationNotAllowedMessage.ofConfigurationChange(
+                                        ConfigurationNotAllowedMessage.ofConfigurationChanged(
                                                 k, v)));
 
         final Configuration enrichedClusterConfig = new Configuration(clusterConfiguration);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/ConfigurationNotAllowedMessage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/ConfigurationNotAllowedMessage.java
index fd01c52d004..1e3d237ce9a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/ConfigurationNotAllowedMessage.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/ConfigurationNotAllowedMessage.java
@@ -32,19 +32,19 @@ public class ConfigurationNotAllowedMessage {
 
     private ConfigurationNotAllowedMessage() {}
 
-    public static String ofConfigurationKeyAndValue(String configkey, String configValue) {
-        return String.format("Configuration %s:%s not allowed.", configkey, configValue);
+    public static String ofConfigurationAdded(String configKey, String configValue) {
+        return String.format("Configuration %s:%s not allowed.", configKey, configValue);
     }
 
-    public static String ofConfigurationRemoved(String configkey, String configValue) {
-        return String.format("Configuration %s:%s was removed.", configkey, configValue);
+    public static String ofConfigurationRemoved(String configKey, String configValue) {
+        return String.format("Configuration %s:%s was removed.", configKey, configValue);
     }
 
-    public static String ofConfigurationChange(
-            String configkey, MapDifference.ValueDifference<String> change) {
+    public static String ofConfigurationChanged(
+            String configKey, MapDifference.ValueDifference<String> change) {
         return String.format(
                 "Configuration %s was changed from %s to %s.",
-                configkey, change.leftValue(), change.rightValue());
+                configKey, change.leftValue(), change.rightValue());
     }
 
     public static String ofConfigurationObject(String configurationObject) {


[flink] 02/03: [FLINK-29888][streaming] Reimplement checkNotAllowedConfigurations using refactored CheckpointConfig and ExecutionConfig

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 35b2cae8cdbc8642ca51b922f62579357a47415b
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Thu Nov 3 16:52:28 2022 +0100

    [FLINK-29888][streaming] Reimplement checkNotAllowedConfigurations using refactored CheckpointConfig and ExecutionConfig
    
    After FLINK-29379 we can re-implement checkNotAllowedConfigurations to produce better exception messages.
    Instead of a generic "Configuration object ExecutionConfig changed", without a hint of what has been modified,
    we can report what ConfigOption has been changed.
---
 .../generated/deployment_configuration.html        |   2 +-
 .../client/program/StreamContextEnvironment.java   | 161 +++++++--------------
 .../program/StreamContextEnvironmentTest.java      |  14 +-
 .../apache/flink/configuration/Configuration.java  |  14 ++
 .../configuration/DelegatingConfiguration.java     |   5 +
 .../flink/configuration/DeploymentOptions.java     |   4 +-
 .../flink/configuration/ConfigurationTest.java     |  20 +++
 .../dispatcher/ConfigurationNotAllowedMessage.java |  26 +++-
 8 files changed, 128 insertions(+), 118 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/deployment_configuration.html b/docs/layouts/shortcodes/generated/deployment_configuration.html
index fa037edd5c5..07daa602a07 100644
--- a/docs/layouts/shortcodes/generated/deployment_configuration.html
+++ b/docs/layouts/shortcodes/generated/deployment_configuration.html
@@ -30,7 +30,7 @@
             <td><h5>execution.program-config.wildcards</h5></td>
             <td style="word-wrap: break-word;"></td>
             <td>List&lt;String&gt;</td>
-            <td>List of configuration keys that are allowed to be set in a user program regardless whether program configuration is enabled or not.<br /><br />Currently, this list is limited to 'pipeline.global-job-parameters' only.</td>
+            <td>List of configuration keys that are allowed to be set in a user program regardless whether program configuration is enabled or not.<br /><br />Currently changes that are not backed by the Configuration class are always allowed.</td>
         </tr>
         <tr>
             <td><h5>execution.shutdown-on-application-finish</h5></td>
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java
index 587e6c6d6dd..15f7022086a 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java
@@ -22,10 +22,8 @@ import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DeploymentOptions;
-import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.core.execution.DetachedJobExecutionResult;
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.core.execution.JobListener;
@@ -45,20 +43,12 @@ import org.apache.flink.shaded.guava30.com.google.common.collect.Maps;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectOutputStream;
-import java.io.Serializable;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -72,27 +62,6 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment {
 
     private static final Logger LOG = LoggerFactory.getLogger(ExecutionEnvironment.class);
 
-    /**
-     * Due to the complexity of the current configuration stack, we need to limit the available
-     * wildcard options for {@link DeploymentOptions#PROGRAM_CONFIG_WILDCARDS}.
-     *
-     * <p>If everything was backed by {@link Configuration} instead of the POJOs that partially
-     * materialize the config options, the implementation could be way easier. Currently, we need to
-     * manually provide the backward path from POJO to {@link ConfigOption} value here to let {@link
-     * #collectNotAllowedConfigurations()} filter out wildcards in both POJOs and {@link
-     * #configuration}.
-     */
-    private static final Map<String, WildcardOption<?>> SUPPORTED_PROGRAM_CONFIG_WILDCARDS =
-            new HashMap<>();
-
-    static {
-        SUPPORTED_PROGRAM_CONFIG_WILDCARDS.put(
-                PipelineOptions.GLOBAL_JOB_PARAMETERS.key(),
-                new WildcardOption<>(
-                        PipelineOptions.GLOBAL_JOB_PARAMETERS,
-                        env -> env.getConfig().getGlobalJobParameters().toMap()));
-    }
-
     private final boolean suppressSysout;
 
     private final boolean enforceSingleJobExecution;
@@ -281,14 +250,21 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment {
         final List<String> errors = new ArrayList<>();
 
         final Configuration clusterConfigMap = new Configuration(clusterConfiguration);
-        final Configuration envConfigMap = new Configuration(configuration);
 
         // Removal must happen on Configuration objects (not instances of Map)
         // to also ignore map-typed config options with prefix key notation
         removeProgramConfigWildcards(clusterConfigMap);
+
+        checkMainConfiguration(clusterConfigMap, errors);
+        checkCheckpointConfig(clusterConfigMap, errors);
+        checkExecutionConfig(clusterConfigMap, errors);
+        return errors;
+    }
+
+    private void checkMainConfiguration(Configuration clusterConfigMap, List<String> errors) {
+        final Configuration envConfigMap = new Configuration(configuration);
         removeProgramConfigWildcards(envConfigMap);
 
-        // Check Configuration
         final MapDifference<String, String> diff =
                 Maps.difference(clusterConfigMap.toMap(), envConfigMap.toMap());
         diff.entriesOnlyOnRight()
@@ -308,88 +284,61 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment {
                                 errors.add(
                                         ConfigurationNotAllowedMessage.ofConfigurationChanged(
                                                 k, v)));
-
-        final Configuration enrichedClusterConfig = new Configuration(clusterConfiguration);
-        enrichProgramConfigWildcards(enrichedClusterConfig);
-
-        // Check CheckpointConfig
-        final CheckpointConfig clusterCheckpointConfig = new CheckpointConfig();
-        clusterCheckpointConfig.configure(enrichedClusterConfig);
-        if (!Arrays.equals(
-                serializeConfig(clusterCheckpointConfig), serializeConfig(checkpointCfg))) {
-            errors.add(
-                    ConfigurationNotAllowedMessage.ofConfigurationObject(
-                            checkpointCfg.getClass().getSimpleName()));
-        }
-
-        // Check ExecutionConfig
-        final ExecutionConfig clusterExecutionConfig = new ExecutionConfig();
-        clusterExecutionConfig.configure(enrichedClusterConfig, this.getUserClassloader());
-        if (!Arrays.equals(serializeConfig(clusterExecutionConfig), serializeConfig(config))) {
-            errors.add(
-                    ConfigurationNotAllowedMessage.ofConfigurationObject(
-                            config.getClass().getSimpleName()));
-        }
-
-        return errors;
     }
 
-    private void enrichProgramConfigWildcards(Configuration mutableConfig) {
-        for (String key : programConfigWildcards) {
-            final WildcardOption<?> option = SUPPORTED_PROGRAM_CONFIG_WILDCARDS.get(key);
-            if (option == null) {
-                throw new FlinkRuntimeException(
-                        String.format(
-                                "Unsupported option '%s' for program configuration wildcards.",
-                                key));
-            }
-            option.enrich(mutableConfig, this);
-        }
+    private void checkCheckpointConfig(Configuration clusterConfigMap, List<String> errors) {
+        CheckpointConfig expectedCheckpointConfig = new CheckpointConfig();
+        expectedCheckpointConfig.configure(clusterConfigMap);
+        checkConfigurationObject(
+                expectedCheckpointConfig.toConfiguration(),
+                checkpointCfg.toConfiguration(),
+                checkpointCfg.getClass().getSimpleName(),
+                errors);
     }
 
-    private void removeProgramConfigWildcards(Configuration mutableConfig) {
-        for (String key : programConfigWildcards) {
-            final WildcardOption<?> option = SUPPORTED_PROGRAM_CONFIG_WILDCARDS.get(key);
-            if (option == null) {
-                throw new FlinkRuntimeException(
-                        String.format(
-                                "Unsupported option '%s' for program configuration wildcards.",
-                                key));
-            }
-            option.remove(mutableConfig);
-        }
+    private void checkExecutionConfig(Configuration clusterConfigMap, List<String> errors) {
+        ExecutionConfig expectedExecutionConfig = new ExecutionConfig();
+        expectedExecutionConfig.configure(clusterConfigMap, getUserClassloader());
+        checkConfigurationObject(
+                expectedExecutionConfig.toConfiguration(),
+                config.toConfiguration(),
+                config.getClass().getSimpleName(),
+                errors);
     }
 
-    private static byte[] serializeConfig(Serializable config) {
-        try (final ByteArrayOutputStream bos = new ByteArrayOutputStream();
-                final ObjectOutputStream oos = new ObjectOutputStream(bos)) {
-            oos.writeObject(config);
-            oos.flush();
-            return bos.toByteArray();
-        } catch (IOException e) {
-            throw new FlinkRuntimeException("Cannot serialize configuration.", e);
-        }
-    }
-
-    /**
-     * Helper class for mapping a configuration key to a {@link ConfigOption} and a programmatic
-     * getter.
-     */
-    private static final class WildcardOption<T> {
-        private final ConfigOption<T> option;
-        private final Function<StreamContextEnvironment, T> getter;
+    private void checkConfigurationObject(
+            Configuration expectedConfiguration,
+            Configuration actualConfiguration,
+            String configurationObjectName,
+            List<String> errors) {
+        removeProgramConfigWildcards(actualConfiguration);
 
-        WildcardOption(ConfigOption<T> option, Function<StreamContextEnvironment, T> getter) {
-            this.option = option;
-            this.getter = getter;
-        }
+        final MapDifference<String, String> diff =
+                Maps.difference(expectedConfiguration.toMap(), actualConfiguration.toMap());
+        diff.entriesOnlyOnRight()
+                .forEach(
+                        (k, v) ->
+                                errors.add(
+                                        ConfigurationNotAllowedMessage.ofConfigurationObjectAdded(
+                                                configurationObjectName, k, v)));
+        diff.entriesDiffering()
+                .forEach(
+                        (k, v) ->
+                                errors.add(
+                                        ConfigurationNotAllowedMessage.ofConfigurationObjectChanged(
+                                                configurationObjectName, k, v)));
 
-        void enrich(Configuration mutableConfig, StreamContextEnvironment fromEnv) {
-            mutableConfig.set(option, getter.apply(fromEnv));
-        }
+        diff.entriesOnlyOnLeft()
+                .forEach(
+                        (k, v) ->
+                                errors.add(
+                                        ConfigurationNotAllowedMessage.ofConfigurationObjectRemoved(
+                                                configurationObjectName, k, v)));
+    }
 
-        void remove(Configuration mutableConfig) {
-            mutableConfig.removeConfig(option);
+    private void removeProgramConfigWildcards(Configuration mutableConfig) {
+        for (String key : programConfigWildcards) {
+            mutableConfig.removeKey(key);
         }
     }
 }
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/StreamContextEnvironmentTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/StreamContextEnvironmentTest.java
index 3de40323ff2..f9432e048be 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/StreamContextEnvironmentTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/StreamContextEnvironmentTest.java
@@ -29,6 +29,7 @@ import org.apache.flink.core.execution.PipelineExecutorServiceLoader;
 import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
 import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.util.function.ThrowingConsumer;
@@ -79,6 +80,7 @@ class StreamContextEnvironmentTest {
         environment.enableCheckpointing(500, CheckpointingMode.EXACTLY_ONCE);
         // Change the ExecutionConfig
         environment.setParallelism(25);
+        environment.getConfig().setMaxParallelism(1024);
 
         // Add/mutate values in the configuration
         environment.configure(programConfig);
@@ -90,7 +92,9 @@ class StreamContextEnvironmentTest {
                         ExecutionOptions.RUNTIME_MODE.key(),
                         ExecutionOptions.SORT_INPUTS.key(),
                         CheckpointConfig.class.getSimpleName(),
-                        ExecutionConfig.class.getSimpleName());
+                        ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL.key(),
+                        ExecutionConfig.class.getSimpleName(),
+                        PipelineOptions.MAX_PARALLELISM.key());
     }
 
     @ParameterizedTest
@@ -100,7 +104,8 @@ class StreamContextEnvironmentTest {
         final Configuration clusterConfig = new Configuration();
         clusterConfig.set(DeploymentOptions.TARGET, "local");
         clusterConfig.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.STREAMING);
-        // Test prefix map notation
+        // Changing GLOBAL_JOB_PARAMETERS is always allowed, as it's one of the fields not checked
+        // with PROGRAM_CONFIG_ENABLED set to false
         clusterConfig.setString(
                 PipelineOptions.GLOBAL_JOB_PARAMETERS.key() + "." + "my-param", "my-value");
 
@@ -119,10 +124,13 @@ class StreamContextEnvironmentTest {
                         true,
                         true,
                         false,
-                        Collections.singletonList(PipelineOptions.GLOBAL_JOB_PARAMETERS.key()));
+                        Arrays.asList(
+                                PipelineOptions.GLOBAL_JOB_PARAMETERS.key(),
+                                PipelineOptions.MAX_PARALLELISM.key()));
 
         // Change ExecutionConfig
         environment.configure(jobConfig);
+        environment.getConfig().setMaxParallelism(1024);
 
         environment.fromCollection(Collections.singleton(1)).addSink(new DiscardingSink<>());
         assertThatThrownBy(() -> executor.accept(environment))
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java b/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
index d2032c606c4..b1740826850 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
@@ -765,6 +765,20 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters
         }
     }
 
+    /**
+     * Removes given key from the configuration.
+     *
+     * @param key key of a config option to remove
+     * @return true is config has been removed, false otherwise
+     */
+    public boolean removeKey(String key) {
+        synchronized (this.confData) {
+            boolean removed = this.confData.remove(key) != null;
+            removed |= removePrefixMap(confData, key);
+            return removed;
+        }
+    }
+
     // --------------------------------------------------------------------------------------------
 
     <T> void setValueInternal(String key, T value, boolean canBePrefixMap) {
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java b/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java
index f1c480dc247..54a3230cc3e 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java
@@ -324,6 +324,11 @@ public final class DelegatingConfiguration extends Configuration {
         return backingConfig.removeConfig(configOption);
     }
 
+    @Override
+    public boolean removeKey(String key) {
+        return backingConfig.removeKey(key);
+    }
+
     @Override
     public boolean containsKey(String key) {
         return backingConfig.containsKey(prefix + key);
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/DeploymentOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/DeploymentOptions.java
index 21bcedd0d7c..f6ca0d8a345 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/DeploymentOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/DeploymentOptions.java
@@ -119,9 +119,7 @@ public class DeploymentOptions {
                                     .linebreak()
                                     .linebreak()
                                     .text(
-                                            "Currently, this list is limited to '%s' only.",
-                                            TextElement.text(
-                                                    PipelineOptions.GLOBAL_JOB_PARAMETERS.key()))
+                                            "Currently changes that are not backed by the Configuration class are always allowed.")
                                     .build());
 
     @Experimental
diff --git a/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java b/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java
index cd8ca26ae5e..341ded3a25f 100644
--- a/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java
+++ b/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java
@@ -30,6 +30,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.containsString;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
@@ -314,6 +315,25 @@ public class ConfigurationTest extends TestLogger {
         assertFalse("Expected 'unexistedOption' is not removed", cfg.removeConfig(unexistedOption));
     }
 
+    @Test
+    public void testRemoveKey() {
+        Configuration cfg = new Configuration();
+        String key1 = "a.b";
+        String key2 = "c.d";
+        cfg.setInteger(key1, 42);
+        cfg.setInteger(key2, 44);
+        cfg.setInteger(key2 + ".f1", 44);
+        cfg.setInteger(key2 + ".f2", 44);
+        cfg.setInteger("e.f", 1337);
+
+        assertFalse(cfg.removeKey("not-existing-key"));
+        assertTrue(cfg.removeKey(key1));
+        assertFalse(cfg.containsKey(key1));
+
+        assertTrue(cfg.removeKey(key2));
+        assertThat(cfg.keySet(), containsInAnyOrder("e.f"));
+    }
+
     @Test
     public void testShouldParseValidStringToEnum() {
         final Configuration configuration = new Configuration();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/ConfigurationNotAllowedMessage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/ConfigurationNotAllowedMessage.java
index 1e3d237ce9a..23299bc059e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/ConfigurationNotAllowedMessage.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/ConfigurationNotAllowedMessage.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.dispatcher;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.configuration.DeploymentOptions;
 
-import org.apache.flink.shaded.guava30.com.google.common.collect.MapDifference;
+import org.apache.flink.shaded.guava30.com.google.common.collect.MapDifference.ValueDifference;
 
 /**
  * If {@link DeploymentOptions#PROGRAM_CONFIG_ENABLED} is disabled, this error denotes the not
@@ -40,14 +40,30 @@ public class ConfigurationNotAllowedMessage {
         return String.format("Configuration %s:%s was removed.", configKey, configValue);
     }
 
-    public static String ofConfigurationChanged(
-            String configKey, MapDifference.ValueDifference<String> change) {
+    public static String ofConfigurationChanged(String configKey, ValueDifference<String> change) {
         return String.format(
                 "Configuration %s was changed from %s to %s.",
                 configKey, change.leftValue(), change.rightValue());
     }
 
-    public static String ofConfigurationObject(String configurationObject) {
-        return String.format("Configuration object %s changed.", configurationObject);
+    public static String ofConfigurationObjectAdded(
+            String configurationObject, String configKey, String configValue) {
+        return String.format(
+                "Configuration %s:%s not allowed in the configuration object %s.",
+                configKey, configValue, configurationObject);
+    }
+
+    public static String ofConfigurationObjectChanged(
+            String configurationObject, String configKey, ValueDifference<String> change) {
+        return String.format(
+                "Configuration %s was changed from %s to %s in the configuration object %s.",
+                configKey, change.leftValue(), change.rightValue(), configurationObject);
+    }
+
+    public static String ofConfigurationObjectRemoved(
+            String configurationObject, String configKey, String configValue) {
+        return String.format(
+                "Configuration %s:%s was removed from the configuration object %s.",
+                configKey, configValue, configurationObject);
     }
 }