You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2022/09/23 14:51:16 UTC

[flink] branch release-1.16 updated: [FLINK-29309][streaming-java] Relax allow-client-job-configurations for Table API and parameters

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.16 by this push:
     new c917a6e8a09 [FLINK-29309][streaming-java] Relax allow-client-job-configurations for Table API and parameters
c917a6e8a09 is described below

commit c917a6e8a09d42dc87ba27afdfd42a2a640bb984
Author: Timo Walther <tw...@apache.org>
AuthorDate: Thu Sep 15 13:49:33 2022 +0200

    [FLINK-29309][streaming-java] Relax allow-client-job-configurations for Table API and parameters
    
    This closes #20840.
---
 .../generated/deployment_configuration.html        |  18 +-
 .../program/MutatedConfigurationException.java     |   5 +-
 .../client/program/StreamContextEnvironment.java   | 196 ++++++++++++++++-----
 .../apache/flink/client/program/ClientTest.java    |   2 +-
 .../program/StreamContextEnvironmentTest.java      |  75 ++++++--
 .../apache/flink/api/common/ExecutionConfig.java   |   9 +
 .../flink/configuration/DeploymentOptions.java     |  51 +++++-
 .../dispatcher/ConfigurationNotAllowedMessage.java |   5 +-
 .../api/environment/CheckpointConfig.java          |   9 +
 9 files changed, 287 insertions(+), 83 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/deployment_configuration.html b/docs/layouts/shortcodes/generated/deployment_configuration.html
index 45a4c0a7062..5a8ba591add 100644
--- a/docs/layouts/shortcodes/generated/deployment_configuration.html
+++ b/docs/layouts/shortcodes/generated/deployment_configuration.html
@@ -8,12 +8,6 @@
         </tr>
     </thead>
     <tbody>
-        <tr>
-            <td><h5>execution.allow-client-job-configurations</h5></td>
-            <td style="word-wrap: break-word;">true</td>
-            <td>Boolean</td>
-            <td>Determines whether configurations in the user program are allowed. Depending on your deployment mode failing the job might have different affects. Either your client that is trying to submit the job to an external cluster (session cluster deployment) throws the exception or the Job manager (application mode deployment).</td>
-        </tr>
         <tr>
             <td><h5>execution.attached</h5></td>
             <td style="word-wrap: break-word;">false</td>
@@ -26,6 +20,18 @@
             <td>List&lt;String&gt;</td>
             <td>Custom JobListeners to be registered with the execution environment. The registered listeners cannot have constructors with arguments.</td>
         </tr>
