Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/04/09 10:48:40 UTC

[GitHub] [flink-kubernetes-operator] SteNicholas opened a new pull request, #162: [FLINK-27029] DeploymentValidator should take default flink config into account during validation

SteNicholas opened a new pull request, #162:
URL: https://github.com/apache/flink-kubernetes-operator/pull/162

   `DefaultDeploymentValidator` only takes the `FlinkDeployment` object into account, but it should validate the effective Flink configuration. In places where we validate the presence of config keys, we should also consider the default Flink config, which might already provide default values for the required configs even if the deployment itself doesn't.
   
   **The brief change log**
   
   - `DefaultDeploymentValidator` now validates the effective Flink configuration, which merges the default config with the user-defined config.
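To make the merge concrete, here is a minimal sketch that uses plain `Map<String, String>` in place of Flink's `Configuration`. `EffectiveConfigSketch` and `effectiveConfig` are hypothetical names, not operator APIs; the sketch only assumes that user-defined values override defaults on key conflicts and that a null user config means "no overrides".

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper: builds the effective config as defaults overlaid by user values. */
final class EffectiveConfigSketch {

    private EffectiveConfigSketch() {}

    static Map<String, String> effectiveConfig(
            Map<String, String> defaultConf, Map<String, String> userConf) {
        // Start from a copy so the shared default config is never mutated.
        Map<String, String> merged = new HashMap<>(defaultConf);
        // The user config may be absent; treat null as "no overrides".
        if (userConf != null) {
            merged.putAll(userConf); // user-defined values win on key conflicts
        }
        return Collections.unmodifiableMap(merged);
    }
}
```

Copying the defaults before calling `putAll` keeps one shared default config unchanged across validations, which matters if the same default instance is reused for every deployment.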


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] gyfora merged pull request #162: [FLINK-27029] DeploymentValidator should take default flink config into account during validation

Posted by GitBox <gi...@apache.org>.
gyfora merged PR #162:
URL: https://github.com/apache/flink-kubernetes-operator/pull/162




[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #162: [FLINK-27029] DeploymentValidator should take default flink config into account during validation

Posted by GitBox <gi...@apache.org>.
gyfora commented on code in PR #162:
URL: https://github.com/apache/flink-kubernetes-operator/pull/162#discussion_r851798624


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java:
##########
@@ -58,6 +58,8 @@ public class DefaultValidator implements FlinkResourceValidator {
     @Override
     public Optional<String> validateDeployment(FlinkDeployment deployment) {
         FlinkDeploymentSpec spec = deployment.getSpec();
+        Map<String, String> effectiveConfig =
+                FlinkUtils.getFlinkEffectiveConfig(deployment).toMap();

Review Comment:
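   I think this is still incorrect from a validation perspective. We should simply make a copy of the default config and put all the user-defined values into it:
   
   ```
   // Copy the defaults so the shared default config is not mutated, then overlay the user config
   Configuration conf = new Configuration(defaultConf);
   conf.addAll(Configuration.fromMap(spec.getFlinkConfiguration()));
   ```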





[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #162: [FLINK-27029] DeploymentValidator should take default flink config into account during validation

Posted by GitBox <gi...@apache.org>.
gyfora commented on PR #162:
URL: https://github.com/apache/flink-kubernetes-operator/pull/162#issuecomment-1101179135

   @SteNicholas, you can also reuse the `defaultConfig` field I added to the validator in the latest commit.




[GitHub] [flink-kubernetes-operator] bgeng777 commented on a diff in pull request #162: [FLINK-27029] DeploymentValidator should take default flink config into account during validation

Posted by GitBox <gi...@apache.org>.
bgeng777 commented on code in PR #162:
URL: https://github.com/apache/flink-kubernetes-operator/pull/162#discussion_r852719363


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java:
##########
@@ -56,12 +57,17 @@ public class DefaultValidator implements FlinkResourceValidator {
     private static final Set<String> ALLOWED_LOG_CONF_KEYS =
             Set.of(Constants.CONFIG_FILE_LOG4J_NAME, Constants.CONFIG_FILE_LOGBACK_NAME);
 
-    private static Configuration defaultFlinkConf =
+    @VisibleForTesting
+    static Configuration defaultFlinkConf =
             FlinkUtils.loadConfiguration(EnvUtils.get(EnvUtils.ENV_FLINK_CONF_DIR));
 
     @Override
     public Optional<String> validateDeployment(FlinkDeployment deployment) {
         FlinkDeploymentSpec spec = deployment.getSpec();
+        Map<String, String> effectiveConfig = defaultFlinkConf.toMap();
+        if (spec.getFlinkConfiguration() != null) {
+            effectiveConfig.putAll(spec.getFlinkConfiguration());
+        }
         return firstPresent(
                 validateFlinkVersion(spec.getFlinkVersion()),
                 validateFlinkConfig(spec.getFlinkConfiguration()),

Review Comment:
   I agree with splitting the config into default config and user-defined config.
   My point is that `validateFlinkConfig` actually verifies whether forbidden configs such as `KubernetesConfigOptions.NAMESPACE` and `KubernetesConfigOptions.CLUSTER_ID` are defined. I think such validation should be applied to the default config as well.
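A minimal sketch of running the same forbidden-key check against both the user-defined config and the default config, as suggested in this comment (SteNicholas argues against this elsewhere in the thread). The key strings are assumed to be the ones behind `KubernetesConfigOptions.NAMESPACE` and `KubernetesConfigOptions.CLUSTER_ID`; the class, method, and error message are hypothetical.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Hypothetical forbidden-key check that can be run against any config map. */
final class ForbiddenKeyCheckSketch {

    // Keys the operator is assumed to manage itself (assumed to match
    // KubernetesConfigOptions.NAMESPACE and KubernetesConfigOptions.CLUSTER_ID).
    static final List<String> FORBIDDEN_KEYS =
            List.of("kubernetes.namespace", "kubernetes.cluster-id");

    private ForbiddenKeyCheckSketch() {}

    /** Returns an error for the first forbidden key present in {@code conf}, if any. */
    static Optional<String> validateForbiddenKeys(Map<String, String> conf, String source) {
        if (conf == null) {
            return Optional.empty();
        }
        // Iterate over the fixed list so the reported key is deterministic.
        for (String key : FORBIDDEN_KEYS) {
            if (conf.containsKey(key)) {
                return Optional.of("Forbidden Flink config key in " + source + ": " + key);
            }
        }
        return Optional.empty();
    }
}
```

A caller could chain the two checks with `Optional.or`, for example `validateForbiddenKeys(userConf, "user config").or(() -> validateForbiddenKeys(defaultConf, "default config"))`, so a user-level error is reported first.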





[GitHub] [flink-kubernetes-operator] wangyang0918 commented on a diff in pull request #162: [FLINK-27029] DeploymentValidator should take default flink config into account during validation

Posted by GitBox <gi...@apache.org>.
wangyang0918 commented on code in PR #162:
URL: https://github.com/apache/flink-kubernetes-operator/pull/162#discussion_r852630974


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java:
##########
@@ -70,8 +76,8 @@ public Optional<String> validateDeployment(FlinkDeployment deployment) {
                         deployment.getMetadata().getName(),
                         deployment.getMetadata().getNamespace()),
                 validateLogConfig(spec.getLogConfiguration()),
-                validateJobSpec(spec.getJob(), spec.getFlinkConfiguration()),
-                validateJmSpec(spec.getJobManager(), spec.getFlinkConfiguration()),
+                validateJobSpec(spec.getJob(), effectiveConfig),
+                validateJmSpec(spec.getJobManager(), effectiveConfig),
                 validateTmSpec(spec.getTaskManager()),
                 validateSpecChange(deployment));

Review Comment:
   We should also use effective config in `validateSpecChange` to get `SAVEPOINT_DIRECTORY`.
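To illustrate why the effective config matters here, the sketch below looks up the savepoint directory in the defaults overlaid with the user values, so a directory supplied only by the default config is still found. `state.savepoints.dir` is assumed to be the key of `SAVEPOINT_DIRECTORY`; the helper and its message are hypothetical and are not the actual `validateSpecChange` logic, which this thread does not show.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical check that a savepoint directory is available in the effective config. */
final class SavepointDirCheckSketch {

    // Assumed to be the key behind CheckpointingOptions.SAVEPOINT_DIRECTORY.
    static final String SAVEPOINT_DIR_KEY = "state.savepoints.dir";

    private SavepointDirCheckSketch() {}

    static Optional<String> requireSavepointDir(
            Map<String, String> defaultConf, Map<String, String> userConf) {
        // Look the key up in defaults overlaid with user values, not in the user config alone.
        Map<String, String> effective = new HashMap<>(defaultConf);
        if (userConf != null) {
            effective.putAll(userConf);
        }
        String dir = effective.get(SAVEPOINT_DIR_KEY);
        if (dir == null || dir.isBlank()) {
            return Optional.of(SAVEPOINT_DIR_KEY + " must be configured");
        }
        return Optional.empty();
    }
}
```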



##########
flink-kubernetes-webhook/src/test/java/org/apache/flink/kubernetes/operator/admission/AdmissionHandlerTest.java:
##########
@@ -84,7 +86,11 @@ public void testHandleValidateRequestWithoutContent() {
     public void testHandleValidateRequestWithAdmissionReview() throws IOException {
         final EmbeddedChannel embeddedChannel = new EmbeddedChannel(admissionHandler);
         final FlinkDeployment flinkDeployment = new FlinkDeployment();
-        flinkDeployment.setSpec(new FlinkDeploymentSpec());
+        flinkDeployment.setMetadata(

Review Comment:
   Why do we need this change?



##########
flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java:
##########
@@ -274,6 +277,29 @@ public void testValidation() {
         testSuccess(dep -> dep.getSpec().setFlinkVersion(FlinkVersion.v1_15));
     }
 
+    @Test
+    public void testValidationWithDefaultConfig() {

Review Comment:
   We could make `testSuccess` accept a newly created validator with the default Flink config. Then updating the env becomes unnecessary, since loading the default Flink config from the env is not what we want to test.
   
   ```
       @Test
       public void testValidationWithDefaultConfig() {
           final Configuration defaultFlinkConfig = new Configuration();
           defaultFlinkConfig.set(
                   HighAvailabilityOptions.HA_MODE,
                   KubernetesHaServicesFactory.class.getCanonicalName());
           final DefaultValidator validatorWithDefaultConfig =
                   new DefaultValidator(defaultFlinkConfig);
           testSuccess(
                   dep -> {
                       dep.getSpec().setFlinkConfiguration(new HashMap<>());
                       dep.getSpec().getJob().setUpgradeMode(UpgradeMode.LAST_STATE);
                   },
                   validatorWithDefaultConfig);
       }
   ```
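The same idea can be sketched without Flink or environment variables: the validator receives its defaults through a constructor, and a test passes in a map that already enables HA. Everything here is hypothetical (class name, method, error message), and `high-availability` is assumed to be the key of `HighAvailabilityOptions.HA_MODE`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical validator that receives its default config through the constructor. */
final class InjectedDefaultsValidatorSketch {

    // Assumed key of HighAvailabilityOptions.HA_MODE.
    static final String HA_MODE_KEY = "high-availability";

    private final Map<String, String> defaultConf;

    InjectedDefaultsValidatorSketch(Map<String, String> defaultConf) {
        // Defensive, immutable copy: tests and callers cannot change it later.
        this.defaultConf = Map.copyOf(defaultConf);
    }

    /** Last-state upgrades need HA; the setting may come from the defaults or the user. */
    Optional<String> validateLastStateUpgrade(Map<String, String> userConf) {
        Map<String, String> effective = new HashMap<>(defaultConf);
        if (userConf != null) {
            effective.putAll(userConf);
        }
        String haMode = effective.get(HA_MODE_KEY);
        if (haMode == null || haMode.equalsIgnoreCase("NONE")) {
            return Optional.of("HA must be enabled for last-state upgrades");
        }
        return Optional.empty();
    }
}
```

Injecting the defaults keeps the test focused on validation logic rather than on how the config file is located and loaded.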



##########
flink-kubernetes-operator/src/test/resources/test-validation/flink-conf.yaml:
##########
@@ -0,0 +1,19 @@
+################################################################################

Review Comment:
   After the above changes, this file is no longer needed.



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java:
##########
@@ -56,12 +57,17 @@ public class DefaultValidator implements FlinkResourceValidator {
     private static final Set<String> ALLOWED_LOG_CONF_KEYS =
             Set.of(Constants.CONFIG_FILE_LOG4J_NAME, Constants.CONFIG_FILE_LOGBACK_NAME);
 
-    private static Configuration defaultFlinkConf =
+    @VisibleForTesting
+    static Configuration defaultFlinkConf =

Review Comment:
   I think we could make `defaultFlinkConf` a private instance field and introduce two constructors:
   
   ```
       private final Configuration defaultFlinkConf;
   
       public DefaultValidator() {
           this(FlinkUtils.loadConfiguration(EnvUtils.get(EnvUtils.ENV_FLINK_CONF_DIR)));
       }
   
       public DefaultValidator(Configuration defaultFlinkConf) {
           this.defaultFlinkConf = defaultFlinkConf;
       }
   ```





[GitHub] [flink-kubernetes-operator] SteNicholas commented on a diff in pull request #162: [FLINK-27029] DeploymentValidator should take default flink config into account during validation

Posted by GitBox <gi...@apache.org>.
SteNicholas commented on code in PR #162:
URL: https://github.com/apache/flink-kubernetes-operator/pull/162#discussion_r852713585


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java:
##########
@@ -56,12 +57,17 @@ public class DefaultValidator implements FlinkResourceValidator {
     private static final Set<String> ALLOWED_LOG_CONF_KEYS =
             Set.of(Constants.CONFIG_FILE_LOG4J_NAME, Constants.CONFIG_FILE_LOGBACK_NAME);
 
-    private static Configuration defaultFlinkConf =
+    @VisibleForTesting
+    static Configuration defaultFlinkConf =
             FlinkUtils.loadConfiguration(EnvUtils.get(EnvUtils.ENV_FLINK_CONF_DIR));
 
     @Override
     public Optional<String> validateDeployment(FlinkDeployment deployment) {
         FlinkDeploymentSpec spec = deployment.getSpec();
+        Map<String, String> effectiveConfig = defaultFlinkConf.toMap();
+        if (spec.getFlinkConfiguration() != null) {
+            effectiveConfig.putAll(spec.getFlinkConfiguration());
+        }
         return firstPresent(
                 validateFlinkVersion(spec.getFlinkVersion()),
                 validateFlinkConfig(spec.getFlinkConfiguration()),

Review Comment:
   @bgeng777, I don't think `spec.getFlinkConfiguration()` should be replaced with `effectiveConfig`. `validateFlinkConfig` validates the user-defined configuration, so validating the `effectiveConfig`, which merges the default config and the user-defined config, is unnecessary.





[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #162: [FLINK-27029] DeploymentValidator should take default flink config into account during validation

Posted by GitBox <gi...@apache.org>.
gyfora commented on code in PR #162:
URL: https://github.com/apache/flink-kubernetes-operator/pull/162#discussion_r846826594


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilder.java:
##########
@@ -241,7 +244,9 @@ private static void setResource(
                     isJM
                             ? KubernetesConfigOptions.JOB_MANAGER_CPU
                             : KubernetesConfigOptions.TASK_MANAGER_CPU;
-            effectiveConfig.setString(memoryConfigOption.key(), resource.getMemory());
+            if (!StringUtils.isNullOrWhitespaceOnly(resource.getMemory())) {

Review Comment:
   We should not allow empty settings; this should be validated instead.
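A minimal sketch of validating the memory setting up front instead of skipping it in `FlinkConfigBuilder`. The class, method, and error message are hypothetical; treating `null` as "not set, let the default config apply" is an assumption in the spirit of this PR, while empty or whitespace-only strings are rejected.

```java
import java.util.Optional;

/** Hypothetical resource check: reject empty memory settings rather than ignoring them. */
final class ResourceValidationSketch {

    private ResourceValidationSketch() {}

    static Optional<String> validateMemory(String component, String memory) {
        if (memory == null) {
            // Assumption: null means "not set", so the default Flink config may supply it.
            return Optional.empty();
        }
        if (memory.isBlank()) {
            // An explicitly empty value is almost certainly a mistake; fail validation.
            return Optional.of(component + " memory must not be empty");
        }
        return Optional.empty();
    }
}
```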



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilder.java:
##########
@@ -184,8 +184,11 @@ public FlinkConfigBuilder applyJobOrSessionSpec() throws URISyntaxException {
         if (spec.getJob() != null) {
             effectiveConfig.set(
                     DeploymentOptions.TARGET, KubernetesDeploymentTarget.APPLICATION.getName());
-            final URI uri = new URI(spec.getJob().getJarURI());
-            effectiveConfig.set(PipelineOptions.JARS, Collections.singletonList(uri.toString()));
+            if (spec.getJob().getJarURI() != null) {
+                effectiveConfig.set(
+                        PipelineOptions.JARS,
+                        Collections.singletonList(new URI(spec.getJob().getJarURI()).toString()));
+            }

Review Comment:
   What's the point of this change?



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java:
##########
@@ -58,6 +58,7 @@
     @Override
     public Optional<String> validateDeployment(FlinkDeployment deployment) {
         FlinkDeploymentSpec spec = deployment.getSpec();
+        Map<String, String> effectiveConfig = FlinkUtils.getEffectiveConfig(deployment).toMap();

Review Comment:
   I don't think we can call `getEffectiveConfig` here, as it relies on the `FlinkConfigBuilder` logic, which in turn relies on the validation.
   
   We should simply use the default configuration, I think.





[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #162: [FLINK-27029] DeploymentValidator should take default flink config into account during validation

Posted by GitBox <gi...@apache.org>.
gyfora commented on code in PR #162:
URL: https://github.com/apache/flink-kubernetes-operator/pull/162#discussion_r851798061


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java:
##########
@@ -61,6 +61,18 @@ public static DefaultConfig loadDefaultConfig() {
         return new DefaultConfig(operatorConfig, flinkConfig);
     }
 
+    public static Configuration getFlinkEffectiveConfig(FlinkDeployment flinkApp) {
+        try {
+            final Configuration effectiveConfig =
+                    FlinkConfigBuilder.buildFlinkFrom(
+                            flinkApp, loadConfiguration(EnvUtils.get(EnvUtils.ENV_FLINK_CONF_DIR)));

Review Comment:
   We should avoid loading the configuration again and again. Ideally we would load it only once, either here or in the validator.
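One way to load the default configuration only once is to cache it behind a lazily initialized holder. The sketch below is hypothetical and uses a `Supplier` in place of `FlinkUtils.loadConfiguration(...)`; passing an eagerly loaded config through a constructor, as proposed elsewhere in this PR, is a simpler alternative when lazy loading is not needed.

```java
import java.util.Map;
import java.util.function.Supplier;

/** Hypothetical holder that loads the default config at most once and then reuses it. */
final class DefaultConfigHolderSketch {

    private final Supplier<Map<String, String>> loader;
    // volatile makes the double-checked locking below safe under the Java memory model.
    private volatile Map<String, String> cached;

    DefaultConfigHolderSketch(Supplier<Map<String, String>> loader) {
        this.loader = loader;
    }

    Map<String, String> get() {
        Map<String, String> result = cached;
        if (result == null) {
            synchronized (this) {
                result = cached;
                if (result == null) {
                    // First caller pays the loading cost; later callers reuse the copy.
                    result = Map.copyOf(loader.get());
                    cached = result;
                }
            }
        }
        return result;
    }
}
```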






[GitHub] [flink-kubernetes-operator] bgeng777 commented on a diff in pull request #162: [FLINK-27029] DeploymentValidator should take default flink config into account during validation

Posted by GitBox <gi...@apache.org>.
bgeng777 commented on code in PR #162:
URL: https://github.com/apache/flink-kubernetes-operator/pull/162#discussion_r852583825


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java:
##########
@@ -56,12 +57,17 @@ public class DefaultValidator implements FlinkResourceValidator {
     private static final Set<String> ALLOWED_LOG_CONF_KEYS =
             Set.of(Constants.CONFIG_FILE_LOG4J_NAME, Constants.CONFIG_FILE_LOGBACK_NAME);
 
-    private static Configuration defaultFlinkConf =
+    @VisibleForTesting
+    static Configuration defaultFlinkConf =
             FlinkUtils.loadConfiguration(EnvUtils.get(EnvUtils.ENV_FLINK_CONF_DIR));
 
     @Override
     public Optional<String> validateDeployment(FlinkDeployment deployment) {
         FlinkDeploymentSpec spec = deployment.getSpec();
+        Map<String, String> effectiveConfig = defaultFlinkConf.toMap();
+        if (spec.getFlinkConfiguration() != null) {
+            effectiveConfig.putAll(spec.getFlinkConfiguration());
+        }
         return firstPresent(
                 validateFlinkVersion(spec.getFlinkVersion()),
                 validateFlinkConfig(spec.getFlinkConfiguration()),

Review Comment:
   It looks like `spec.getFlinkConfiguration()` here should be replaced with `effectiveConfig` as well.


