You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2022/11/08 13:20:20 UTC

[flink] 02/03: [FLINK-29888][streaming] Reimplement checkNotAllowedConfigurations using refactored CheckpointConfig and ExecutionConfig

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 35b2cae8cdbc8642ca51b922f62579357a47415b
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Thu Nov 3 16:52:28 2022 +0100

    [FLINK-29888][streaming] Reimplement checkNotAllowedConfigurations using refactored CheckpointConfig and ExecutionConfig
    
    After FLINK-29379 we can re-implement checkNotAllowedConfigurations to produce better exception messages.
    Instead of a generic "Configuration object ExecutionConfig changed", without a hint of what has been modified,
    we can report what ConfigOption has been changed.
---
 .../generated/deployment_configuration.html        |   2 +-
 .../client/program/StreamContextEnvironment.java   | 161 +++++++--------------
 .../program/StreamContextEnvironmentTest.java      |  14 +-
 .../apache/flink/configuration/Configuration.java  |  14 ++
 .../configuration/DelegatingConfiguration.java     |   5 +
 .../flink/configuration/DeploymentOptions.java     |   4 +-
 .../flink/configuration/ConfigurationTest.java     |  20 +++
 .../dispatcher/ConfigurationNotAllowedMessage.java |  26 +++-
 8 files changed, 128 insertions(+), 118 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/deployment_configuration.html b/docs/layouts/shortcodes/generated/deployment_configuration.html
index fa037edd5c5..07daa602a07 100644
--- a/docs/layouts/shortcodes/generated/deployment_configuration.html
+++ b/docs/layouts/shortcodes/generated/deployment_configuration.html
@@ -30,7 +30,7 @@
             <td><h5>execution.program-config.wildcards</h5></td>
             <td style="word-wrap: break-word;"></td>
             <td>List&lt;String&gt;</td>