+        <tr>
+            <td><h5>execution.program-config.enabled</h5></td>
+            <td style="word-wrap: break-word;">true</td>
+            <td>Boolean</td>
+            <td>Determines whether configurations in the user program are allowed. By default, configuration can be set both on a cluster-level (via options) or within the user program (i.e. programmatic via environment setters). If disabled, all configuration must be defined on a cluster-level and programmatic setters in the user program are prohibited.<br /><br />Depending on your deployment mode failing the job might have different implications. Either your client that is trying to su [...]
+        </tr>
+        <tr>
+            <td><h5>execution.program-config.wildcards</h5></td>
+            <td style="word-wrap: break-word;"></td>
+            <td>List&lt;String&gt;</td>
+            <td>List of configuration keys that are allowed to be set in a user program regardless whether program configuration is enabled or not.<br /><br />Currently, this list is limited to 'pipeline.global-job-parameters' only.</td>
+        </tr>
         <tr>
             <td><h5>execution.shutdown-on-application-finish</h5></td>
             <td style="word-wrap: break-word;">true</td>
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/MutatedConfigurationException.java b/flink-clients/src/main/java/org/apache/flink/client/program/MutatedConfigurationException.java
index 6b36ccea6ee..379617600ac 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/MutatedConfigurationException.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/MutatedConfigurationException.java
@@ -19,12 +19,13 @@
 package org.apache.flink.client.program;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.DeploymentOptions;
 
 import java.util.Collection;
 
 /**
- * If {@link org.apache.flink.configuration.DeploymentOptions#ALLOW_CLIENT_JOB_CONFIGURATIONS} is
- * disabled configurations in the user jar will throw this exception.
+ * If {@link DeploymentOptions#PROGRAM_CONFIG_ENABLED} is disabled, configurations in the user jar
+ * will throw this exception.
  */
 @Internal
 public class MutatedConfigurationException extends Exception {
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java
index c6cbfb85695..262b4821f05 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java
@@ -19,15 +19,19 @@ package org.apache.flink.client.program;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DeploymentOptions;
+import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.core.execution.DetachedJobExecutionResult;
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.core.execution.JobListener;
 import org.apache.flink.core.execution.PipelineExecutorServiceLoader;
 import org.apache.flink.runtime.dispatcher.ConfigurationNotAllowedMessage;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentFactory;
 import org.apache.flink.streaming.api.graph.StreamGraph;
@@ -49,9 +53,12 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -65,18 +72,36 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment {
 
     private static final Logger LOG = LoggerFactory.getLogger(ExecutionEnvironment.class);
 
+    /**
+     * Due to the complexity of the current configuration stack, we need to limit the available
+     * wildcard options for {@link DeploymentOptions#PROGRAM_CONFIG_WILDCARDS}.
+     *
+     * <p>If everything was backed by {@link Configuration} instead of the POJOs that partially
+     * materialize the config options, the implementation could be way easier. Currently, we need to
+     * manually provide the backward path from POJO to {@link ConfigOption} value here to let {@link
+     * #collectNotAllowedConfigurations()} filter out wildcards in both POJOs and {@link
+     * #configuration}.
+     */
+    private static final Map<String, WildcardOption<?>> SUPPORTED_PROGRAM_CONFIG_WILDCARDS =
+            new HashMap<>();
+
+    static {
+        SUPPORTED_PROGRAM_CONFIG_WILDCARDS.put(
+                PipelineOptions.GLOBAL_JOB_PARAMETERS.key(),
+                new WildcardOption<>(
+                        PipelineOptions.GLOBAL_JOB_PARAMETERS,
+                        env -> env.getConfig().getGlobalJobParameters().toMap()));
+    }
+
     private final boolean suppressSysout;
 
     private final boolean enforceSingleJobExecution;
-    private final byte[] originalCheckpointConfigSerialized;
-    private final byte[] originalExecutionConfigSerialized;
-    private final Configuration originalConfiguration;
+    private final Configuration clusterConfiguration;
 
     private int jobCounter;
 
-    private final Collection<String> errorMessages;
-
-    private final boolean allowConfigurations;
+    private final boolean programConfigEnabled;
+    private final Collection<String> programConfigWildcards;
 
     public StreamContextEnvironment(
             final PipelineExecutorServiceLoader executorServiceLoader,
@@ -87,6 +112,7 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment {
         this(
                 executorServiceLoader,
                 configuration,
+                configuration,
                 userCodeClassLoader,
                 enforceSingleJobExecution,
                 suppressSysout,
@@ -97,21 +123,20 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment {
     @Internal
     public StreamContextEnvironment(
             final PipelineExecutorServiceLoader executorServiceLoader,
+            final Configuration clusterConfiguration,
             final Configuration configuration,
             final ClassLoader userCodeClassLoader,
             final boolean enforceSingleJobExecution,
             final boolean suppressSysout,
-            final boolean allowConfigurations,
-            final Collection<String> errorMessages) {
+            final boolean programConfigEnabled,
+            final Collection<String> programConfigWildcards) {
         super(executorServiceLoader, configuration, userCodeClassLoader);
         this.suppressSysout = suppressSysout;
         this.enforceSingleJobExecution = enforceSingleJobExecution;
-        this.allowConfigurations = allowConfigurations;
-        this.originalCheckpointConfigSerialized = serializeConfig(checkpointCfg);
-        this.originalExecutionConfigSerialized = serializeConfig(config);
-        this.originalConfiguration = new Configuration(configuration);
-        this.errorMessages = errorMessages;
+        this.clusterConfiguration = clusterConfiguration;
         this.jobCounter = 0;
+        this.programConfigEnabled = programConfigEnabled;
+        this.programConfigWildcards = programConfigWildcards;
     }
 
     @Override
@@ -136,13 +161,6 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment {
         }
     }
 
-    private void checkNotAllowedConfigurations() throws MutatedConfigurationException {
-        errorMessages.addAll(collectNotAllowedConfigurations());
-        if (!errorMessages.isEmpty()) {
-            throw new MutatedConfigurationException(errorMessages);
-        }
-    }
-
     private JobExecutionResult getJobExecutionResult(final JobClient jobClient) throws Exception {
         checkNotNull(jobClient);
 
@@ -206,35 +224,28 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment {
 
     public static void setAsContext(
             final PipelineExecutorServiceLoader executorServiceLoader,
-            final Configuration configuration,
+            final Configuration clusterConfiguration,
             final ClassLoader userCodeClassLoader,
             final boolean enforceSingleJobExecution,
             final boolean suppressSysout) {
-        StreamExecutionEnvironmentFactory factory =
-                conf -> {
-                    final List<String> errors = new ArrayList<>();
-                    final boolean allowConfigurations =
-                            configuration.getBoolean(
-                                    DeploymentOptions.ALLOW_CLIENT_JOB_CONFIGURATIONS);
-                    if (!allowConfigurations && !conf.toMap().isEmpty()) {
-                        conf.toMap()
-                                .forEach(
-                                        (k, v) ->
-                                                errors.add(
-                                                        ConfigurationNotAllowedMessage
-                                                                .ofConfigurationKeyAndValue(k, v)));
-                    }
-                    Configuration mergedConfiguration = new Configuration();
-                    mergedConfiguration.addAll(configuration);
-                    mergedConfiguration.addAll(conf);
+        final StreamExecutionEnvironmentFactory factory =
+                envInitConfig -> {
+                    final boolean programConfigEnabled =
+                            clusterConfiguration.get(DeploymentOptions.PROGRAM_CONFIG_ENABLED);
+                    final List<String> programConfigWildcards =
+                            clusterConfiguration.get(DeploymentOptions.PROGRAM_CONFIG_WILDCARDS);
+                    final Configuration mergedEnvConfig = new Configuration();
+                    mergedEnvConfig.addAll(clusterConfiguration);
+                    mergedEnvConfig.addAll(envInitConfig);
                     return new StreamContextEnvironment(
                             executorServiceLoader,
-                            mergedConfiguration,
+                            clusterConfiguration,
+                            mergedEnvConfig,
                             userCodeClassLoader,
                             enforceSingleJobExecution,
                             suppressSysout,
-                            allowConfigurations,
-                            errors);
+                            programConfigEnabled,
+                            programConfigWildcards);
                 };
         initializeContextEnvironment(factory);
     }
@@ -243,13 +254,43 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment {
         resetContextEnvironment();
     }
 
-    private List<String> collectNotAllowedConfigurations() {
-        final List<String> errors = new ArrayList<>();
-        if (allowConfigurations) {
-            return errors;
+    // --------------------------------------------------------------------------------------------
+    // Program Configuration Validation
+    // --------------------------------------------------------------------------------------------
+
+    private void checkNotAllowedConfigurations() throws MutatedConfigurationException {
+        final Collection<String> errorMessages = collectNotAllowedConfigurations();
+        if (!errorMessages.isEmpty()) {
+            throw new MutatedConfigurationException(errorMessages);
         }
+    }
+
+    /**
+     * Collects programmatic configuration changes.
+     *
+     * <p>Configuration is spread across instances of {@link Configuration} and POJOs (e.g. {@link
+     * ExecutionConfig}), so we need to have logic for comparing both. For supporting wildcards, the
+     * first can be accomplished by simply removing keys, the latter by setting equal fields before
+     * comparison.
+     */
+    private Collection<String> collectNotAllowedConfigurations() {
+        if (programConfigEnabled) {
+            return Collections.emptyList();
+        }
+
+        final List<String> errors = new ArrayList<>();
+
+        final Configuration clusterConfigMap = new Configuration(clusterConfiguration);
+        final Configuration envConfigMap = new Configuration(configuration);
+
+        // Removal must happen on Configuration objects (not instances of Map)
+        // to also ignore map-typed config options with prefix key notation
+        removeProgramConfigWildcards(clusterConfigMap);
+        removeProgramConfigWildcards(envConfigMap);
+
+        // Check Configuration
         final MapDifference<String, String> diff =
-                Maps.difference(originalConfiguration.toMap(), configuration.toMap());
+                Maps.difference(clusterConfigMap.toMap(), envConfigMap.toMap());
         diff.entriesOnlyOnRight()
                 .forEach(
                         (k, v) ->
@@ -269,20 +310,57 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment {
                                         ConfigurationNotAllowedMessage.ofConfigurationChange(
                                                 k, v)));
 
-        if (!Arrays.equals(originalCheckpointConfigSerialized, serializeConfig(checkpointCfg))) {
+        final Configuration enrichedClusterConfig = new Configuration(clusterConfiguration);
+        enrichProgramConfigWildcards(enrichedClusterConfig);
+
+        // Check CheckpointConfig
+        final CheckpointConfig clusterCheckpointConfig = new CheckpointConfig();
+        clusterCheckpointConfig.configure(enrichedClusterConfig);
+        if (!Arrays.equals(
+                serializeConfig(clusterCheckpointConfig), serializeConfig(checkpointCfg))) {
             errors.add(
                     ConfigurationNotAllowedMessage.ofConfigurationObject(
                             checkpointCfg.getClass().getSimpleName()));
         }
 
-        if (!Arrays.equals(originalExecutionConfigSerialized, serializeConfig(config))) {
+        // Check ExecutionConfig
+        final ExecutionConfig clusterExecutionConfig = new ExecutionConfig();
+        clusterExecutionConfig.configure(enrichedClusterConfig, this.getUserClassloader());
+        if (!Arrays.equals(serializeConfig(clusterExecutionConfig), serializeConfig(config))) {
             errors.add(
                     ConfigurationNotAllowedMessage.ofConfigurationObject(
                             config.getClass().getSimpleName()));
         }
+
         return errors;
     }
 
+    private void enrichProgramConfigWildcards(Configuration mutableConfig) {
+        for (String key : programConfigWildcards) {
+            final WildcardOption<?> option = SUPPORTED_PROGRAM_CONFIG_WILDCARDS.get(key);
+            if (option == null) {
+                throw new FlinkRuntimeException(
+                        String.format(
+                                "Unsupported option '%s' for program configuration wildcards.",
+                                key));
+            }
+            option.enrich(mutableConfig, this);
+        }
+    }
+
+    private void removeProgramConfigWildcards(Configuration mutableConfig) {
+        for (String key : programConfigWildcards) {
+            final WildcardOption<?> option = SUPPORTED_PROGRAM_CONFIG_WILDCARDS.get(key);
+            if (option == null) {
+                throw new FlinkRuntimeException(
+                        String.format(
+                                "Unsupported option '%s' for program configuration wildcards.",
+                                key));
+            }
+            option.remove(mutableConfig);
+        }
+    }
+
     private static byte[] serializeConfig(Serializable config) {
         try (final ByteArrayOutputStream bos = new ByteArrayOutputStream();
                 final ObjectOutputStream oos = new ObjectOutputStream(bos)) {
@@ -293,4 +371,26 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment {
             throw new FlinkRuntimeException("Cannot serialize configuration.", e);
         }
     }
+
+    /**
+     * Helper class for mapping a configuration key to a {@link ConfigOption} and a programmatic
+     * getter.
+     */
+    private static final class WildcardOption<T> {
+        private final ConfigOption<T> option;
+        private final Function<StreamContextEnvironment, T> getter;
+
+        WildcardOption(ConfigOption<T> option, Function<StreamContextEnvironment, T> getter) {
+            this.option = option;
+            this.getter = getter;
+        }
+
+        void enrich(Configuration mutableConfig, StreamContextEnvironment fromEnv) {
+            mutableConfig.set(option, getter.apply(fromEnv));
+        }
+
+        void remove(Configuration mutableConfig) {
+            mutableConfig.removeConfig(option);
+        }
+    }
 }
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
index b1659eb732c..4d8f4403e81 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
@@ -378,7 +378,7 @@ class ClientTest {
                             .build();
 
             final Configuration configuration = fromPackagedProgram(program, 1, false);
-            configuration.set(DeploymentOptions.ALLOW_CLIENT_JOB_CONFIGURATIONS, false);
+            configuration.set(DeploymentOptions.PROGRAM_CONFIG_ENABLED, false);
 
             assertThatThrownBy(
                             () ->
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/StreamContextEnvironmentTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/StreamContextEnvironmentTest.java
index da5456057d2..3de40323ff2 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/StreamContextEnvironmentTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/StreamContextEnvironmentTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.configuration.ExecutionOptions;
+import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.core.execution.PipelineExecutorFactory;
 import org.apache.flink.core.execution.PipelineExecutorServiceLoader;
 import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
@@ -41,31 +42,33 @@ import java.util.Collections;
 import java.util.List;
 import java.util.stream.Stream;
 
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 class StreamContextEnvironmentTest {
 
     @ParameterizedTest
     @MethodSource("provideExecutors")
-    void testDisallowJobConfigurationChanges(
+    void testDisallowProgramConfigurationChanges(
             ThrowingConsumer<StreamExecutionEnvironment, Exception> executor) {
-        final Configuration clusterConfiguration = new Configuration();
-        clusterConfiguration.set(DeploymentOptions.ALLOW_CLIENT_JOB_CONFIGURATIONS, false);
-        clusterConfiguration.set(DeploymentOptions.TARGET, "local");
-        clusterConfiguration.set(SavepointConfigOptions.SAVEPOINT_PATH, "/flink/savepoints");
-        clusterConfiguration.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.STREAMING);
-
-        final Configuration jobConfiguration = new Configuration();
-        jobConfiguration.set(DeploymentOptions.ALLOW_CLIENT_JOB_CONFIGURATIONS, false);
-        jobConfiguration.set(DeploymentOptions.TARGET, "local");
-        jobConfiguration.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH);
-        jobConfiguration.set(ExecutionOptions.SORT_INPUTS, true);
+        final Configuration clusterConfig = new Configuration();
+        clusterConfig.set(DeploymentOptions.PROGRAM_CONFIG_ENABLED, false);
+        clusterConfig.set(DeploymentOptions.TARGET, "local");
+        clusterConfig.set(SavepointConfigOptions.SAVEPOINT_PATH, "/flink/savepoints");
+        clusterConfig.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.STREAMING);
+
+        final Configuration programConfig = new Configuration();
+        programConfig.set(DeploymentOptions.PROGRAM_CONFIG_ENABLED, false);
+        programConfig.set(DeploymentOptions.TARGET, "local");
+        programConfig.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH);
+        programConfig.set(ExecutionOptions.SORT_INPUTS, true);
 
         final ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
         final StreamContextEnvironment environment =
                 new StreamContextEnvironment(
                         new MockExecutorServiceLoader(),
-                        clusterConfiguration,
+                        clusterConfig,
+                        clusterConfig,
                         classLoader,
                         true,
                         true,
@@ -78,7 +81,7 @@ class StreamContextEnvironmentTest {
         environment.setParallelism(25);
 
         // Add/mutate values in the configuration
-        environment.configure(jobConfiguration);
+        environment.configure(programConfig);
 
         environment.fromCollection(Collections.singleton(1)).addSink(new DiscardingSink<>());
         assertThatThrownBy(() -> executor.accept(environment))
@@ -90,6 +93,44 @@ class StreamContextEnvironmentTest {
                         ExecutionConfig.class.getSimpleName());
     }
 
+    @ParameterizedTest
+    @MethodSource("provideExecutors")
+    void testAllowProgramConfigurationWildcards(
+            ThrowingConsumer<StreamExecutionEnvironment, Exception> executor) {
+        final Configuration clusterConfig = new Configuration();
+        clusterConfig.set(DeploymentOptions.TARGET, "local");
+        clusterConfig.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.STREAMING);
+        // Test prefix map notation
+        clusterConfig.setString(
+                PipelineOptions.GLOBAL_JOB_PARAMETERS.key() + "." + "my-param", "my-value");
+
+        final Configuration jobConfig = new Configuration();
+        jobConfig.set(
+                PipelineOptions.GLOBAL_JOB_PARAMETERS,
+                Collections.singletonMap("my-other-param", "my-other-value"));
+
+        final ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+        final StreamContextEnvironment environment =
+                new StreamContextEnvironment(
+                        new MockExecutorServiceLoader(),
+                        clusterConfig,
+                        clusterConfig,
+                        classLoader,
+                        true,
+                        true,
+                        false,
+                        Collections.singletonList(PipelineOptions.GLOBAL_JOB_PARAMETERS.key()));
+
+        // Change ExecutionConfig
+        environment.configure(jobConfig);
+
+        environment.fromCollection(Collections.singleton(1)).addSink(new DiscardingSink<>());
+        assertThatThrownBy(() -> executor.accept(environment))
+                .isInstanceOf(ExecutorReachedException.class);
+        assertThat(environment.getConfig().getGlobalJobParameters().toMap())
+                .containsOnlyKeys("my-other-param");
+    }
+
     private static List<ThrowingConsumer<StreamExecutionEnvironment, Exception>>
             provideExecutors() {
         return Arrays.asList(
@@ -100,12 +141,14 @@ class StreamContextEnvironmentTest {
 
         @Override
         public PipelineExecutorFactory getExecutorFactory(Configuration configuration) {
-            throw new UnsupportedOperationException("Not implemented");
+            throw new ExecutorReachedException();
         }
 
         @Override
         public Stream<String> getExecutorNames() {
-            throw new UnsupportedOperationException("Not implemented");
+            throw new ExecutorReachedException();
         }
     }
+
+    private static class ExecutorReachedException extends RuntimeException {}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index 3b91da90e50..24e01033de5 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -73,6 +73,15 @@ import static org.apache.flink.util.Preconditions.checkArgument;
 @Public
 public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecutionConfig> {
 
+    // NOTE TO IMPLEMENTERS:
+    // Please do not add further fields to this class. Use the ConfigOption stack instead!
+    // It is currently very tricky to keep this kind of POJO classes in sync with instances of
+    // org.apache.flink.configuration.Configuration. Instances of Configuration are way easier to
+    // pass, layer, merge, restrict, copy, filter, etc.
+    // See ExecutionOptions.RUNTIME_MODE for a reference implementation. If the option is very
+    // crucial for the API, we can add a dedicated setter to StreamExecutionEnvironment. Otherwise,
+    // introducing a ConfigOption should be enough.
+
     private static final long serialVersionUID = 1L;
 
     /**
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/DeploymentOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/DeploymentOptions.java
index aa3555f1c16..21bcedd0d7c 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/DeploymentOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/DeploymentOptions.java
@@ -106,15 +106,50 @@ public class DeploymentOptions {
                                     .build());
 
     @Experimental
-    public static final ConfigOption<Boolean> ALLOW_CLIENT_JOB_CONFIGURATIONS =
-            ConfigOptions.key("execution.allow-client-job-configurations")
+    public static final ConfigOption<List<String>> PROGRAM_CONFIG_WILDCARDS =
+            ConfigOptions.key("execution.program-config.wildcards")
+                    .stringType()
+                    .asList()
+                    .defaultValues()
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "List of configuration keys that are allowed to be set in a user program "
+                                                    + "regardless whether program configuration is enabled or not.")
+                                    .linebreak()
+                                    .linebreak()
+                                    .text(
+                                            "Currently, this list is limited to '%s' only.",
+                                            TextElement.text(
+                                                    PipelineOptions.GLOBAL_JOB_PARAMETERS.key()))
+                                    .build());
+
+    @Experimental
+    public static final ConfigOption<Boolean> PROGRAM_CONFIG_ENABLED =
+            ConfigOptions.key("execution.program-config.enabled")
                     .booleanType()
                     .defaultValue(true)
+                    .withDeprecatedKeys("execution.allow-client-job-configurations")
                     .withDescription(
-                            "Determines whether configurations in the user program are "
-                                    + "allowed. Depending on your deployment mode failing the job "
-                                    + "might have different affects. Either your client that is "
-                                    + "trying to submit the job to an external cluster (session "
-                                    + "cluster deployment) throws the exception or the Job "
-                                    + "manager (application mode deployment).");
+                            Description.builder()
+                                    .text(
+                                            "Determines whether configurations in the user program are allowed. By default, "
+                                                    + "configuration can be set both on a cluster-level (via options) or "
+                                                    + "within the user program (i.e. programmatic via environment setters). "
+                                                    + "If disabled, all configuration must be defined on a cluster-level and "
+                                                    + "programmatic setters in the user program are prohibited.")
+                                    .linebreak()
+                                    .linebreak()
+                                    .text(
+                                            "Depending on your deployment mode failing the job might have different implications. "
+                                                    + "Either your client that is trying to submit the job to an external "
+                                                    + "cluster (session cluster deployment) throws the exception or the "
+                                                    + "job manager (application mode deployment).")
+                                    .linebreak()
+                                    .linebreak()
+                                    .text(
+                                            "The '%s' option lists configuration keys that are allowed to be set in user programs "
+                                                    + "regardless of this setting.",
+                                            TextElement.text(PROGRAM_CONFIG_WILDCARDS.key()))
+                                    .build());
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/ConfigurationNotAllowedMessage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/ConfigurationNotAllowedMessage.java
index 81654a14bd0..fd01c52d004 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/ConfigurationNotAllowedMessage.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/ConfigurationNotAllowedMessage.java
@@ -19,12 +19,13 @@
 package org.apache.flink.runtime.dispatcher;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.DeploymentOptions;
 
 import org.apache.flink.shaded.guava30.com.google.common.collect.MapDifference;
 
 /**
- * If {@link org.apache.flink.configuration.DeploymentOptions#ALLOW_CLIENT_JOB_CONFIGURATIONS} is
- * disabled this error denotes the not allowed configuration.
+ * If {@link DeploymentOptions#PROGRAM_CONFIG_ENABLED} is disabled, this error denotes the not
+ * allowed configuration.
  */
 @Internal
 public class ConfigurationNotAllowedMessage {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
index 8983baa79f3..edcadc2c46f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
@@ -52,6 +52,15 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 @Public
 public class CheckpointConfig implements java.io.Serializable {
 
+    // NOTE TO IMPLEMENTERS:
+    // Please do not add further fields to this class. Use the ConfigOption stack instead!
+    // It is currently very tricky to keep this kind of POJO classes in sync with instances of
+    // org.apache.flink.configuration.Configuration. Instances of Configuration are way easier to
+    // pass, layer, merge, restrict, copy, filter, etc.
+    // See ExecutionOptions.RUNTIME_MODE for a reference implementation. If the option is very
+    // crucial for the API, we can add a dedicated setter to StreamExecutionEnvironment. Otherwise,
+    // introducing a ConfigOption should be enough.
+
     private static final long serialVersionUID = -750378776078908147L;
 
     private static final Logger LOG = LoggerFactory.getLogger(CheckpointConfig.class);