You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2022/11/08 13:20:20 UTC
[flink] 02/03: [FLINK-29888][streaming] Reimplement checkNotAllowedConfigurations using refactored CheckpointConfig and ExecutionConfig
This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 35b2cae8cdbc8642ca51b922f62579357a47415b
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Thu Nov 3 16:52:28 2022 +0100
[FLINK-29888][streaming] Reimplement checkNotAllowedConfigurations using refactored CheckpointConfig and ExecutionConfig
After FLINK-29379, we can reimplement checkNotAllowedConfigurations to produce more informative exception messages.
Instead of a generic "Configuration object ExecutionConfig changed" message, which gives no hint of what has been modified,
we can now report which ConfigOption has been changed.
---
.../generated/deployment_configuration.html | 2 +-
.../client/program/StreamContextEnvironment.java | 161 +++++++--------------
.../program/StreamContextEnvironmentTest.java | 14 +-
.../apache/flink/configuration/Configuration.java | 14 ++
.../configuration/DelegatingConfiguration.java | 5 +
.../flink/configuration/DeploymentOptions.java | 4 +-
.../flink/configuration/ConfigurationTest.java | 20 +++
.../dispatcher/ConfigurationNotAllowedMessage.java | 26 +++-
8 files changed, 128 insertions(+), 118 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/deployment_configuration.html b/docs/layouts/shortcodes/generated/deployment_configuration.html
index fa037edd5c5..07daa602a07 100644
--- a/docs/layouts/shortcodes/generated/deployment_configuration.html
+++ b/docs/layouts/shortcodes/generated/deployment_configuration.html
@@ -30,7 +30,7 @@
<td><h5>execution.program-config.wildcards</h5></td>
<td style="word-wrap: break-word;"></td>
<td>List<String></td>
- <td>List of configuration keys that are allowed to be set in a user program regardless whether program configuration is enabled or not.<br /><br />Currently, this list is limited to 'pipeline.global-job-parameters' only.</td>
+ <td>List of configuration keys that are allowed to be set in a user program regardless whether program configuration is enabled or not.<br /><br />Currently changes that are not backed by the Configuration class are always allowed.</td>
</tr>
<tr>
<td><h5>execution.shutdown-on-application-finish</h5></td>
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java
index 587e6c6d6dd..15f7022086a 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java
@@ -22,10 +22,8 @@ import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
-import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.core.execution.DetachedJobExecutionResult;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.JobListener;
@@ -45,20 +43,12 @@ import org.apache.flink.shaded.guava30.com.google.common.collect.Maps;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectOutputStream;
-import java.io.Serializable;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -72,27 +62,6 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment {
private static final Logger LOG = LoggerFactory.getLogger(ExecutionEnvironment.class);
- /**
- * Due to the complexity of the current configuration stack, we need to limit the available
- * wildcard options for {@link DeploymentOptions#PROGRAM_CONFIG_WILDCARDS}.
- *
- * <p>If everything was backed by {@link Configuration} instead of the POJOs that partially
- * materialize the config options, the implementation could be way easier. Currently, we need to
- * manually provide the backward path from POJO to {@link ConfigOption} value here to let {@link
- * #collectNotAllowedConfigurations()} filter out wildcards in both POJOs and {@link
- * #configuration}.
- */
- private static final Map<String, WildcardOption<?>> SUPPORTED_PROGRAM_CONFIG_WILDCARDS =
- new HashMap<>();
-
- static {
- SUPPORTED_PROGRAM_CONFIG_WILDCARDS.put(
- PipelineOptions.GLOBAL_JOB_PARAMETERS.key(),
- new WildcardOption<>(
- PipelineOptions.GLOBAL_JOB_PARAMETERS,
- env -> env.getConfig().getGlobalJobParameters().toMap()));
- }
-
private final boolean suppressSysout;
private final boolean enforceSingleJobExecution;
@@ -281,14 +250,21 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment {
final List<String> errors = new ArrayList<>();
final Configuration clusterConfigMap = new Configuration(clusterConfiguration);
- final Configuration envConfigMap = new Configuration(configuration);
// Removal must happen on Configuration objects (not instances of Map)
// to also ignore map-typed config options with prefix key notation
removeProgramConfigWildcards(clusterConfigMap);
+
+ checkMainConfiguration(clusterConfigMap, errors);
+ checkCheckpointConfig(clusterConfigMap, errors);
+ checkExecutionConfig(clusterConfigMap, errors);
+ return errors;
+ }
+
+ private void checkMainConfiguration(Configuration clusterConfigMap, List<String> errors) {
+ final Configuration envConfigMap = new Configuration(configuration);
removeProgramConfigWildcards(envConfigMap);
- // Check Configuration
final MapDifference<String, String> diff =
Maps.difference(clusterConfigMap.toMap(), envConfigMap.toMap());
diff.entriesOnlyOnRight()
@@ -308,88 +284,61 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment {
errors.add(
ConfigurationNotAllowedMessage.ofConfigurationChanged(
k, v)));
-
- final Configuration enrichedClusterConfig = new Configuration(clusterConfiguration);
- enrichProgramConfigWildcards(enrichedClusterConfig);
-
- // Check CheckpointConfig
- final CheckpointConfig clusterCheckpointConfig = new CheckpointConfig();
- clusterCheckpointConfig.configure(enrichedClusterConfig);
- if (!Arrays.equals(
- serializeConfig(clusterCheckpointConfig), serializeConfig(checkpointCfg))) {
- errors.add(
- ConfigurationNotAllowedMessage.ofConfigurationObject(
- checkpointCfg.getClass().getSimpleName()));
- }
-
- // Check ExecutionConfig
- final ExecutionConfig clusterExecutionConfig = new ExecutionConfig();
- clusterExecutionConfig.configure(enrichedClusterConfig, this.getUserClassloader());
- if (!Arrays.equals(serializeConfig(clusterExecutionConfig), serializeConfig(config))) {
- errors.add(
- ConfigurationNotAllowedMessage.ofConfigurationObject(
- config.getClass().getSimpleName()));
- }
-
- return errors;
}
- private void enrichProgramConfigWildcards(Configuration mutableConfig) {
- for (String key : programConfigWildcards) {
- final WildcardOption<?> option = SUPPORTED_PROGRAM_CONFIG_WILDCARDS.get(key);
- if (option == null) {
- throw new FlinkRuntimeException(
- String.format(
- "Unsupported option '%s' for program configuration wildcards.",
- key));
- }
- option.enrich(mutableConfig, this);
- }
+ private void checkCheckpointConfig(Configuration clusterConfigMap, List<String> errors) {
+ CheckpointConfig expectedCheckpointConfig = new CheckpointConfig();
+ expectedCheckpointConfig.configure(clusterConfigMap);
+ checkConfigurationObject(
+ expectedCheckpointConfig.toConfiguration(),
+ checkpointCfg.toConfiguration(),
+ checkpointCfg.getClass().getSimpleName(),
+ errors);
}
- private void removeProgramConfigWildcards(Configuration mutableConfig) {
- for (String key : programConfigWildcards) {
- final WildcardOption<?> option = SUPPORTED_PROGRAM_CONFIG_WILDCARDS.get(key);
- if (option == null) {
- throw new FlinkRuntimeException(
- String.format(
- "Unsupported option '%s' for program configuration wildcards.",
- key));
- }
- option.remove(mutableConfig);
- }
+ private void checkExecutionConfig(Configuration clusterConfigMap, List<String> errors) {
+ ExecutionConfig expectedExecutionConfig = new ExecutionConfig();
+ expectedExecutionConfig.configure(clusterConfigMap, getUserClassloader());
+ checkConfigurationObject(
+ expectedExecutionConfig.toConfiguration(),
+ config.toConfiguration(),
+ config.getClass().getSimpleName(),
+ errors);
}
- private static byte[] serializeConfig(Serializable config) {
- try (final ByteArrayOutputStream bos = new ByteArrayOutputStream();
- final ObjectOutputStream oos = new ObjectOutputStream(bos)) {
- oos.writeObject(config);
- oos.flush();
- return bos.toByteArray();
- } catch (IOException e) {
- throw new FlinkRuntimeException("Cannot serialize configuration.", e);
- }
- }
-
- /**
- * Helper class for mapping a configuration key to a {@link ConfigOption} and a programmatic
- * getter.
- */
- private static final class WildcardOption<T> {
- private final ConfigOption<T> option;
- private final Function<StreamContextEnvironment, T> getter;
+ private void checkConfigurationObject(
+ Configuration expectedConfiguration,
+ Configuration actualConfiguration,
+ String configurationObjectName,
+ List<String> errors) {
+ removeProgramConfigWildcards(actualConfiguration);
- WildcardOption(ConfigOption<T> option, Function<StreamContextEnvironment, T> getter) {
- this.option = option;
- this.getter = getter;
- }
+ final MapDifference<String, String> diff =
+ Maps.difference(expectedConfiguration.toMap(), actualConfiguration.toMap());
+ diff.entriesOnlyOnRight()
+ .forEach(
+ (k, v) ->
+ errors.add(
+ ConfigurationNotAllowedMessage.ofConfigurationObjectAdded(
+ configurationObjectName, k, v)));
+ diff.entriesDiffering()
+ .forEach(
+ (k, v) ->
+ errors.add(
+ ConfigurationNotAllowedMessage.ofConfigurationObjectChanged(
+ configurationObjectName, k, v)));
- void enrich(Configuration mutableConfig, StreamContextEnvironment fromEnv) {
- mutableConfig.set(option, getter.apply(fromEnv));
- }
+ diff.entriesOnlyOnLeft()
+ .forEach(
+ (k, v) ->
+ errors.add(
+ ConfigurationNotAllowedMessage.ofConfigurationObjectRemoved(
+ configurationObjectName, k, v)));
+ }
- void remove(Configuration mutableConfig) {
- mutableConfig.removeConfig(option);
+ private void removeProgramConfigWildcards(Configuration mutableConfig) {
+ for (String key : programConfigWildcards) {
+ mutableConfig.removeKey(key);
}
}
}
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/StreamContextEnvironmentTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/StreamContextEnvironmentTest.java
index 3de40323ff2..f9432e048be 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/StreamContextEnvironmentTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/StreamContextEnvironmentTest.java
@@ -29,6 +29,7 @@ import org.apache.flink.core.execution.PipelineExecutorServiceLoader;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.util.function.ThrowingConsumer;
@@ -79,6 +80,7 @@ class StreamContextEnvironmentTest {
environment.enableCheckpointing(500, CheckpointingMode.EXACTLY_ONCE);
// Change the ExecutionConfig
environment.setParallelism(25);
+ environment.getConfig().setMaxParallelism(1024);
// Add/mutate values in the configuration
environment.configure(programConfig);
@@ -90,7 +92,9 @@ class StreamContextEnvironmentTest {
ExecutionOptions.RUNTIME_MODE.key(),
ExecutionOptions.SORT_INPUTS.key(),
CheckpointConfig.class.getSimpleName(),
- ExecutionConfig.class.getSimpleName());
+ ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL.key(),
+ ExecutionConfig.class.getSimpleName(),
+ PipelineOptions.MAX_PARALLELISM.key());
}
@ParameterizedTest
@@ -100,7 +104,8 @@ class StreamContextEnvironmentTest {
final Configuration clusterConfig = new Configuration();
clusterConfig.set(DeploymentOptions.TARGET, "local");
clusterConfig.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.STREAMING);
- // Test prefix map notation
+ // Changing GLOBAL_JOB_PARAMETERS is always allowed, as it's one of the fields not checked
+ // with PROGRAM_CONFIG_ENABLED set to false
clusterConfig.setString(
PipelineOptions.GLOBAL_JOB_PARAMETERS.key() + "." + "my-param", "my-value");
@@ -119,10 +124,13 @@ class StreamContextEnvironmentTest {
true,
true,
false,
- Collections.singletonList(PipelineOptions.GLOBAL_JOB_PARAMETERS.key()));
+ Arrays.asList(
+ PipelineOptions.GLOBAL_JOB_PARAMETERS.key(),
+ PipelineOptions.MAX_PARALLELISM.key()));
// Change ExecutionConfig
environment.configure(jobConfig);
+ environment.getConfig().setMaxParallelism(1024);
environment.fromCollection(Collections.singleton(1)).addSink(new DiscardingSink<>());
assertThatThrownBy(() -> executor.accept(environment))
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java b/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
index d2032c606c4..b1740826850 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
@@ -765,6 +765,20 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters
}
}
+ /**
+ * Removes the given key, and any prefix-map entries stored under it, from the configuration.
+ *
+ * @param key key of a config option to remove
+ * @return true if any entry has been removed, false otherwise
+ */
+ public boolean removeKey(String key) {
+ synchronized (this.confData) {
+ boolean removed = this.confData.remove(key) != null;
+ removed |= removePrefixMap(confData, key);
+ return removed;
+ }
+ }
+
// --------------------------------------------------------------------------------------------
<T> void setValueInternal(String key, T value, boolean canBePrefixMap) {
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java b/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java
index f1c480dc247..54a3230cc3e 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java
@@ -324,6 +324,11 @@ public final class DelegatingConfiguration extends Configuration {
return backingConfig.removeConfig(configOption);
}
+ @Override
+ public boolean removeKey(String key) {
+ return backingConfig.removeKey(prefix + key); // key is relative to prefix, as in containsKey
+ }
+
@Override
public boolean containsKey(String key) {
return backingConfig.containsKey(prefix + key);
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/DeploymentOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/DeploymentOptions.java
index 21bcedd0d7c..f6ca0d8a345 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/DeploymentOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/DeploymentOptions.java
@@ -119,9 +119,7 @@ public class DeploymentOptions {
.linebreak()
.linebreak()
.text(
- "Currently, this list is limited to '%s' only.",
- TextElement.text(
- PipelineOptions.GLOBAL_JOB_PARAMETERS.key()))
+ "Currently changes that are not backed by the Configuration class are always allowed.")
.build());
@Experimental
diff --git a/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java b/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java
index cd8ca26ae5e..341ded3a25f 100644
--- a/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java
+++ b/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java
@@ -30,6 +30,7 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
+import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.containsString;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
@@ -314,6 +315,25 @@ public class ConfigurationTest extends TestLogger {
assertFalse("Expected 'unexistedOption' is not removed", cfg.removeConfig(unexistedOption));
}
+ @Test
+ public void testRemoveKey() {
+ Configuration cfg = new Configuration();
+ String key1 = "a.b";
+ String key2 = "c.d";
+ cfg.setInteger(key1, 42);
+ cfg.setInteger(key2, 44);
+ cfg.setInteger(key2 + ".f1", 44); // prefix-map style entries nested under key2
+ cfg.setInteger(key2 + ".f2", 44);
+ cfg.setInteger("e.f", 1337);
+
+ assertFalse(cfg.removeKey("not-existing-key"));
+ assertTrue(cfg.removeKey(key1));
+ assertFalse(cfg.containsKey(key1));
+
+ assertTrue(cfg.removeKey(key2)); // expected to also drop key2.f1 and key2.f2
+ assertThat(cfg.keySet(), containsInAnyOrder("e.f"));
+ }
+
@Test
public void testShouldParseValidStringToEnum() {
final Configuration configuration = new Configuration();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/ConfigurationNotAllowedMessage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/ConfigurationNotAllowedMessage.java
index 1e3d237ce9a..23299bc059e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/ConfigurationNotAllowedMessage.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/ConfigurationNotAllowedMessage.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.dispatcher;
import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.DeploymentOptions;
-import org.apache.flink.shaded.guava30.com.google.common.collect.MapDifference;
+import org.apache.flink.shaded.guava30.com.google.common.collect.MapDifference.ValueDifference;
/**
* If {@link DeploymentOptions#PROGRAM_CONFIG_ENABLED} is disabled, this error denotes the not
@@ -40,14 +40,30 @@ public class ConfigurationNotAllowedMessage {
return String.format("Configuration %s:%s was removed.", configKey, configValue);
}
- public static String ofConfigurationChanged(
- String configKey, MapDifference.ValueDifference<String> change) {
+ public static String ofConfigurationChanged(String configKey, ValueDifference<String> change) {
return String.format(
"Configuration %s was changed from %s to %s.",
configKey, change.leftValue(), change.rightValue());
}
- public static String ofConfigurationObject(String configurationObject) {
- return String.format("Configuration object %s changed.", configurationObject);
+ public static String ofConfigurationObjectAdded(
+ String configurationObject, String configKey, String configValue) {
+ return String.format(
+ "Configuration %s:%s not allowed in the configuration object %s.",
+ configKey, configValue, configurationObject);
+ }
+
+ public static String ofConfigurationObjectChanged(
+ String configurationObject, String configKey, ValueDifference<String> change) {
+ return String.format(
+ "Configuration %s was changed from %s to %s in the configuration object %s.",
+ configKey, change.leftValue(), change.rightValue(), configurationObject);
+ }
+
+ public static String ofConfigurationObjectRemoved(
+ String configurationObject, String configKey, String configValue) {
+ return String.format(
+ "Configuration %s:%s was removed from the configuration object %s.",
+ configKey, configValue, configurationObject);
}
}