-            <td>List of configuration keys that are allowed to be set in a user program regardless whether program configuration is enabled or not.<br /><br />Currently, this list is limited to 'pipeline.global-job-parameters' only.</td>
+            <td>List of configuration keys that are allowed to be set in a user program regardless whether program configuration is enabled or not.<br /><br />Currently changes that are not backed by the Configuration class are always allowed.</td>
         </tr>
         <tr>
             <td><h5>execution.shutdown-on-application-finish</h5></td>
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java
index 587e6c6d6dd..15f7022086a 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java
@@ -22,10 +22,8 @@ import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DeploymentOptions;
-import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.core.execution.DetachedJobExecutionResult;
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.core.execution.JobListener;
@@ -45,20 +43,12 @@ import org.apache.flink.shaded.guava30.com.google.common.collect.Maps;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectOutputStream;
-import java.io.Serializable;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -72,27 +62,6 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment {
 
     private static final Logger LOG = LoggerFactory.getLogger(ExecutionEnvironment.class);
 
-    /**
-     * Due to the complexity of the current configuration stack, we need to limit the available
-     * wildcard options for {@link DeploymentOptions#PROGRAM_CONFIG_WILDCARDS}.
-     *
-     * <p>If everything was backed by {@link Configuration} instead of the POJOs that partially
-     * materialize the config options, the implementation could be way easier. Currently, we need to
-     * manually provide the backward path from POJO to {@link ConfigOption} value here to let {@link
-     * #collectNotAllowedConfigurations()} filter out wildcards in both POJOs and {@link
-     * #configuration}.
-     */
-    private static final Map<String, WildcardOption<?>> SUPPORTED_PROGRAM_CONFIG_WILDCARDS =
-            new HashMap<>();
-
-    static {
-        SUPPORTED_PROGRAM_CONFIG_WILDCARDS.put(
-                PipelineOptions.GLOBAL_JOB_PARAMETERS.key(),
-                new WildcardOption<>(
-                        PipelineOptions.GLOBAL_JOB_PARAMETERS,
-                        env -> env.getConfig().getGlobalJobParameters().toMap()));
-    }
-
     private final boolean suppressSysout;
 
     private final boolean enforceSingleJobExecution;
@@ -281,14 +250,21 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment {
         final List<String> errors = new ArrayList<>();
 
         final Configuration clusterConfigMap = new Configuration(clusterConfiguration);
-        final Configuration envConfigMap = new Configuration(configuration);
 
         // Removal must happen on Configuration objects (not instances of Map)
         // to also ignore map-typed config options with prefix key notation
         removeProgramConfigWildcards(clusterConfigMap);
+
+        checkMainConfiguration(clusterConfigMap, errors);
+        checkCheckpointConfig(clusterConfigMap, errors);
+        checkExecutionConfig(clusterConfigMap, errors);
+        return errors;
+    }
+
+    private void checkMainConfiguration(Configuration clusterConfigMap, List<String> errors) {
+        final Configuration envConfigMap = new Configuration(configuration);
         removeProgramConfigWildcards(envConfigMap);
 
-        // Check Configuration
         final MapDifference<String, String> diff =
                 Maps.difference(clusterConfigMap.toMap(), envConfigMap.toMap());
         diff.entriesOnlyOnRight()
@@ -308,88 +284,61 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment {
                                 errors.add(
                                         ConfigurationNotAllowedMessage.ofConfigurationChanged(
                                                 k, v)));
-
-        final Configuration enrichedClusterConfig = new Configuration(clusterConfiguration);
-        enrichProgramConfigWildcards(enrichedClusterConfig);
-
-        // Check CheckpointConfig
-        final CheckpointConfig clusterCheckpointConfig = new CheckpointConfig();
-        clusterCheckpointConfig.configure(enrichedClusterConfig);
-        if (!Arrays.equals(
-                serializeConfig(clusterCheckpointConfig), serializeConfig(checkpointCfg))) {
-            errors.add(
-                    ConfigurationNotAllowedMessage.ofConfigurationObject(
-                            checkpointCfg.getClass().getSimpleName()));
-        }
-
-        // Check ExecutionConfig
-        final ExecutionConfig clusterExecutionConfig = new ExecutionConfig();
-        clusterExecutionConfig.configure(enrichedClusterConfig, this.getUserClassloader());
-        if (!Arrays.equals(serializeConfig(clusterExecutionConfig), serializeConfig(config))) {
-            errors.add(
-                    ConfigurationNotAllowedMessage.ofConfigurationObject(
-                            config.getClass().getSimpleName()));
-        }
-
-        return errors;
     }
 
-    private void enrichProgramConfigWildcards(Configuration mutableConfig) {
-        for (String key : programConfigWildcards) {
-            final WildcardOption<?> option = SUPPORTED_PROGRAM_CONFIG_WILDCARDS.get(key);
-            if (option == null) {
-                throw new FlinkRuntimeException(
-                        String.format(
-                                "Unsupported option '%s' for program configuration wildcards.",
-                                key));
-            }
-            option.enrich(mutableConfig, this);
-        }
+    private void checkCheckpointConfig(Configuration clusterConfigMap, List<String> errors) {
+        CheckpointConfig expectedCheckpointConfig = new CheckpointConfig();
+        expectedCheckpointConfig.configure(clusterConfigMap);
+        checkConfigurationObject(
+                expectedCheckpointConfig.toConfiguration(),
+                checkpointCfg.toConfiguration(),
+                checkpointCfg.getClass().getSimpleName(),
+                errors);
     }
 
-    private void removeProgramConfigWildcards(Configuration mutableConfig) {
-        for (String key : programConfigWildcards) {
-            final WildcardOption<?> option = SUPPORTED_PROGRAM_CONFIG_WILDCARDS.get(key);
-            if (option == null) {
-                throw new FlinkRuntimeException(
-                        String.format(
-                                "Unsupported option '%s' for program configuration wildcards.",
-                                key));
-            }
-            option.remove(mutableConfig);
-        }
+    private void checkExecutionConfig(Configuration clusterConfigMap, List<String> errors) {
+        ExecutionConfig expectedExecutionConfig = new ExecutionConfig();
+        expectedExecutionConfig.configure(clusterConfigMap, getUserClassloader());
+        checkConfigurationObject(
+                expectedExecutionConfig.toConfiguration(),
+                config.toConfiguration(),
+                config.getClass().getSimpleName(),
+                errors);
     }
 
-    private static byte[] serializeConfig(Serializable config) {
-        try (final ByteArrayOutputStream bos = new ByteArrayOutputStream();
-                final ObjectOutputStream oos = new ObjectOutputStream(bos)) {
-            oos.writeObject(config);
-            oos.flush();
-            return bos.toByteArray();
-        } catch (IOException e) {
-            throw new FlinkRuntimeException("Cannot serialize configuration.", e);
-        }
-    }
-
-    /**
-     * Helper class for mapping a configuration key to a {@link ConfigOption} and a programmatic
-     * getter.
-     */
-    private static final class WildcardOption<T> {
-        private final ConfigOption<T> option;
-        private final Function<StreamContextEnvironment, T> getter;
+    private void checkConfigurationObject(
+            Configuration expectedConfiguration,
+            Configuration actualConfiguration,
+            String configurationObjectName,
+            List<String> errors) {
+        removeProgramConfigWildcards(actualConfiguration);
 
-        WildcardOption(ConfigOption<T> option, Function<StreamContextEnvironment, T> getter) {
-            this.option = option;
-            this.getter = getter;
-        }
+        final MapDifference<String, String> diff =
+                Maps.difference(expectedConfiguration.toMap(), actualConfiguration.toMap());
+        diff.entriesOnlyOnRight()
+                .forEach(
+                        (k, v) ->
+                                errors.add(
+                                        ConfigurationNotAllowedMessage.ofConfigurationObjectAdded(
+                                                configurationObjectName, k, v)));
+        diff.entriesDiffering()
+                .forEach(
+                        (k, v) ->
+                                errors.add(
+                                        ConfigurationNotAllowedMessage.ofConfigurationObjectChanged(
+                                                configurationObjectName, k, v)));
 
-        void enrich(Configuration mutableConfig, StreamContextEnvironment fromEnv) {
-            mutableConfig.set(option, getter.apply(fromEnv));
-        }
+        diff.entriesOnlyOnLeft()
+                .forEach(
+                        (k, v) ->
+                                errors.add(
+                                        ConfigurationNotAllowedMessage.ofConfigurationObjectRemoved(
+                                                configurationObjectName, k, v)));
+    }
 
-        void remove(Configuration mutableConfig) {
-            mutableConfig.removeConfig(option);
+    private void removeProgramConfigWildcards(Configuration mutableConfig) {
+        for (String key : programConfigWildcards) {
+            mutableConfig.removeKey(key);
         }
     }
 }
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/StreamContextEnvironmentTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/StreamContextEnvironmentTest.java
index 3de40323ff2..f9432e048be 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/StreamContextEnvironmentTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/StreamContextEnvironmentTest.java
@@ -29,6 +29,7 @@ import org.apache.flink.core.execution.PipelineExecutorServiceLoader;
 import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
 import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.util.function.ThrowingConsumer;
@@ -79,6 +80,7 @@ class StreamContextEnvironmentTest {
         environment.enableCheckpointing(500, CheckpointingMode.EXACTLY_ONCE);
         // Change the ExecutionConfig
         environment.setParallelism(25);
+        environment.getConfig().setMaxParallelism(1024);
 
         // Add/mutate values in the configuration
         environment.configure(programConfig);
@@ -90,7 +92,9 @@ class StreamContextEnvironmentTest {
                         ExecutionOptions.RUNTIME_MODE.key(),
                         ExecutionOptions.SORT_INPUTS.key(),
                         CheckpointConfig.class.getSimpleName(),
-                        ExecutionConfig.class.getSimpleName());
+                        ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL.key(),
+                        ExecutionConfig.class.getSimpleName(),
+                        PipelineOptions.MAX_PARALLELISM.key());
     }
 
     @ParameterizedTest
@@ -100,7 +104,8 @@ class StreamContextEnvironmentTest {
         final Configuration clusterConfig = new Configuration();
         clusterConfig.set(DeploymentOptions.TARGET, "local");
         clusterConfig.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.STREAMING);
-        // Test prefix map notation
+        // Changing GLOBAL_JOB_PARAMETERS is always allowed, as it's one of the fields not checked
+        // with PROGRAM_CONFIG_ENABLED set to false
         clusterConfig.setString(
                 PipelineOptions.GLOBAL_JOB_PARAMETERS.key() + "." + "my-param", "my-value");
 
@@ -119,10 +124,13 @@ class StreamContextEnvironmentTest {
                         true,
                         true,
                         false,
-                        Collections.singletonList(PipelineOptions.GLOBAL_JOB_PARAMETERS.key()));
+                        Arrays.asList(
+                                PipelineOptions.GLOBAL_JOB_PARAMETERS.key(),
+                                PipelineOptions.MAX_PARALLELISM.key()));
 
         // Change ExecutionConfig
         environment.configure(jobConfig);
+        environment.getConfig().setMaxParallelism(1024);
 
         environment.fromCollection(Collections.singleton(1)).addSink(new DiscardingSink<>());
         assertThatThrownBy(() -> executor.accept(environment))
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java b/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
index d2032c606c4..b1740826850 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
@@ -765,6 +765,20 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters
         }
     }
 
