Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/01/28 14:20:21 UTC

[GitHub] [flink] dmvk commented on a change in pull request #18043: [FLINK-25206] Add configuration option to disable configuration in user programs

dmvk commented on a change in pull request #18043:
URL: https://github.com/apache/flink/pull/18043#discussion_r794535671



##########
File path: flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java
##########
@@ -161,6 +213,18 @@ public static void setAsContext(
             final boolean suppressSysout) {
         StreamExecutionEnvironmentFactory factory =
                 conf -> {
+                    final List<String> errors = new ArrayList<>();
+                    final boolean allowConfigurations =
+                            configuration.getBoolean(
+                                    DeploymentOptions.ALLOW_CLIENT_JOB_CONFIGURATIONS);
+                    if (!allowConfigurations && !conf.toMap().isEmpty()) {
+                        conf.toMap()
+                                .forEach(
+                                        (k, v) ->
+                                                errors.add(
+                                                        ConfigurationNotAllowedMessage
+                                                                .ofConfigurationKeyAndValue(k, v)));

Review comment:
       Would it make more sense to throw an exception here right away? It could point the user to the exact location of the problem.
   
   It would also avoid passing the errors into the stream environment, which could simplify the change-set a bit.
   
   e.g.
   
   ```suggestion
                           throw new MutatedConfigurationException("Supplying a custom configuration for the stream environment is not allowed, because the client-side configuration is disabled.");
   ```
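
A minimal sketch of the fail-fast variant suggested above. It assumes a hypothetical `MutatedConfigurationException` that extends `RuntimeException` and a hypothetical helper `checkUserConfiguration`; neither is taken from the actual Flink code base, and listing the offending keys in the message is an addition for illustration only.

```java
import java.util.Collections;
import java.util.Map;

/** Hypothetical exception type; only the name follows the suggestion above. */
class MutatedConfigurationException extends RuntimeException {
    MutatedConfigurationException(String message) {
        super(message);
    }
}

class FailFastConfigCheck {

    /**
     * Rejects a user-supplied configuration immediately when client-side
     * configuration is disabled, instead of collecting errors for later.
     */
    static void checkUserConfiguration(
            boolean allowConfigurations, Map<String, String> userConf) {
        if (!allowConfigurations && !userConf.isEmpty()) {
            throw new MutatedConfigurationException(
                    "Supplying a custom configuration for the stream environment is not allowed, "
                            + "because the client-side configuration is disabled. Offending keys: "
                            + userConf.keySet());
        }
    }

    public static void main(String[] args) {
        Map<String, String> userConf = Collections.singletonMap("parallelism.default", "4");

        // Allowed: the check passes silently.
        checkUserConfiguration(true, userConf);

        try {
            // Disallowed: the check throws at the point where the configuration is supplied.
            checkUserConfiguration(false, userConf);
        } catch (MutatedConfigurationException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

With this shape, the stack trace would lead back to the call that supplied the configuration, and no error list would need to be passed into the environment's constructor.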

##########
File path: flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java
##########
@@ -169,12 +233,53 @@ public static void setAsContext(
                             mergedConfiguration,
                             userCodeClassLoader,
                             enforceSingleJobExecution,
-                            suppressSysout);
+                            suppressSysout,
+                            allowConfigurations,
+                            errors);
                 };
         initializeContextEnvironment(factory);
     }
 
     public static void unsetAsContext() {
         resetContextEnvironment();
     }
+
+    private List<String> collectNotAllowedConfigurations() {
+        final List<String> errors = new ArrayList<>();
+        if (allowConfigurations) {
+            return errors;
+        }
+        final MapDifference<String, String> diff =
+                Maps.difference(originalConfiguration.toMap(), configuration.toMap());
+        diff.entriesOnlyOnRight()

Review comment:
       This doesn't seem correct; it only covers the case where new options are added to the config. I think we need to cover all three cases:
   
   1) Config option has been removed (left side only)
   2) Config option has changed (diff)
   3) Config option has been added (right side)
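
The three cases above can be sketched as follows. Guava's `MapDifference` exposes them as `entriesOnlyOnLeft()`, `entriesDiffering()`, and `entriesOnlyOnRight()`; to stay self-contained, this sketch uses only `java.util`. The class name `ConfigDiffSketch`, the method `describeChanges`, and the message wording are hypothetical and not part of the pull request.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;

class ConfigDiffSketch {

    /** Reports removed, changed, and added keys between two configuration maps. */
    static List<String> describeChanges(
            Map<String, String> original, Map<String, String> current) {
        List<String> changes = new ArrayList<>();

        // TreeMap copies give a deterministic, key-sorted iteration order.
        for (Map.Entry<String, String> e : new TreeMap<>(original).entrySet()) {
            String key = e.getKey();
            if (!current.containsKey(key)) {
                // Case 1: present only on the left side, so the option was removed.
                changes.add("removed: " + key);
            } else if (!Objects.equals(e.getValue(), current.get(key))) {
                // Case 2: present on both sides with different values.
                changes.add(
                        "changed: " + key + " (" + e.getValue() + " -> " + current.get(key) + ")");
            }
        }
        for (Map.Entry<String, String> e : new TreeMap<>(current).entrySet()) {
            if (!original.containsKey(e.getKey())) {
                // Case 3: present only on the right side, so the option was added.
                changes.add("added: " + e.getKey() + "=" + e.getValue());
            }
        }
        return changes;
    }

    public static void main(String[] args) {
        Map<String, String> original = new HashMap<>();
        original.put("a", "1");
        original.put("b", "2");
        original.put("c", "3");

        Map<String, String> current = new HashMap<>();
        current.put("b", "2");
        current.put("c", "4");
        current.put("d", "5");

        describeChanges(original, current).forEach(System.out::println);
    }
}
```

Checking only the right-hand side, as the diff under review does, would report `d` in this example but miss the removal of `a` and the change to `c`.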

##########
File path: flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java
##########
@@ -169,12 +233,53 @@ public static void setAsContext(
                             mergedConfiguration,
                             userCodeClassLoader,
                             enforceSingleJobExecution,
-                            suppressSysout);
+                            suppressSysout,
+                            allowConfigurations,
+                            errors);
                 };
         initializeContextEnvironment(factory);
     }
 
     public static void unsetAsContext() {
         resetContextEnvironment();
     }
+
+    private List<String> collectNotAllowedConfigurations() {
+        final List<String> errors = new ArrayList<>();
+        if (allowConfigurations) {
+            return errors;
+        }
+        final MapDifference<String, String> diff =
+                Maps.difference(originalConfiguration.toMap(), configuration.toMap());
+        diff.entriesOnlyOnRight()
+                .forEach(
+                        (k, v) ->
+                                errors.add(
+                                        ConfigurationNotAllowedMessage.ofConfigurationKeyAndValue(
+                                                k, v)));
+
+        if (!Arrays.equals(originalCheckpointConfigSerialized, serializeConfig(checkpointCfg))) {
+            errors.add(
+                    ConfigurationNotAllowedMessage.ofConfigurationObject(
+                            originalCheckpointConfigSerialized.getClass().getName()));

Review comment:
       nit: Do we need to print the fully qualified class name or would the simple name be enough? Maybe it would make sense to pass the class object here and let `ConfigurationNotAllowedMessage` take care of the formatting.
   ```suggestion
                       ConfigurationNotAllowedMessage.ofConfigurationObject(
                               originalCheckpointConfigSerialized.getClass()));
   ```
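
A minimal sketch of what letting the message helper own the formatting could look like. The class `ConfigurationMessageSketch`, the `Class<?>` overload, and the message wording are assumptions for illustration; the real `ConfigurationNotAllowedMessage` may differ.

```java
import java.util.ArrayList;

class ConfigurationMessageSketch {

    /**
     * Hypothetical overload: the caller passes the class object and the helper
     * decides how to render it (here, the simple name).
     */
    static String ofConfigurationObject(Class<?> configurationClass) {
        return "Configuration object " + configurationClass.getSimpleName() + " changed.";
    }

    public static void main(String[] args) {
        // Simple name, as rendered by the helper.
        System.out.println(ofConfigurationObject(ArrayList.class));
        // Fully qualified name, for comparison.
        System.out.println(ArrayList.class.getName());
    }
}
```

Keeping the choice between `getSimpleName()` and `getName()` inside the helper means call sites would not need to change if the preferred format changes later.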

##########
File path: flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java
##########
@@ -169,12 +242,42 @@ public static void setAsContext(
                             mergedConfiguration,
                             userCodeClassLoader,
                             enforceSingleJobExecution,
-                            suppressSysout);
+                            suppressSysout,
+                            allowConfigurations,
+                            errors);
                 };
         initializeContextEnvironment(factory);
     }
 
     public static void unsetAsContext() {
         resetContextEnvironment();
     }
+
+    private List<String> collectNotAllowedConfigurations() {

Review comment:
       Agreed, until we get rid of Execution and Checkpoint config, this would be tricky to achieve.

##########
File path: flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java
##########
@@ -169,12 +233,53 @@ public static void setAsContext(
                             mergedConfiguration,
                             userCodeClassLoader,
                             enforceSingleJobExecution,
-                            suppressSysout);
+                            suppressSysout,
+                            allowConfigurations,
+                            errors);
                 };
         initializeContextEnvironment(factory);
     }
 
     public static void unsetAsContext() {
         resetContextEnvironment();
     }
+
+    private List<String> collectNotAllowedConfigurations() {
+        final List<String> errors = new ArrayList<>();
+        if (allowConfigurations) {
+            return errors;
+        }
+        final MapDifference<String, String> diff =
+                Maps.difference(originalConfiguration.toMap(), configuration.toMap());
+        diff.entriesOnlyOnRight()
+                .forEach(
+                        (k, v) ->
+                                errors.add(
+                                        ConfigurationNotAllowedMessage.ofConfigurationKeyAndValue(
+                                                k, v)));
+
+        if (!Arrays.equals(originalCheckpointConfigSerialized, serializeConfig(checkpointCfg))) {

Review comment:
       👍 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org