Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/04/19 06:30:51 UTC

[GitHub] [flink-kubernetes-operator] wangyang0918 commented on a diff in pull request #162: [FLINK-27029] DeploymentValidator should take default flink config into account during validation

wangyang0918 commented on code in PR #162:
URL: https://github.com/apache/flink-kubernetes-operator/pull/162#discussion_r852630974


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java:
##########
@@ -70,8 +76,8 @@ public Optional<String> validateDeployment(FlinkDeployment deployment) {
                         deployment.getMetadata().getName(),
                         deployment.getMetadata().getNamespace()),
                 validateLogConfig(spec.getLogConfiguration()),
-                validateJobSpec(spec.getJob(), spec.getFlinkConfiguration()),
-                validateJmSpec(spec.getJobManager(), spec.getFlinkConfiguration()),
+                validateJobSpec(spec.getJob(), effectiveConfig),
+                validateJmSpec(spec.getJobManager(), effectiveConfig),
                 validateTmSpec(spec.getTaskManager()),
                 validateSpecChange(deployment));

Review Comment:
   We should also use the effective config in `validateSpecChange` to get `SAVEPOINT_DIRECTORY`.
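
A minimal sketch of the point in this comment, under stated assumptions: plain `Map<String, String>` values stand in for Flink's `Configuration` and for the deployment spec, and `EffectiveConfigSketch`, `effectiveConfig`, and `validateSavepointDir` are hypothetical names, not the operator's code. It shows why a savepoint-directory check should read the merged (default plus per-deployment) config rather than the spec's config alone.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Sketch only: plain maps stand in for Flink's Configuration and the deployment spec. */
final class EffectiveConfigSketch {

    // Flink's option key for the default savepoint directory.
    static final String SAVEPOINT_DIRECTORY = "state.savepoints.dir";

    /** Overlays the per-deployment settings on top of the operator-wide defaults. */
    static Map<String, String> effectiveConfig(
            Map<String, String> defaults, Map<String, String> specConfig) {
        Map<String, String> merged = new HashMap<>(defaults);
        if (specConfig != null) {
            merged.putAll(specConfig); // spec values win over defaults
        }
        return merged;
    }

    /** Hypothetical stand-in for the savepoint-directory check in validateSpecChange. */
    static Optional<String> validateSavepointDir(Map<String, String> config) {
        String dir = config.get(SAVEPOINT_DIRECTORY);
        if (dir == null || dir.trim().isEmpty()) {
            return Optional.of("Savepoint directory is not configured");
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        Map<String, String> defaults = new HashMap<>();
        defaults.put(SAVEPOINT_DIRECTORY, "file:///tmp/savepoints"); // illustrative value
        Map<String, String> specConfig = new HashMap<>(); // the deployment sets nothing

        // Checking the spec config alone reports an error; the merged config does not.
        System.out.println(validateSavepointDir(specConfig));
        System.out.println(validateSavepointDir(effectiveConfig(defaults, specConfig)));
    }
}
```

If the check only looked at the spec's own map, a deployment that relies on the operator-wide default would be rejected even though a savepoint directory is effectively set.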



##########
flink-kubernetes-webhook/src/test/java/org/apache/flink/kubernetes/operator/admission/AdmissionHandlerTest.java:
##########
@@ -84,7 +86,11 @@ public void testHandleValidateRequestWithoutContent() {
     public void testHandleValidateRequestWithAdmissionReview() throws IOException {
         final EmbeddedChannel embeddedChannel = new EmbeddedChannel(admissionHandler);
         final FlinkDeployment flinkDeployment = new FlinkDeployment();
-        flinkDeployment.setSpec(new FlinkDeploymentSpec());
+        flinkDeployment.setMetadata(

Review Comment:
   Why do we need this change?



##########
flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java:
##########
@@ -274,6 +277,29 @@ public void testValidation() {
         testSuccess(dep -> dep.getSpec().setFlinkVersion(FlinkVersion.v1_15));
     }
 
+    @Test
+    public void testValidationWithDefaultConfig() {

Review Comment:
   We could make `testSuccess` accept a newly created validator with a default Flink config. Updating the env then becomes unnecessary, since loading the default Flink config from the env is not what we want to test.
   
   ```
       @Test
       public void testValidationWithDefaultConfig() {
           final Configuration defaultFlinkConfig = new Configuration();
           defaultFlinkConfig.set(
                   HighAvailabilityOptions.HA_MODE,
                   KubernetesHaServicesFactory.class.getCanonicalName());
           final DefaultValidator validatorWithDefaultConfig =
                   new DefaultValidator(defaultFlinkConfig);
           testSuccess(
                   dep -> {
                       dep.getSpec().setFlinkConfiguration(new HashMap<>());
                       dep.getSpec().getJob().setUpgradeMode(UpgradeMode.LAST_STATE);
                   },
                   validatorWithDefaultConfig);
       }
   ```



##########
flink-kubernetes-operator/src/test/resources/test-validation/flink-conf.yaml:
##########
@@ -0,0 +1,19 @@
+################################################################################

Review Comment:
   After the above changes, this file is no longer needed.



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java:
##########
@@ -56,12 +57,17 @@ public class DefaultValidator implements FlinkResourceValidator {
     private static final Set<String> ALLOWED_LOG_CONF_KEYS =
             Set.of(Constants.CONFIG_FILE_LOG4J_NAME, Constants.CONFIG_FILE_LOGBACK_NAME);
 
-    private static Configuration defaultFlinkConf =
+    @VisibleForTesting
+    static Configuration defaultFlinkConf =

Review Comment:
   I think we could make `defaultFlinkConf` a private instance field and introduce two constructors.
   
   ```
       private final Configuration defaultFlinkConf;

       public DefaultValidator() {
           this(FlinkUtils.loadConfiguration(EnvUtils.get(EnvUtils.ENV_FLINK_CONF_DIR)));
       }
   
       public DefaultValidator(Configuration defaultFlinkConf) {
           this.defaultFlinkConf = defaultFlinkConf;
       }
   ```
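
To illustrate why the two-constructor shape helps testing, here is a self-contained sketch under loud assumptions: `ValidatorSketch`, `loadDefaults`, and `validateLastStateUpgrade` are hypothetical, plain maps replace Flink's `Configuration`, and the rule "last-state upgrade needs an HA setting" is a simplified stand-in for the real validation. The key `high-availability` is the `HA_MODE` key as of Flink 1.15.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Sketch only: mirrors the two-constructor idea with plain maps instead of Configuration. */
final class ValidatorSketch {

    // Key of HighAvailabilityOptions.HA_MODE as of Flink 1.15.
    static final String HA_MODE = "high-availability";

    private final Map<String, String> defaultFlinkConf;

    /** Production path: defaults come from an external loader (hypothetical here). */
    ValidatorSketch() {
        this(loadDefaults());
    }

    /** Test path: defaults are injected directly, so no env setup is required. */
    ValidatorSketch(Map<String, String> defaultFlinkConf) {
        this.defaultFlinkConf = new HashMap<>(defaultFlinkConf);
    }

    /** Hypothetical loader; a real one would read the config directory from the env. */
    static Map<String, String> loadDefaults() {
        return new HashMap<>();
    }

    /** Simplified rule: a last-state upgrade needs an HA setting in the effective config. */
    Optional<String> validateLastStateUpgrade(Map<String, String> specConfig) {
        Map<String, String> effective = new HashMap<>(defaultFlinkConf);
        effective.putAll(specConfig);
        if (!effective.containsKey(HA_MODE)) {
            return Optional.of("Last-state upgrade requires HA to be configured");
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        Map<String, String> defaults = new HashMap<>();
        defaults.put(HA_MODE, "ha-factory-class-name"); // placeholder value
        ValidatorSketch injected = new ValidatorSketch(defaults);

        // The spec sets no HA option; the injected default satisfies the rule.
        System.out.println(injected.validateLastStateUpgrade(new HashMap<>()));
        // With empty defaults the same spec is rejected.
        System.out.println(new ValidatorSketch().validateLastStateUpgrade(new HashMap<>()));
    }
}
```

Because the defaults are an instance field set through the constructor, a test can build a validator with exactly the config it needs instead of mutating a shared static field or the process environment.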