+    /**
+     * Removes the given key, and any prefix-map entries stored under it, from the configuration.
+     *
+     * @param key key of a config option to remove
+     * @return true if config has been removed, false otherwise
+     */
+    public boolean removeKey(String key) {
+        synchronized (this.confData) {
+            boolean removed = this.confData.remove(key) != null;
+            removed |= removePrefixMap(confData, key);
+            return removed;
+        }
+    }
+
     // --------------------------------------------------------------------------------------------
 
     <T> void setValueInternal(String key, T value, boolean canBePrefixMap) {
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java b/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java
index f1c480dc247..54a3230cc3e 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java
@@ -324,6 +324,11 @@ public final class DelegatingConfiguration extends Configuration {
         return backingConfig.removeConfig(configOption);
     }
 
+    @Override
+    public boolean removeKey(String key) {
+        return backingConfig.removeKey(prefix + key); // same prefixing as containsKey
+    }
+
     @Override
     public boolean containsKey(String key) {
         return backingConfig.containsKey(prefix + key);
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/DeploymentOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/DeploymentOptions.java
index 21bcedd0d7c..f6ca0d8a345 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/DeploymentOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/DeploymentOptions.java
@@ -119,9 +119,7 @@ public class DeploymentOptions {
                                     .linebreak()
                                     .linebreak()
                                     .text(
-                                            "Currently, this list is limited to '%s' only.",
-                                            TextElement.text(
-                                                    PipelineOptions.GLOBAL_JOB_PARAMETERS.key()))
+                                            "Currently changes that are not backed by the Configuration class are always allowed.")
                                     .build());
 
     @Experimental
diff --git a/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java b/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java
index cd8ca26ae5e..341ded3a25f 100644
--- a/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java
+++ b/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java
@@ -30,6 +30,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.containsString;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
@@ -314,6 +315,25 @@ public class ConfigurationTest extends TestLogger {
         assertFalse("Expected 'unexistedOption' is not removed", cfg.removeConfig(unexistedOption));
     }
 
+    @Test
+    public void testRemoveKey() {
+        Configuration cfg = new Configuration();
+        String key1 = "a.b";
+        String key2 = "c.d";
+        cfg.setInteger(key1, 42);
+        cfg.setInteger(key2, 44);
+        cfg.setInteger(key2 + ".f1", 44);
+        cfg.setInteger(key2 + ".f2", 44);
+        cfg.setInteger("e.f", 1337);
+
+        assertFalse(cfg.removeKey("not-existing-key"));
+        assertTrue(cfg.removeKey(key1));
+        assertFalse(cfg.containsKey(key1));
+
+        assertTrue(cfg.removeKey(key2));
+        assertThat(cfg.keySet(), containsInAnyOrder("e.f"));
+    }
+
     @Test
     public void testShouldParseValidStringToEnum() {
         final Configuration configuration = new Configuration();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/ConfigurationNotAllowedMessage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/ConfigurationNotAllowedMessage.java
index 1e3d237ce9a..23299bc059e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/ConfigurationNotAllowedMessage.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/ConfigurationNotAllowedMessage.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.dispatcher;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.configuration.DeploymentOptions;
 
-import org.apache.flink.shaded.guava30.com.google.common.collect.MapDifference;
+import org.apache.flink.shaded.guava30.com.google.common.collect.MapDifference.ValueDifference;
 
 /**
  * If {@link DeploymentOptions#PROGRAM_CONFIG_ENABLED} is disabled, this error denotes the not
@@ -40,14 +40,30 @@ public class ConfigurationNotAllowedMessage {
         return String.format("Configuration %s:%s was removed.", configKey, configValue);
     }
 
-    public static String ofConfigurationChanged(
-            String configKey, MapDifference.ValueDifference<String> change) {
+    public static String ofConfigurationChanged(String configKey, ValueDifference<String> change) {
         return String.format(
                 "Configuration %s was changed from %s to %s.",
                 configKey, change.leftValue(), change.rightValue());
     }
 
-    public static String ofConfigurationObject(String configurationObject) {
-        return String.format("Configuration object %s changed.", configurationObject);
+    public static String ofConfigurationObjectAdded(
+            String configurationObject, String configKey, String configValue) {
+        return String.format(
+                "Configuration %s:%s not allowed in the configuration object %s.",
+                configKey, configValue, configurationObject);
+    }
+
+    public static String ofConfigurationObjectChanged(
+            String configurationObject, String configKey, ValueDifference<String> change) {
+        return String.format(
+                "Configuration %s was changed from %s to %s in the configuration object %s.",
+                configKey, change.leftValue(), change.rightValue(), configurationObject);
+    }
+
+    public static String ofConfigurationObjectRemoved(
+            String configurationObject, String configKey, String configValue) {
+        return String.format(
+                "Configuration %s:%s was removed from the configuration object %s.",
+                configKey, configValue, configurationObject);
     }
 }