Posted to commits@flink.apache.org by GitBox <gi...@apache.org> on 2022/03/29 16:48:31 UTC

[GitHub] [flink-kubernetes-operator] bgeng777 opened a new pull request #131: [FLINK-26892] Observe current status before validating CR changes

bgeng777 opened a new pull request #131:
URL: https://github.com/apache/flink-kubernetes-operator/pull/131


   - Adjust the logic in `FlinkDeploymentController` from `validate with new config -> observe with new config -> reconcile with new config` to `observe with latest validated config -> validate new config -> reconcile with new config`. This avoids the case where, once a wrong config is applied without the webhook, the controller gets stuck on the validation error and loses control of the previously running deployment.
   - Refactor `FlinkConfigBuilder` to support building the config from metadata and a deployment spec.
   - Add a corresponding test.
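
   The reordering described in the first bullet can be sketched roughly as follows. This is only an illustration under simplifying assumptions, not the operator's code: `Spec`, `Resource` and the recorded step strings are hypothetical stand-ins, and only the replica check from the PR's test is modelled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

public class ReconcileOrderSketch {

    /** Hypothetical stand-in for a deployment spec; only the replica count is modelled. */
    static final class Spec {
        final int jobManagerReplicas;

        Spec(int jobManagerReplicas) {
            this.jobManagerReplicas = jobManagerReplicas;
        }
    }

    /** Hypothetical stand-in for the custom resource: desired spec plus status fields. */
    static final class Resource {
        Spec spec;
        Spec lastReconciledSpec; // stays null until the first successful reconcile
        String error;

        Resource(Spec spec) {
            this.spec = spec;
        }
    }

    /** Records which steps ran, so the ordering can be inspected. */
    final List<String> steps = new ArrayList<>();

    Optional<String> validate(Spec spec) {
        return spec.jobManagerReplicas < 1
                ? Optional.of("JobManager replicas should not be configured less than one.")
                : Optional.empty();
    }

    void reconcile(Resource resource) {
        // 1. Observe with the spec that last passed validation, if there is one.
        if (resource.lastReconciledSpec != null) {
            steps.add("observe(replicas=" + resource.lastReconciledSpec.jobManagerReplicas + ")");
        }
        // 2. Validate the newly submitted spec; on failure record the error and stop.
        Optional<String> error = validate(resource.spec);
        if (error.isPresent()) {
            resource.error = error.get();
            steps.add("validation-failed");
            return;
        }
        // 3. Reconcile with the new spec and remember it as the last validated one.
        resource.error = null;
        resource.lastReconciledSpec = resource.spec;
        steps.add("reconcile(replicas=" + resource.spec.jobManagerReplicas + ")");
    }

    public static void main(String[] args) {
        ReconcileOrderSketch controller = new ReconcileOrderSketch();
        Resource resource = new Resource(new Spec(1));
        controller.reconcile(resource); // good spec
        resource.spec = new Spec(-1);
        controller.reconcile(resource); // bad spec: still observes with the old one
        System.out.println(controller.steps);
        System.out.println(resource.error);
    }
}
```

   The point of the sketch is that a bad spec no longer prevents observation: the second call still observes the running deployment using the previously reconciled spec before reporting the validation error.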


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink-kubernetes-operator] gyfora commented on a change in pull request #131: [FLINK-26892] Observe current status before validating CR changes

Posted by GitBox <gi...@apache.org>.
gyfora commented on a change in pull request #131:
URL: https://github.com/apache/flink-kubernetes-operator/pull/131#discussion_r838510223



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
##########
@@ -104,9 +105,31 @@ public DeleteControl cleanup(FlinkDeployment flinkApp, Context context) {
 
     @Override
     public UpdateControl<FlinkDeployment> reconcile(FlinkDeployment flinkApp, Context context) {
-        FlinkDeployment originalCopy = ReconciliationUtils.clone(flinkApp);
         LOG.info("Starting reconciliation");
+        FlinkDeployment originalCopy = ReconciliationUtils.clone(flinkApp);
+        FlinkDeploymentSpec lastReconciledSpec =
+                flinkApp.getStatus().getReconciliationStatus().getLastReconciledSpec();
+        if (lastReconciledSpec != null) {
+            Configuration lastValidatedConfig =
+                    FlinkUtils.getEffectiveConfig(
+                            flinkApp.getMetadata(),
+                            lastReconciledSpec,
+                            defaultConfig.getFlinkConfig());

Review comment:
       I think it also fits better into the Observer, because using the correct config is actually part of the observation logic.







[GitHub] [flink-kubernetes-operator] bgeng777 commented on a change in pull request #131: [FLINK-26892] Observe current status before validating CR changes

Posted by GitBox <gi...@apache.org>.
bgeng777 commented on a change in pull request #131:
URL: https://github.com/apache/flink-kubernetes-operator/pull/131#discussion_r838740146



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
##########
@@ -104,9 +105,31 @@ public DeleteControl cleanup(FlinkDeployment flinkApp, Context context) {
 
     @Override
     public UpdateControl<FlinkDeployment> reconcile(FlinkDeployment flinkApp, Context context) {
-        FlinkDeployment originalCopy = ReconciliationUtils.clone(flinkApp);
         LOG.info("Starting reconciliation");
+        FlinkDeployment originalCopy = ReconciliationUtils.clone(flinkApp);
+        FlinkDeploymentSpec lastReconciledSpec =
+                flinkApp.getStatus().getReconciliationStatus().getLastReconciledSpec();
+        if (lastReconciledSpec != null) {
+            Configuration lastValidatedConfig =
+                    FlinkUtils.getEffectiveConfig(
+                            flinkApp.getMetadata(),
+                            lastReconciledSpec,
+                            defaultConfig.getFlinkConfig());

Review comment:
       Moving the check and generation of `lastValidatedConfig` makes sense. It follows the pattern in our `reconciler#reconcile()` and makes the code much cleaner.
   Refactored.







[GitHub] [flink-kubernetes-operator] Aitozi commented on a change in pull request #131: [FLINK-26892] Observe current status before validating CR changes

Posted by GitBox <gi...@apache.org>.
Aitozi commented on a change in pull request #131:
URL: https://github.com/apache/flink-kubernetes-operator/pull/131#discussion_r838210350



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
##########
@@ -104,9 +105,30 @@ public DeleteControl cleanup(FlinkDeployment flinkApp, Context context) {
 
     @Override
     public UpdateControl<FlinkDeployment> reconcile(FlinkDeployment flinkApp, Context context) {
-        FlinkDeployment originalCopy = ReconciliationUtils.clone(flinkApp);
         LOG.info("Starting reconciliation");
+        FlinkDeployment originalCopy = ReconciliationUtils.clone(flinkApp);
+        FlinkDeploymentSpec lastReconciledSpec =
+                flinkApp.getStatus().getReconciliationStatus().getLastReconciledSpec();
+        if (lastReconciledSpec != null) {
+            Configuration latestValidatedConfig =
+                    FlinkUtils.getEffectiveConfig(
+                            flinkApp.getMetadata(),
+                            lastReconciledSpec,
+                            defaultConfig.getFlinkConfig());
+            try {
+                observerFactory
+                        .getOrCreate(flinkApp)
+                        .observe(flinkApp, context, latestValidatedConfig);
+            } catch (DeploymentFailedException dfe) {
+                handleDeploymentFailed(flinkApp, dfe);
+                LOG.info("Reconciliation successfully completed");

Review comment:
       Oh, I overlooked it.

##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilder.java
##########
@@ -50,13 +51,20 @@
 
 /** Builder to get effective flink config from {@link FlinkDeployment}. */
 public class FlinkConfigBuilder {
-    private final FlinkDeployment deploy;
+    private final ObjectMeta meta;
     private final FlinkDeploymentSpec spec;
     private final Configuration effectiveConfig;
 
     public FlinkConfigBuilder(FlinkDeployment deploy, Configuration flinkConfig) {
-        this.deploy = deploy;
-        this.spec = this.deploy.getSpec();
+        this.meta = deploy.getMetadata();
+        this.spec = deploy.getSpec();
+        this.effectiveConfig = new Configuration(flinkConfig);
+    }
+
+    public FlinkConfigBuilder(

Review comment:
       Got it.







[GitHub] [flink-kubernetes-operator] bgeng777 commented on pull request #131: [FLINK-26892] Observe current status before validating CR changes

Posted by GitBox <gi...@apache.org>.
bgeng777 commented on pull request #131:
URL: https://github.com/apache/flink-kubernetes-operator/pull/131#issuecomment-1084260784


   @gyfora thanks for your advice. PTAL at the latest commit, which does 2 things:
   1. Adds a new parameter to the constructor of `BaseObserver` and removes the `effectiveConfig` parameter from the `Observer#observe()` interface. I did not use the `DefaultConfig` class directly, as that would introduce an extra construction of `FlinkOperatorConfiguration`. Besides, `operatorConfiguration` and `flinkConfig` serve different purposes, so keeping them separate improves readability.
   2. Following the discussion above, factors out the common code into `BaseObserver` and adjusts the tests.
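
   A rough sketch of the shape this refactoring takes, for readers without the commit at hand. Everything here is an assumption made for illustration: the config is modelled as a plain `Map<String, String>` instead of Flink's `Configuration`, and `JobObserver` and `effectiveConfig(...)` are hypothetical names, not the operator's actual members.

```java
import java.util.HashMap;
import java.util.Map;

public class BaseObserverSketch {

    /** After the change, observe() no longer takes an effective config argument. */
    interface Observer {
        String observe(Map<String, String> lastReconciledSpec);
    }

    /** Common logic lives in the base class, which receives the default config once. */
    abstract static class BaseObserver implements Observer {
        private final Map<String, String> flinkConfig; // operator-wide defaults

        BaseObserver(Map<String, String> flinkConfig) {
            this.flinkConfig = new HashMap<>(flinkConfig);
        }

        /** Builds the config to observe with: defaults overridden by the last reconciled spec. */
        protected Map<String, String> effectiveConfig(Map<String, String> lastReconciledSpec) {
            Map<String, String> merged = new HashMap<>(flinkConfig);
            merged.putAll(lastReconciledSpec);
            return merged;
        }
    }

    /** Hypothetical concrete observer that only reports the REST port it would use. */
    static final class JobObserver extends BaseObserver {
        JobObserver(Map<String, String> flinkConfig) {
            super(flinkConfig);
        }

        @Override
        public String observe(Map<String, String> lastReconciledSpec) {
            return "rest.port=" + effectiveConfig(lastReconciledSpec).get("rest.port");
        }
    }

    public static void main(String[] args) {
        Map<String, String> defaults = new HashMap<>();
        defaults.put("rest.port", "8081");
        Map<String, String> lastReconciled = new HashMap<>();
        lastReconciled.put("rest.port", "8088");
        System.out.println(new JobObserver(defaults).observe(lastReconciled));
    }
}
```

   With this shape the controller no longer has to compute a config before calling `observe()`; each observer derives it from what was last reconciled.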
   
   





[GitHub] [flink-kubernetes-operator] wangyang0918 commented on a change in pull request #131: [FLINK-26892] Observe current status before validating CR changes

Posted by GitBox <gi...@apache.org>.
wangyang0918 commented on a change in pull request #131:
URL: https://github.com/apache/flink-kubernetes-operator/pull/131#discussion_r840351874



##########
File path: flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
##########
@@ -397,7 +402,59 @@ public void testUpgradeNotReadyCluster() {
         testUpgradeNotReadyCluster(appCluster, false);
     }
 
-    public void testUpgradeNotReadyCluster(FlinkDeployment appCluster, boolean allowUpgrade) {
+    @Test
+    public void verifyReconcileWithBadConfig() {
+
+        FlinkDeployment appCluster = TestUtils.buildApplicationCluster();
+        UpdateControl<FlinkDeployment> updateControl;
+        // Override rest port, and it should be saved in lastReconciledSpec once a successful
+        // reconcile() finishes.
+        appCluster.getSpec().getFlinkConfiguration().put(RestOptions.PORT.key(), "8088");
+        updateControl = testController.reconcile(appCluster, context);
+        assertTrue(updateControl.isUpdateStatus());
+        assertEquals(
+                JobManagerDeploymentStatus.DEPLOYING,
+                appCluster.getStatus().getJobManagerDeploymentStatus());
+
+        // Check when the bad config is applied, observe() will change the cluster state correctly
+        appCluster.getSpec().getJobManager().setReplicas(-1);
+        // Next reconcile will set error msg and observe with previous validated config
+        updateControl = testController.reconcile(appCluster, context);
+        assertEquals(
+                "JobManager replicas should not be configured less than one.",
+                appCluster.getStatus().getReconciliationStatus().getError());
+        assertTrue(updateControl.isUpdateStatus());
+        assertEquals(
+                JobManagerDeploymentStatus.DEPLOYED_NOT_READY,
+                appCluster.getStatus().getJobManagerDeploymentStatus());
+
+        // Check the exception
+        appCluster.getSpec().getJobManager().setReplicas(1);
+        appCluster.getSpec().getJob().setJarURI(null);
+        assertDoesNotThrow(

Review comment:
       I think the test will also fail without `assertDoesNotThrow`. Right?







[GitHub] [flink-kubernetes-operator] bgeng777 commented on a change in pull request #131: [FLINK-26892] Observe current status before validating CR changes

Posted by GitBox <gi...@apache.org>.
bgeng777 commented on a change in pull request #131:
URL: https://github.com/apache/flink-kubernetes-operator/pull/131#discussion_r838195696



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilder.java
##########
@@ -50,13 +51,20 @@
 
 /** Builder to get effective flink config from {@link FlinkDeployment}. */
 public class FlinkConfigBuilder {
-    private final FlinkDeployment deploy;
+    private final ObjectMeta meta;
     private final FlinkDeploymentSpec spec;
     private final Configuration effectiveConfig;
 
     public FlinkConfigBuilder(FlinkDeployment deploy, Configuration flinkConfig) {
-        this.deploy = deploy;
-        this.spec = this.deploy.getSpec();
+        this.meta = deploy.getMetadata();

Review comment:
       Nice catch. Fixed.







[GitHub] [flink-kubernetes-operator] bgeng777 commented on a change in pull request #131: [FLINK-26892] Observe current status before validating CR changes

Posted by GitBox <gi...@apache.org>.
bgeng777 commented on a change in pull request #131:
URL: https://github.com/apache/flink-kubernetes-operator/pull/131#discussion_r840323394



##########
File path: flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
##########
@@ -397,7 +402,59 @@ public void testUpgradeNotReadyCluster() {
         testUpgradeNotReadyCluster(appCluster, false);
     }
 
-    public void testUpgradeNotReadyCluster(FlinkDeployment appCluster, boolean allowUpgrade) {
+    @Test
+    public void verifyReconcileWithBadConfig() {
+
+        FlinkDeployment appCluster = TestUtils.buildApplicationCluster();
+        UpdateControl<FlinkDeployment> updateControl;
+        // Override rest port, and it should be saved in lastReconciledSpec once a successful
+        // reconcile() finishes.
+        appCluster.getSpec().getFlinkConfiguration().put(RestOptions.PORT.key(), "8088");
+        updateControl = testController.reconcile(appCluster, context);
+        assertTrue(updateControl.isUpdateStatus());
+        assertEquals(
+                JobManagerDeploymentStatus.DEPLOYING,
+                appCluster.getStatus().getJobManagerDeploymentStatus());
+
+        // Check when the bad config is applied, observe() will change the cluster state correctly
+        appCluster.getSpec().getJobManager().setReplicas(-1);
+        // Next reconcile will set error msg and observe with previous validated config
+        updateControl = testController.reconcile(appCluster, context);
+        assertEquals(
+                "JobManager replicas should not be configured less than one.",
+                appCluster.getStatus().getReconciliationStatus().getError());
+        assertTrue(updateControl.isUpdateStatus());
+        assertEquals(
+                JobManagerDeploymentStatus.DEPLOYED_NOT_READY,
+                appCluster.getStatus().getJobManagerDeploymentStatus());
+
+        // Check the exception
+        appCluster.getSpec().getJobManager().setReplicas(1);
+        appCluster.getSpec().getJob().setJarURI(null);
+        assertDoesNotThrow(

Review comment:
       This is in response to @gyfora's latest [comment](https://github.com/apache/flink-kubernetes-operator/pull/131#discussion_r839643933), to make sure we validate first and only then get the effective config.







[GitHub] [flink-kubernetes-operator] bgeng777 commented on pull request #131: [FLINK-26892] Observe current status before validating CR changes

Posted by GitBox <gi...@apache.org>.
bgeng777 commented on pull request #131:
URL: https://github.com/apache/flink-kubernetes-operator/pull/131#issuecomment-1082625097


   cc @gyfora @wangyang0918 @Aitozi 





[GitHub] [flink-kubernetes-operator] gyfora commented on a change in pull request #131: [FLINK-26892] Observe current status before validating CR changes

Posted by GitBox <gi...@apache.org>.
gyfora commented on a change in pull request #131:
URL: https://github.com/apache/flink-kubernetes-operator/pull/131#discussion_r838511641



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
##########
@@ -104,9 +105,31 @@ public DeleteControl cleanup(FlinkDeployment flinkApp, Context context) {
 
     @Override
     public UpdateControl<FlinkDeployment> reconcile(FlinkDeployment flinkApp, Context context) {
-        FlinkDeployment originalCopy = ReconciliationUtils.clone(flinkApp);
         LOG.info("Starting reconciliation");
+        FlinkDeployment originalCopy = ReconciliationUtils.clone(flinkApp);
+        FlinkDeploymentSpec lastReconciledSpec =
+                flinkApp.getStatus().getReconciliationStatus().getLastReconciledSpec();
+        if (lastReconciledSpec != null) {
+            Configuration lastValidatedConfig =
+                    FlinkUtils.getEffectiveConfig(
+                            flinkApp.getMetadata(),
+                            lastReconciledSpec,
+                            defaultConfig.getFlinkConfig());
+            try {
+                observerFactory
+                        .getOrCreate(flinkApp)
+                        .observe(flinkApp, context, lastValidatedConfig);
+            } catch (DeploymentFailedException dfe) {

Review comment:
       We could also keep everything inside the original try-catch block and just do the validation there also. Maybe that will make it look nicer :) 







[GitHub] [flink-kubernetes-operator] gyfora commented on a change in pull request #131: [FLINK-26892] Observe current status before validating CR changes

Posted by GitBox <gi...@apache.org>.
gyfora commented on a change in pull request #131:
URL: https://github.com/apache/flink-kubernetes-operator/pull/131#discussion_r839642805



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
##########
@@ -92,34 +92,33 @@ public FlinkDeploymentController(
     @Override
     public DeleteControl cleanup(FlinkDeployment flinkApp, Context context) {
         LOG.info("Deleting FlinkDeployment");
-        Configuration effectiveConfig =
-                FlinkUtils.getEffectiveConfig(flinkApp, defaultConfig.getFlinkConfig());
         try {
-            observerFactory.getOrCreate(flinkApp).observe(flinkApp, context, effectiveConfig);
+            observerFactory.getOrCreate(flinkApp).observe(flinkApp, context);
         } catch (DeploymentFailedException dfe) {
             // ignore during cleanup
         }
+        Configuration effectiveConfig =
+                FlinkUtils.getEffectiveConfig(flinkApp, defaultConfig.getFlinkConfig());
         return reconcilerFactory.getOrCreate(flinkApp).cleanup(flinkApp, effectiveConfig);
     }
 
     @Override
     public UpdateControl<FlinkDeployment> reconcile(FlinkDeployment flinkApp, Context context) {
-        FlinkDeployment originalCopy = ReconciliationUtils.clone(flinkApp);
         LOG.info("Starting reconciliation");
-
-        Optional<String> validationError = validator.validate(flinkApp);
-        if (validationError.isPresent()) {
-            LOG.error("Validation failed: " + validationError.get());
-            ReconciliationUtils.updateForReconciliationError(flinkApp, validationError.get());
-            return ReconciliationUtils.toUpdateControl(
-                    operatorConfiguration, originalCopy, flinkApp, false);
-        }
-
-        Configuration effectiveConfig =
-                FlinkUtils.getEffectiveConfig(flinkApp, defaultConfig.getFlinkConfig());
-
+        FlinkDeployment originalCopy = ReconciliationUtils.clone(flinkApp);
         try {
-            observerFactory.getOrCreate(flinkApp).observe(flinkApp, context, effectiveConfig);
+            observerFactory.getOrCreate(flinkApp).observe(flinkApp, context);
+
+            Configuration effectiveConfig =
+                    FlinkUtils.getEffectiveConfig(flinkApp, defaultConfig.getFlinkConfig());
+            Optional<String> validationError = validator.validate(flinkApp);

Review comment:
       You changed the order of validation and getting the effective config. You need to validate first.

##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
##########
@@ -92,34 +92,33 @@ public FlinkDeploymentController(
     @Override
     public DeleteControl cleanup(FlinkDeployment flinkApp, Context context) {
         LOG.info("Deleting FlinkDeployment");
-        Configuration effectiveConfig =
-                FlinkUtils.getEffectiveConfig(flinkApp, defaultConfig.getFlinkConfig());
         try {
-            observerFactory.getOrCreate(flinkApp).observe(flinkApp, context, effectiveConfig);
+            observerFactory.getOrCreate(flinkApp).observe(flinkApp, context);
         } catch (DeploymentFailedException dfe) {
             // ignore during cleanup
         }
+        Configuration effectiveConfig =
+                FlinkUtils.getEffectiveConfig(flinkApp, defaultConfig.getFlinkConfig());
         return reconcilerFactory.getOrCreate(flinkApp).cleanup(flinkApp, effectiveConfig);
     }
 
     @Override
     public UpdateControl<FlinkDeployment> reconcile(FlinkDeployment flinkApp, Context context) {
-        FlinkDeployment originalCopy = ReconciliationUtils.clone(flinkApp);
         LOG.info("Starting reconciliation");
-
-        Optional<String> validationError = validator.validate(flinkApp);
-        if (validationError.isPresent()) {
-            LOG.error("Validation failed: " + validationError.get());
-            ReconciliationUtils.updateForReconciliationError(flinkApp, validationError.get());
-            return ReconciliationUtils.toUpdateControl(
-                    operatorConfiguration, originalCopy, flinkApp, false);
-        }
-
-        Configuration effectiveConfig =
-                FlinkUtils.getEffectiveConfig(flinkApp, defaultConfig.getFlinkConfig());
-
+        FlinkDeployment originalCopy = ReconciliationUtils.clone(flinkApp);
         try {
-            observerFactory.getOrCreate(flinkApp).observe(flinkApp, context, effectiveConfig);
+            observerFactory.getOrCreate(flinkApp).observe(flinkApp, context);
+
+            Configuration effectiveConfig =
+                    FlinkUtils.getEffectiveConfig(flinkApp, defaultConfig.getFlinkConfig());
+            Optional<String> validationError = validator.validate(flinkApp);

Review comment:
       It would also be good to add a simple test to guard this by submitting a config in the controller test that would cause an exception in the `getEffectiveConfig` method.
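
       A minimal sketch of why the order matters and what such a guard checks. It assumes, as the test in this PR suggests, that building the effective config fails on a null jar URI; `JobSpec`, `buildEffectiveConfig` and the error text are hypothetical stand-ins rather than the operator's real classes or messages.

```java
import java.util.Objects;
import java.util.Optional;

public class ValidateBeforeConfigSketch {

    /** Hypothetical job spec carrying only the jar URI. */
    static final class JobSpec {
        final String jarUri;

        JobSpec(String jarUri) {
            this.jarUri = jarUri;
        }
    }

    /** Validation reports problems as a message instead of throwing. */
    static Optional<String> validate(JobSpec spec) {
        return spec.jarUri == null
                ? Optional.of("Jar URI must be defined") // hypothetical error text
                : Optional.empty();
    }

    /** Stand-in for config building: assumed to throw when handed an invalid spec. */
    static String buildEffectiveConfig(JobSpec spec) {
        return "jar=" + Objects.requireNonNull(spec.jarUri, "jarUri");
    }

    /** Validates first, so config building only ever sees a spec that passed validation. */
    static String reconcile(JobSpec spec) {
        Optional<String> error = validate(spec);
        if (error.isPresent()) {
            return "error: " + error.get();
        }
        return "deployed with " + buildEffectiveConfig(spec);
    }

    public static void main(String[] args) {
        System.out.println(reconcile(new JobSpec(null)));
        System.out.println(reconcile(new JobSpec("local:///opt/flink/job.jar")));
    }
}
```

       If the two steps in `reconcile` were swapped, the null jar URI would surface as an exception instead of a recorded validation error, which is the regression a controller test would catch.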







[GitHub] [flink-kubernetes-operator] bgeng777 commented on a change in pull request #131: [FLINK-26892] Observe current status before validating CR changes

Posted by GitBox <gi...@apache.org>.
bgeng777 commented on a change in pull request #131:
URL: https://github.com/apache/flink-kubernetes-operator/pull/131#discussion_r838202149



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilder.java
##########
@@ -50,13 +51,20 @@
 
 /** Builder to get effective flink config from {@link FlinkDeployment}. */
 public class FlinkConfigBuilder {
-    private final FlinkDeployment deploy;
+    private final ObjectMeta meta;
     private final FlinkDeploymentSpec spec;
     private final Configuration effectiveConfig;
 
     public FlinkConfigBuilder(FlinkDeployment deploy, Configuration flinkConfig) {
-        this.deploy = deploy;
-        this.spec = this.deploy.getSpec();
+        this.meta = deploy.getMetadata();
+        this.spec = deploy.getSpec();
+        this.effectiveConfig = new Configuration(flinkConfig);
+    }
+
+    public FlinkConfigBuilder(

Review comment:
       I considered this solution as well. I am not sure whether we will use other fields of the metadata in the future. One example could be `annotations`: I notice that Flink's `KubernetesConfigOptions` has options like `JOB_MANAGER_ANNOTATIONS` and `TASK_MANAGER_ANNOTATIONS`. If we plan to support such fields in our CR, we may need to add back the `ObjectMeta`. WDYT?







[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #131: [FLINK-26892] Observe current status before validating CR changes

Posted by GitBox <gi...@apache.org>.
gyfora commented on pull request #131:
URL: https://github.com/apache/flink-kubernetes-operator/pull/131#issuecomment-1084107999


   @bgeng777 I agree with the suggestion to move the shared logic to the BaseObserver. As long as we don't change the current behaviour, I think this is a good time to simplify it as much as we can by factoring out the common bits to the BaseObserver.





[GitHub] [flink-kubernetes-operator] wangyang0918 commented on a change in pull request #131: [FLINK-26892] Observe current status before validating CR changes

Posted by GitBox <gi...@apache.org>.
wangyang0918 commented on a change in pull request #131:
URL: https://github.com/apache/flink-kubernetes-operator/pull/131#discussion_r840312565



##########
File path: flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
##########
@@ -397,7 +402,59 @@ public void testUpgradeNotReadyCluster() {
         testUpgradeNotReadyCluster(appCluster, false);
     }
 
-    public void testUpgradeNotReadyCluster(FlinkDeployment appCluster, boolean allowUpgrade) {
+    @Test
+    public void verifyReconcileWithBadConfig() {
+
+        FlinkDeployment appCluster = TestUtils.buildApplicationCluster();
+        UpdateControl<FlinkDeployment> updateControl;
+        // Override rest port, and it should be saved in lastReconciledSpec once a successful
+        // reconcile() finishes.
+        appCluster.getSpec().getFlinkConfiguration().put(RestOptions.PORT.key(), "8088");
+        updateControl = testController.reconcile(appCluster, context);
+        assertTrue(updateControl.isUpdateStatus());
+        assertEquals(
+                JobManagerDeploymentStatus.DEPLOYING,
+                appCluster.getStatus().getJobManagerDeploymentStatus());
+
+        // Check when the bad config is applied, observe() will change the cluster state correctly
+        appCluster.getSpec().getJobManager().setReplicas(-1);
+        // Next reconcile will set error msg and observe with previous validated config
+        updateControl = testController.reconcile(appCluster, context);
+        assertEquals(
+                "JobManager replicas should not be configured less than one.",
+                appCluster.getStatus().getReconciliationStatus().getError());
+        assertTrue(updateControl.isUpdateStatus());
+        assertEquals(
+                JobManagerDeploymentStatus.DEPLOYED_NOT_READY,
+                appCluster.getStatus().getJobManagerDeploymentStatus());
+
+        // Check the exception
+        appCluster.getSpec().getJobManager().setReplicas(1);
+        appCluster.getSpec().getJob().setJarURI(null);
+        assertDoesNotThrow(
+                () -> {
+                    testController.reconcile(appCluster, context);
+                });
+        assertEquals(
+                JobManagerDeploymentStatus.READY,
+                appCluster.getStatus().getJobManagerDeploymentStatus());
+        // Verify the saved rest port in lastReconciledSpec is actually used in observe().
+        // Otherwise, the NaN rest port
+        // would lead to exception
+        appCluster.getSpec().getFlinkConfiguration().put(RestOptions.PORT.key(), "NaN");

Review comment:
       I am afraid we still do not verify that the observer is using the last reconciled config. I suggest doing it in `TestingFlinkService#listJobs`. To achieve that, we could set a consumer on `TestingFlinkService`, just like `TestingClusterClient`.
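
       The consumer idea could look roughly like the sketch below. `FakeFlinkService` and `setListJobsConsumer` are hypothetical names chosen for illustration (the real `TestingFlinkService` is not reproduced here), and the config is modelled as a `Map<String, String>` rather than Flink's `Configuration`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

public class ListJobsConsumerSketch {

    /** Hypothetical test double whose listJobs() hands the received config to a hook. */
    static final class FakeFlinkService {
        // No-op by default, so tests that do not care about the config are unaffected.
        private Consumer<Map<String, String>> listJobsConsumer = conf -> {};

        void setListJobsConsumer(Consumer<Map<String, String>> consumer) {
            this.listJobsConsumer = consumer;
        }

        List<String> listJobs(Map<String, String> conf) {
            listJobsConsumer.accept(conf); // lets the test inspect the config actually used
            return new ArrayList<>();
        }
    }

    public static void main(String[] args) {
        FakeFlinkService service = new FakeFlinkService();
        service.setListJobsConsumer(conf -> System.out.println("observed with " + conf));

        Map<String, String> lastReconciledConfig = new HashMap<>();
        lastReconciledConfig.put("rest.port", "8088");
        service.listJobs(lastReconciledConfig);
    }
}
```

       In a controller test, the consumer would assert on the REST port it receives, which checks directly that the observer was given the last reconciled config instead of inferring it from the absence of an exception.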

##########
File path: flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
##########
@@ -397,7 +402,59 @@ public void testUpgradeNotReadyCluster() {
         testUpgradeNotReadyCluster(appCluster, false);
     }
 
-    public void testUpgradeNotReadyCluster(FlinkDeployment appCluster, boolean allowUpgrade) {
+    @Test
+    public void verifyReconcileWithBadConfig() {
+
+        FlinkDeployment appCluster = TestUtils.buildApplicationCluster();
+        UpdateControl<FlinkDeployment> updateControl;
+        // Override rest port, and it should be saved in lastReconciledSpec once a successful
+        // reconcile() finishes.
+        appCluster.getSpec().getFlinkConfiguration().put(RestOptions.PORT.key(), "8088");
+        updateControl = testController.reconcile(appCluster, context);
+        assertTrue(updateControl.isUpdateStatus());
+        assertEquals(
+                JobManagerDeploymentStatus.DEPLOYING,
+                appCluster.getStatus().getJobManagerDeploymentStatus());
+
+        // Check when the bad config is applied, observe() will change the cluster state correctly
+        appCluster.getSpec().getJobManager().setReplicas(-1);
+        // Next reconcile will set error msg and observe with previous validated config
+        updateControl = testController.reconcile(appCluster, context);
+        assertEquals(
+                "JobManager replicas should not be configured less than one.",
+                appCluster.getStatus().getReconciliationStatus().getError());
+        assertTrue(updateControl.isUpdateStatus());
+        assertEquals(
+                JobManagerDeploymentStatus.DEPLOYED_NOT_READY,
+                appCluster.getStatus().getJobManagerDeploymentStatus());
+
+        // Check the exception
+        appCluster.getSpec().getJobManager().setReplicas(1);
+        appCluster.getSpec().getJob().setJarURI(null);
+        assertDoesNotThrow(

Review comment:
       What's the purpose of this assert?







[GitHub] [flink-kubernetes-operator] bgeng777 commented on a change in pull request #131: [FLINK-26892] Observe current status before validating CR changes

Posted by GitBox <gi...@apache.org>.
bgeng777 commented on a change in pull request #131:
URL: https://github.com/apache/flink-kubernetes-operator/pull/131#discussion_r840381900



##########
File path: flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
##########
@@ -397,7 +402,59 @@ public void testUpgradeNotReadyCluster() {
         testUpgradeNotReadyCluster(appCluster, false);
     }
 
-    public void testUpgradeNotReadyCluster(FlinkDeployment appCluster, boolean allowUpgrade) {
+    @Test
+    public void verifyReconcileWithBadConfig() {
+
+        FlinkDeployment appCluster = TestUtils.buildApplicationCluster();
+        UpdateControl<FlinkDeployment> updateControl;
+        // Override rest port, and it should be saved in lastReconciledSpec once a successful
+        // reconcile() finishes.
+        appCluster.getSpec().getFlinkConfiguration().put(RestOptions.PORT.key(), "8088");
+        updateControl = testController.reconcile(appCluster, context);
+        assertTrue(updateControl.isUpdateStatus());
+        assertEquals(
+                JobManagerDeploymentStatus.DEPLOYING,
+                appCluster.getStatus().getJobManagerDeploymentStatus());
+
+        // Check when the bad config is applied, observe() will change the cluster state correctly
+        appCluster.getSpec().getJobManager().setReplicas(-1);
+        // Next reconcile will set error msg and observe with previous validated config
+        updateControl = testController.reconcile(appCluster, context);
+        assertEquals(
+                "JobManager replicas should not be configured less than one.",
+                appCluster.getStatus().getReconciliationStatus().getError());
+        assertTrue(updateControl.isUpdateStatus());
+        assertEquals(
+                JobManagerDeploymentStatus.DEPLOYED_NOT_READY,
+                appCluster.getStatus().getJobManagerDeploymentStatus());
+
+        // Check the exception
+        appCluster.getSpec().getJobManager().setReplicas(1);
+        appCluster.getSpec().getJob().setJarURI(null);
+        assertDoesNotThrow(

Review comment:
       Not sure if I understand correctly, but you are right that the `assertDoesNotThrow` is redundant here. 
   The idea of this part is that after `setJarURI(null)`, if we built the effective config before validation, the test would fail with an exception from `getEffectiveConfig()`. As long as the test finishes successfully, we know that validation runs before `getEffectiveConfig()`. 
   I removed the extra `assertDoesNotThrow`. Let me know if that is enough.
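
   The ordering argument can be illustrated with a small, self-contained sketch. Everything here is a simplified stand-in: `SpecSketch`, `ReconcileOrderSketch`, the jar path and the "Jar URI must be defined" message are made up for illustration, and `effectiveConfig` is only assumed to fail on a missing jar URI in the way described above.

```java
import java.util.Objects;
import java.util.Optional;

/** Hypothetical, simplified spec: only the two fields the test touches. */
class SpecSketch {
    int jmReplicas = 1;
    String jarUri = "local:///tmp/job.jar";
}

class ReconcileOrderSketch {
    /** Stand-in validator: reports problems as a message instead of throwing. */
    static Optional<String> validate(SpecSketch spec) {
        if (spec.jmReplicas < 1) {
            return Optional.of("JobManager replicas should not be configured less than one.");
        }
        if (spec.jarUri == null) {
            return Optional.of("Jar URI must be defined"); // illustrative message
        }
        return Optional.empty();
    }

    /** Stand-in for getEffectiveConfig(): throws when the jar URI is missing. */
    static String effectiveConfig(SpecSketch spec) {
        return "jar=" + Objects.requireNonNull(spec.jarUri, "jarURI");
    }

    /** Validates first; the config is only built for a spec that passed validation. */
    static String reconcile(SpecSketch spec) {
        Optional<String> error = validate(spec);
        if (error.isPresent()) {
            return "ERROR: " + error.get();
        }
        return "OK: " + effectiveConfig(spec);
    }
}
```

   If the two steps in `reconcile` were swapped, a spec with a null jar URI would throw instead of returning an error, which is the failure the test relies on.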







[GitHub] [flink-kubernetes-operator] bgeng777 commented on a change in pull request #131: [FLINK-26892] Observe current status before validating CR changes

Posted by GitBox <gi...@apache.org>.
bgeng777 commented on a change in pull request #131:
URL: https://github.com/apache/flink-kubernetes-operator/pull/131#discussion_r840769013



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/Mode.java
##########
@@ -19,13 +19,33 @@
 package org.apache.flink.kubernetes.operator.config;
 
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
 
 /** The mode of {@link FlinkDeployment}. */
 public enum Mode {
     APPLICATION,
     SESSION;
 
+    /**
+     * Return the mode of the given FlinkDeployment for Observer and Reconciler. Note, switching
+     * mode for an existing deployment is not allowed.
+     *
+     * @param flinkApp given FlinkDeployment
+     * @return Mode
+     */
     public static Mode getMode(FlinkDeployment flinkApp) {
-        return flinkApp.getSpec().getJob() != null ? APPLICATION : SESSION;
+        // Try to use lastReconciledSpec if it exists.
+        // The mode derived from last-reconciled spec or current spec should be same.
+        // If they are different, observation phase will use last-reconciled spec and validation
+        // phase will fail.
+        FlinkDeploymentSpec lastReconciledSpec =
+                flinkApp.getStatus().getReconciliationStatus().getLastReconciledSpec();
+        return lastReconciledSpec == null
+                ? getMode(flinkApp.getSpec())
+                : getMode(flinkApp.getSpec());

Review comment:
       Hmm, I made a silly mistake here :(
   After fixing the Mode bug, I was wondering whether any other spec in the Observer needed to be changed to the last reconciled spec. While testing with `DeploymentSpec spec = deployment.get().getSpec();` in `observeJmDeployment`, I somehow broke my previous code and committed the wrong version.
   It should be fixed now.







[GitHub] [flink-kubernetes-operator] gyfora commented on a change in pull request #131: [FLINK-26892] Observe current status before validating CR changes

Posted by GitBox <gi...@apache.org>.
gyfora commented on a change in pull request #131:
URL: https://github.com/apache/flink-kubernetes-operator/pull/131#discussion_r840789333



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/Mode.java
##########
@@ -19,13 +19,33 @@
 package org.apache.flink.kubernetes.operator.config;
 
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
 
 /** The mode of {@link FlinkDeployment}. */
 public enum Mode {
     APPLICATION,
     SESSION;
 
+    /**
+     * Return the mode of the given FlinkDeployment for Observer and Reconciler. Note, switching
+     * mode for an existing deployment is not allowed.
+     *
+     * @param flinkApp given FlinkDeployment
+     * @return Mode
+     */
     public static Mode getMode(FlinkDeployment flinkApp) {
-        return flinkApp.getSpec().getJob() != null ? APPLICATION : SESSION;
+        // Try to use lastReconciledSpec if it exists.
+        // The mode derived from last-reconciled spec or current spec should be same.
+        // If they are different, observation phase will use last-reconciled spec and validation
+        // phase will fail.
+        FlinkDeploymentSpec lastReconciledSpec =
+                flinkApp.getStatus().getReconciliationStatus().getLastReconciledSpec();
+        return lastReconciledSpec == null
+                ? getMode(flinkApp.getSpec())
+                : getMode(flinkApp.getSpec());

Review comment:
       no worries, we have code review for a reason :) 







[GitHub] [flink-kubernetes-operator] bgeng777 commented on pull request #131: [FLINK-26892] Observe current status before validating CR changes

Posted by GitBox <gi...@apache.org>.
bgeng777 commented on pull request #131:
URL: https://github.com/apache/flink-kubernetes-operator/pull/131#issuecomment-1085490549


   @wangyang0918 @gyfora Would you mind taking another look?





[GitHub] [flink-kubernetes-operator] bgeng777 commented on a change in pull request #131: [FLINK-26892] Observe current status before validating CR changes

Posted by GitBox <gi...@apache.org>.
bgeng777 commented on a change in pull request #131:
URL: https://github.com/apache/flink-kubernetes-operator/pull/131#discussion_r839764220



##########
File path: flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
##########
@@ -397,7 +397,45 @@ public void testUpgradeNotReadyCluster() {
         testUpgradeNotReadyCluster(appCluster, false);
     }
 
-    public void testUpgradeNotReadyCluster(FlinkDeployment appCluster, boolean allowUpgrade) {
+    @Test
+    public void verifyReconcileWithBadConfig() throws Exception {

Review comment:
       You are right. The current test only verifies the order of `observe()` and `validate()`.
   I enhanced the test to verify that the previously validated config is really used, by modifying the rest port.







[GitHub] [flink-kubernetes-operator] gyfora commented on a change in pull request #131: [FLINK-26892] Observe current status before validating CR changes

Posted by GitBox <gi...@apache.org>.
gyfora commented on a change in pull request #131:
URL: https://github.com/apache/flink-kubernetes-operator/pull/131#discussion_r838508190



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
##########
@@ -104,9 +105,31 @@ public DeleteControl cleanup(FlinkDeployment flinkApp, Context context) {
 
     @Override
     public UpdateControl<FlinkDeployment> reconcile(FlinkDeployment flinkApp, Context context) {
-        FlinkDeployment originalCopy = ReconciliationUtils.clone(flinkApp);
         LOG.info("Starting reconciliation");
+        FlinkDeployment originalCopy = ReconciliationUtils.clone(flinkApp);
+        FlinkDeploymentSpec lastReconciledSpec =
+                flinkApp.getStatus().getReconciliationStatus().getLastReconciledSpec();
+        if (lastReconciledSpec != null) {
+            Configuration lastValidatedConfig =
+                    FlinkUtils.getEffectiveConfig(
+                            flinkApp.getMetadata(),
+                            lastReconciledSpec,
+                            defaultConfig.getFlinkConfig());

Review comment:
       Maybe this whole logic could be embedded inside the observer. 
   That would change it to simply `.observe(flinkApp, context)`
   
   It would also simplify the controller loop which is very important







[GitHub] [flink-kubernetes-operator] wangyang0918 commented on a change in pull request #131: [FLINK-26892] Observe current status before validating CR changes

Posted by GitBox <gi...@apache.org>.
wangyang0918 commented on a change in pull request #131:
URL: https://github.com/apache/flink-kubernetes-operator/pull/131#discussion_r839383486



##########
File path: flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
##########
@@ -397,7 +397,45 @@ public void testUpgradeNotReadyCluster() {
         testUpgradeNotReadyCluster(appCluster, false);
     }
 
-    public void testUpgradeNotReadyCluster(FlinkDeployment appCluster, boolean allowUpgrade) {
+    @Test
+    public void verifyReconcileWithBadConfig() throws Exception {

Review comment:
       This test does not cover that the observer should use the previously validated config, not the current invalid one. Right?







[GitHub] [flink-kubernetes-operator] Aitozi commented on a change in pull request #131: [FLINK-26892] Observe current status before validating CR changes

Posted by GitBox <gi...@apache.org>.
Aitozi commented on a change in pull request #131:
URL: https://github.com/apache/flink-kubernetes-operator/pull/131#discussion_r838131907



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilder.java
##########
@@ -50,13 +51,20 @@
 
 /** Builder to get effective flink config from {@link FlinkDeployment}. */
 public class FlinkConfigBuilder {
-    private final FlinkDeployment deploy;
+    private final ObjectMeta meta;
     private final FlinkDeploymentSpec spec;
     private final Configuration effectiveConfig;
 
     public FlinkConfigBuilder(FlinkDeployment deploy, Configuration flinkConfig) {
-        this.deploy = deploy;
-        this.spec = this.deploy.getSpec();
+        this.meta = deploy.getMetadata();

Review comment:
       this(deploy.getMetadata(), deploy.getSpec(), flinkConfig)
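
   Spelled out, the suggested delegation could look like the sketch below. The stub classes are hypothetical stand-ins for `FlinkDeployment`, `ObjectMeta` and `FlinkDeploymentSpec`, and a `Map<String, String>` replaces `Configuration`, so this only shows the constructor chaining, not the real builder.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-ins for ObjectMeta, FlinkDeploymentSpec and FlinkDeployment. */
class MetaStub {
    String name = "demo";
}

class DeploySpecStub {
    String image = "flink:latest";
}

class DeployStub {
    MetaStub metadata = new MetaStub();
    DeploySpecStub spec = new DeploySpecStub();
}

class ConfigBuilderSketch {
    final MetaStub meta;
    final DeploySpecStub spec;
    final Map<String, String> effectiveConfig;

    /** Convenience constructor: delegates so the field setup lives in one place. */
    ConfigBuilderSketch(DeployStub deploy, Map<String, String> flinkConfig) {
        this(deploy.metadata, deploy.spec, flinkConfig);
    }

    ConfigBuilderSketch(MetaStub meta, DeploySpecStub spec, Map<String, String> flinkConfig) {
        this.meta = meta;
        this.spec = spec;
        // Copy so that later builder calls never mutate the caller's defaults.
        this.effectiveConfig = new HashMap<>(flinkConfig);
    }
}
```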

##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilder.java
##########
@@ -50,13 +51,20 @@
 
 /** Builder to get effective flink config from {@link FlinkDeployment}. */
 public class FlinkConfigBuilder {
-    private final FlinkDeployment deploy;
+    private final ObjectMeta meta;
     private final FlinkDeploymentSpec spec;
     private final Configuration effectiveConfig;
 
     public FlinkConfigBuilder(FlinkDeployment deploy, Configuration flinkConfig) {
-        this.deploy = deploy;
-        this.spec = this.deploy.getSpec();
+        this.meta = deploy.getMetadata();
+        this.spec = deploy.getSpec();
+        this.effectiveConfig = new Configuration(flinkConfig);
+    }
+
+    public FlinkConfigBuilder(

Review comment:
       Can we directly pass the namespace and clusterId here? It seems we only use these two fields

##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
##########
@@ -104,9 +105,30 @@ public DeleteControl cleanup(FlinkDeployment flinkApp, Context context) {
 
     @Override
     public UpdateControl<FlinkDeployment> reconcile(FlinkDeployment flinkApp, Context context) {
-        FlinkDeployment originalCopy = ReconciliationUtils.clone(flinkApp);
         LOG.info("Starting reconciliation");
+        FlinkDeployment originalCopy = ReconciliationUtils.clone(flinkApp);
+        FlinkDeploymentSpec lastReconciledSpec =
+                flinkApp.getStatus().getReconciliationStatus().getLastReconciledSpec();
+        if (lastReconciledSpec != null) {
+            Configuration latestValidatedConfig =
+                    FlinkUtils.getEffectiveConfig(
+                            flinkApp.getMetadata(),
+                            lastReconciledSpec,
+                            defaultConfig.getFlinkConfig());
+            try {
+                observerFactory
+                        .getOrCreate(flinkApp)
+                        .observe(flinkApp, context, latestValidatedConfig);
+            } catch (DeploymentFailedException dfe) {
+                handleDeploymentFailed(flinkApp, dfe);
+                LOG.info("Reconciliation successfully completed");

Review comment:
       This log is not right

##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
##########
@@ -104,9 +105,30 @@ public DeleteControl cleanup(FlinkDeployment flinkApp, Context context) {
 
     @Override
     public UpdateControl<FlinkDeployment> reconcile(FlinkDeployment flinkApp, Context context) {
-        FlinkDeployment originalCopy = ReconciliationUtils.clone(flinkApp);
         LOG.info("Starting reconciliation");
+        FlinkDeployment originalCopy = ReconciliationUtils.clone(flinkApp);
+        FlinkDeploymentSpec lastReconciledSpec =
+                flinkApp.getStatus().getReconciliationStatus().getLastReconciledSpec();
+        if (lastReconciledSpec != null) {
+            Configuration latestValidatedConfig =

Review comment:
       nit: `lastValidatedConfig` may be a better name, because it comes from the `lastReconciledSpec`







[GitHub] [flink-kubernetes-operator] bgeng777 edited a comment on pull request #131: [FLINK-26892] Observe current status before validating CR changes

Posted by GitBox <gi...@apache.org>.
bgeng777 edited a comment on pull request #131:
URL: https://github.com/apache/flink-kubernetes-operator/pull/131#issuecomment-1085896373


   > Looks good, one minor thing we still need to change is in the:
   > 
   > `org.apache.flink.kubernetes.operator.config.Mode` logic.
   > 
   > We need to make sure to get the mode based on the last-reconciled spec if there is one. We do not allow mode switching but this is only enforced in the validation phase.
   
   @gyfora My bad for ignoring the `Mode` when fetching an `Observer` instance. I have updated `getMode(flinkApp)` to make sure it returns the correct mode when a last-reconciled spec exists.
   I also added a dedicated test (`verifyReconcileWithAChangedOperatorMode`) to verify that the `Mode` is selected correctly for observation when the operator mode is changed wrongly.





[GitHub] [flink-kubernetes-operator] bgeng777 commented on pull request #131: [FLINK-26892] Observe current status before validating CR changes

Posted by GitBox <gi...@apache.org>.
bgeng777 commented on pull request #131:
URL: https://github.com/apache/flink-kubernetes-operator/pull/131#issuecomment-1085896373


   > Looks good, one minor thing we still need to change is in the:
   > 
   > `org.apache.flink.kubernetes.operator.config.Mode` logic.
   > 
   > We need to make sure to get the mode based on the last-reconciled spec if there is one. We do not allow mode switching but this is only enforced in the validation phase.
   
   @gyfora My bad for ignoring the `Mode` when fetching an `Observer` instance. I have updated `getMode(flinkApp)` to make sure it returns the correct mode when a last-reconciled spec exists.
   I also added a dedicated test to verify that the `Mode` is selected correctly for observation when the operator mode is changed wrongly.





[GitHub] [flink-kubernetes-operator] bgeng777 edited a comment on pull request #131: [FLINK-26892] Observe current status before validating CR changes

Posted by GitBox <gi...@apache.org>.
bgeng777 edited a comment on pull request #131:
URL: https://github.com/apache/flink-kubernetes-operator/pull/131#issuecomment-1084260784


   @gyfora thanks for your advice. PTAL at the latest commit, which does 2 things:
   1. Adds a new parameter to the constructor of `BaseObserver` and removes the `effectiveConfig` parameter from the `Observer#observe()` interface. I did not directly use the `DefaultConfig` class, as it would introduce an extra construction of `FlinkOperatorConfiguration`. Besides, `operatorConfiguration` and `flinkConfig` serve different purposes, and keeping them separate improves readability.
   2. Follows the discussion above by factoring out common code into `BaseObserver` and adjusting the tests.
   
   





[GitHub] [flink-kubernetes-operator] gyfora commented on a change in pull request #131: [FLINK-26892] Observe current status before validating CR changes

Posted by GitBox <gi...@apache.org>.
gyfora commented on a change in pull request #131:
URL: https://github.com/apache/flink-kubernetes-operator/pull/131#discussion_r840740946



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/Mode.java
##########
@@ -19,13 +19,33 @@
 package org.apache.flink.kubernetes.operator.config;
 
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
 
 /** The mode of {@link FlinkDeployment}. */
 public enum Mode {
     APPLICATION,
     SESSION;
 
+    /**
+     * Return the mode of the given FlinkDeployment for Observer and Reconciler. Note, switching
+     * mode for an existing deployment is not allowed.
+     *
+     * @param flinkApp given FlinkDeployment
+     * @return Mode
+     */
     public static Mode getMode(FlinkDeployment flinkApp) {
-        return flinkApp.getSpec().getJob() != null ? APPLICATION : SESSION;
+        // Try to use lastReconciledSpec if it exists.
+        // The mode derived from last-reconciled spec or current spec should be same.
+        // If they are different, observation phase will use last-reconciled spec and validation
+        // phase will fail.
+        FlinkDeploymentSpec lastReconciledSpec =
+                flinkApp.getStatus().getReconciliationStatus().getLastReconciledSpec();
+        return lastReconciledSpec == null
+                ? getMode(flinkApp.getSpec())
+                : getMode(flinkApp.getSpec());

Review comment:
       should this be `getMode(lastReconciledSpec)` here ? Maybe something is wrong with the test you added :)
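
   For reference, the intended selection could be sketched as below. `SpecStub` and `ModeSketch` are hypothetical, stripped-down stand-ins, and the last-reconciled spec is passed in directly rather than read from the deployment status, so this is an illustration of the branch logic only.

```java
/** Hypothetical, stripped-down spec: a non-null job means application mode. */
class SpecStub {
    final Object job;

    SpecStub(Object job) {
        this.job = job;
    }
}

enum ModeSketch {
    APPLICATION,
    SESSION;

    /** Prefers the last-reconciled spec and falls back to the current spec. */
    static ModeSketch getMode(SpecStub currentSpec, SpecStub lastReconciledSpec) {
        return lastReconciledSpec == null
                ? getMode(currentSpec)
                : getMode(lastReconciledSpec);
    }

    static ModeSketch getMode(SpecStub spec) {
        return spec.job != null ? APPLICATION : SESSION;
    }
}
```

   With both branches calling `getMode(currentSpec)`, a deployment that was reconciled as a session cluster and then wrongly given a job would be observed as an application cluster, which is the case a dedicated test should catch.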







[GitHub] [flink-kubernetes-operator] gyfora commented on a change in pull request #131: [FLINK-26892] Observe current status before validating CR changes

Posted by GitBox <gi...@apache.org>.
gyfora commented on a change in pull request #131:
URL: https://github.com/apache/flink-kubernetes-operator/pull/131#discussion_r839209471



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobObserver.java
##########
@@ -46,14 +48,24 @@ public JobObserver(
     }
 
     @Override
-    public void observe(FlinkDeployment flinkApp, Context context, Configuration effectiveConfig) {
+    public void observe(FlinkDeployment flinkApp, Context context, Configuration defaultConfig) {

Review comment:
       I suggest we not pass the defaultConfig to the `observe` method but pass the `DefaultConfig` to the Observer constructor directly. This way we can avoid accidentally using the wrong config.
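
   A rough sketch of the shape this could take, under stated assumptions: `ObserverSketch` is a hypothetical class, a `Map<String, String>` stands in for `Configuration`, and the merge of defaults with last-reconciled overrides is a simplification of what `getEffectiveConfig()` does.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical observer; a Map replaces Flink's Configuration. */
class ObserverSketch {
    private final Map<String, String> defaultFlinkConfig;

    /** The defaults are fixed at construction, so callers cannot pass the wrong config. */
    ObserverSketch(Map<String, String> defaultFlinkConfig) {
        this.defaultFlinkConfig = new HashMap<>(defaultFlinkConfig);
    }

    /**
     * Builds the config to observe with from the last reconciled overrides.
     * Returns null when nothing has been reconciled yet, i.e. there is nothing to observe.
     */
    Map<String, String> observe(Map<String, String> lastReconciledOverrides) {
        if (lastReconciledOverrides == null) {
            return null;
        }
        Map<String, String> lastValidatedConfig = new HashMap<>(defaultFlinkConfig);
        lastValidatedConfig.putAll(lastReconciledOverrides);
        return lastValidatedConfig;
    }
}
```

   Because the method no longer takes a config argument, the controller has no opportunity to hand the observer a config built from an unvalidated spec.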







[GitHub] [flink-kubernetes-operator] bgeng777 commented on a change in pull request #131: [FLINK-26892] Observe current status before validating CR changes

Posted by GitBox <gi...@apache.org>.
bgeng777 commented on a change in pull request #131:
URL: https://github.com/apache/flink-kubernetes-operator/pull/131#discussion_r839759152



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
##########
@@ -92,34 +92,33 @@ public FlinkDeploymentController(
     @Override
     public DeleteControl cleanup(FlinkDeployment flinkApp, Context context) {
         LOG.info("Deleting FlinkDeployment");
-        Configuration effectiveConfig =
-                FlinkUtils.getEffectiveConfig(flinkApp, defaultConfig.getFlinkConfig());
         try {
-            observerFactory.getOrCreate(flinkApp).observe(flinkApp, context, effectiveConfig);
+            observerFactory.getOrCreate(flinkApp).observe(flinkApp, context);
         } catch (DeploymentFailedException dfe) {
             // ignore during cleanup
         }
+        Configuration effectiveConfig =
+                FlinkUtils.getEffectiveConfig(flinkApp, defaultConfig.getFlinkConfig());
         return reconcilerFactory.getOrCreate(flinkApp).cleanup(flinkApp, effectiveConfig);
     }
 
     @Override
     public UpdateControl<FlinkDeployment> reconcile(FlinkDeployment flinkApp, Context context) {
-        FlinkDeployment originalCopy = ReconciliationUtils.clone(flinkApp);
         LOG.info("Starting reconciliation");
-
-        Optional<String> validationError = validator.validate(flinkApp);
-        if (validationError.isPresent()) {
-            LOG.error("Validation failed: " + validationError.get());
-            ReconciliationUtils.updateForReconciliationError(flinkApp, validationError.get());
-            return ReconciliationUtils.toUpdateControl(
-                    operatorConfiguration, originalCopy, flinkApp, false);
-        }
-
-        Configuration effectiveConfig =
-                FlinkUtils.getEffectiveConfig(flinkApp, defaultConfig.getFlinkConfig());
-
+        FlinkDeployment originalCopy = ReconciliationUtils.clone(flinkApp);
         try {
-            observerFactory.getOrCreate(flinkApp).observe(flinkApp, context, effectiveConfig);
+            observerFactory.getOrCreate(flinkApp).observe(flinkApp, context);
+
+            Configuration effectiveConfig =
+                    FlinkUtils.getEffectiveConfig(flinkApp, defaultConfig.getFlinkConfig());
+            Optional<String> validationError = validator.validate(flinkApp);

Review comment:
       Thanks for the catch. I restored the order and enhanced the test to cover the case where `getEffectiveConfig()` throws an exception if we do not call `validate()` first.







[GitHub] [flink-kubernetes-operator] bgeng777 commented on a change in pull request #131: [FLINK-26892] Observe current status before validating CR changes

Posted by GitBox <gi...@apache.org>.
bgeng777 commented on a change in pull request #131:
URL: https://github.com/apache/flink-kubernetes-operator/pull/131#discussion_r839759152



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
##########
@@ -92,34 +92,33 @@ public FlinkDeploymentController(
     @Override
     public DeleteControl cleanup(FlinkDeployment flinkApp, Context context) {
         LOG.info("Deleting FlinkDeployment");
-        Configuration effectiveConfig =
-                FlinkUtils.getEffectiveConfig(flinkApp, defaultConfig.getFlinkConfig());
         try {
-            observerFactory.getOrCreate(flinkApp).observe(flinkApp, context, effectiveConfig);
+            observerFactory.getOrCreate(flinkApp).observe(flinkApp, context);
         } catch (DeploymentFailedException dfe) {
             // ignore during cleanup
         }
+        Configuration effectiveConfig =
+                FlinkUtils.getEffectiveConfig(flinkApp, defaultConfig.getFlinkConfig());
         return reconcilerFactory.getOrCreate(flinkApp).cleanup(flinkApp, effectiveConfig);
     }
 
     @Override
     public UpdateControl<FlinkDeployment> reconcile(FlinkDeployment flinkApp, Context context) {
-        FlinkDeployment originalCopy = ReconciliationUtils.clone(flinkApp);
         LOG.info("Starting reconciliation");
-
-        Optional<String> validationError = validator.validate(flinkApp);
-        if (validationError.isPresent()) {
-            LOG.error("Validation failed: " + validationError.get());
-            ReconciliationUtils.updateForReconciliationError(flinkApp, validationError.get());
-            return ReconciliationUtils.toUpdateControl(
-                    operatorConfiguration, originalCopy, flinkApp, false);
-        }
-
-        Configuration effectiveConfig =
-                FlinkUtils.getEffectiveConfig(flinkApp, defaultConfig.getFlinkConfig());
-
+        FlinkDeployment originalCopy = ReconciliationUtils.clone(flinkApp);
         try {
-            observerFactory.getOrCreate(flinkApp).observe(flinkApp, context, effectiveConfig);
+            observerFactory.getOrCreate(flinkApp).observe(flinkApp, context);
+
+            Configuration effectiveConfig =
+                    FlinkUtils.getEffectiveConfig(flinkApp, defaultConfig.getFlinkConfig());
+            Optional<String> validationError = validator.validate(flinkApp);

Review comment:
       Thanks for the catch. I restored the order and enhanced the test to cover the case where `getEffectiveConfig()` throws an exception if we do not call `validate()` first.







[GitHub] [flink-kubernetes-operator] bgeng777 commented on a change in pull request #131: [FLINK-26892] Observe current status before validating CR changes

Posted by GitBox <gi...@apache.org>.
bgeng777 commented on a change in pull request #131:
URL: https://github.com/apache/flink-kubernetes-operator/pull/131#discussion_r838198057



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
##########
@@ -104,9 +105,30 @@ public DeleteControl cleanup(FlinkDeployment flinkApp, Context context) {
 
     @Override
     public UpdateControl<FlinkDeployment> reconcile(FlinkDeployment flinkApp, Context context) {
-        FlinkDeployment originalCopy = ReconciliationUtils.clone(flinkApp);
         LOG.info("Starting reconciliation");
+        FlinkDeployment originalCopy = ReconciliationUtils.clone(flinkApp);
+        FlinkDeploymentSpec lastReconciledSpec =
+                flinkApp.getStatus().getReconciliationStatus().getLastReconciledSpec();
+        if (lastReconciledSpec != null) {
+            Configuration latestValidatedConfig =

Review comment:
       Agree. Fixed.







[GitHub] [flink-kubernetes-operator] bgeng777 commented on pull request #131: [FLINK-26892] Observe current status before validating CR changes

Posted by GitBox <gi...@apache.org>.
bgeng777 commented on pull request #131:
URL: https://github.com/apache/flink-kubernetes-operator/pull/131#issuecomment-1083368542


   @gyfora while following your excellent suggestion, I noticed that the `observe()` methods in `JobObserver` and `SessionObserver` share many duplicated lines. The only difference is what they observe once the cluster is ready. It looks like we could pull the common logic up into `BaseObserver`, like this:
   ```java
   public void observe(FlinkDeployment flinkApp, Context context, Configuration defaultConfig) {
       FlinkDeploymentSpec lastReconciledSpec = ...
       // Nothing has been launched so skip observing
       if (lastReconciledSpec == null) {
           return;
       }

       Configuration lastValidatedConfig = ...
       if (!isClusterReady(flinkApp)) {
           observeJmDeployment(flinkApp, context, lastValidatedConfig);
       }
       if (isClusterReady(flinkApp)) {
           observeWhenClusterReady();
       }
       clearErrorsIfJobManagerDeploymentNotInErrorStatus(flinkApp);
   }
   ```
   `JobObserver` and `SessionObserver` would then each implement their own `observeWhenClusterReady()`. WDYT?
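
   A self-contained sketch of that template-method split, assuming stub types in place of the operator's classes. `DeploymentStub`, `BaseObserverSketch`, `JobObserverSketch`, and `SessionObserverSketch` are hypothetical names, and the stubbed `observeJmDeployment` simply marks the cluster ready:

   ```java
   import java.util.ArrayList;
   import java.util.List;

   /** Hypothetical stand-in for FlinkDeployment; tracks only what the sketch needs. */
   class DeploymentStub {
       boolean hasLastReconciledSpec;
       boolean clusterReady;
       final List<String> events = new ArrayList<>();
   }

   abstract class BaseObserverSketch {
       /** Shared flow; final so subclasses can only customize the hook below. */
       public final void observe(DeploymentStub app) {
           // Nothing has been launched, so skip observing.
           if (!app.hasLastReconciledSpec) {
               return;
           }
           if (!app.clusterReady) {
               observeJmDeployment(app);
           }
           // Checked again because the step above may have changed the readiness.
           if (app.clusterReady) {
               observeWhenClusterReady(app);
           }
       }

       /** Stubbed: records the call and marks the cluster as ready. */
       protected void observeJmDeployment(DeploymentStub app) {
           app.events.add("observeJmDeployment");
           app.clusterReady = true;
       }

       /** The only step that differs between observer types. */
       protected abstract void observeWhenClusterReady(DeploymentStub app);
   }

   class JobObserverSketch extends BaseObserverSketch {
       @Override
       protected void observeWhenClusterReady(DeploymentStub app) {
           app.events.add("observeJobStatus");
       }
   }

   class SessionObserverSketch extends BaseObserverSketch {
       @Override
       protected void observeWhenClusterReady(DeploymentStub app) {
           app.events.add("observeSessionCluster");
       }
   }
   ```

   Making `observe` final keeps the skip and readiness checks in one place, so a new observer type only has to supply the cluster-ready step.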





[GitHub] [flink-kubernetes-operator] bgeng777 commented on a change in pull request #131: [FLINK-26892] Observe current status before validating CR changes

Posted by GitBox <gi...@apache.org>.
bgeng777 commented on a change in pull request #131:
URL: https://github.com/apache/flink-kubernetes-operator/pull/131#discussion_r838180201



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
##########
@@ -104,9 +105,30 @@ public DeleteControl cleanup(FlinkDeployment flinkApp, Context context) {
 
     @Override
     public UpdateControl<FlinkDeployment> reconcile(FlinkDeployment flinkApp, Context context) {
-        FlinkDeployment originalCopy = ReconciliationUtils.clone(flinkApp);
         LOG.info("Starting reconciliation");
+        FlinkDeployment originalCopy = ReconciliationUtils.clone(flinkApp);
+        FlinkDeploymentSpec lastReconciledSpec =
+                flinkApp.getStatus().getReconciliationStatus().getLastReconciledSpec();
+        if (lastReconciledSpec != null) {
+            Configuration latestValidatedConfig =
+                    FlinkUtils.getEffectiveConfig(
+                            flinkApp.getMetadata(),
+                            lastReconciledSpec,
+                            defaultConfig.getFlinkConfig());
+            try {
+                observerFactory
+                        .getOrCreate(flinkApp)
+                        .observe(flinkApp, context, latestValidatedConfig);
+            } catch (DeploymentFailedException dfe) {
+                handleDeploymentFailed(flinkApp, dfe);
+                LOG.info("Reconciliation successfully completed");

Review comment:
       IIUC, the current code also logs `"Reconciliation successfully completed"` when it catches a `DeploymentFailedException`.
   I agree that this log message can be improved.
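
   One possible shape for that, sketched with stub types rather than the operator's classes. `DeploymentFailedStubException`, `ObserveLoggingSketch`, and the wording of the failure message are assumptions for illustration, and an in-memory list stands in for the logger:

   ```java
   import java.util.ArrayList;
   import java.util.List;

   /** Hypothetical stand-in for the operator's DeploymentFailedException. */
   class DeploymentFailedStubException extends RuntimeException {
       DeploymentFailedStubException(String message) {
           super(message);
       }
   }

   class ObserveLoggingSketch {
       /** Collected log lines; a real controller would write to its logger instead. */
       final List<String> log = new ArrayList<>();

       void reconcile(Runnable observe) {
           log.add("Starting reconciliation");
           try {
               observe.run();
           } catch (DeploymentFailedStubException dfe) {
               // Report the failure explicitly instead of claiming success.
               log.add("Reconciliation stopped, deployment failed: " + dfe.getMessage());
               return;
           }
           log.add("Reconciliation successfully completed");
       }
   }
   ```

   The success message is then only emitted on the path where observing did not throw.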
   







[GitHub] [flink-kubernetes-operator] gyfora merged pull request #131: [FLINK-26892] Observe current status before validating CR changes

Posted by GitBox <gi...@apache.org>.
gyfora merged pull request #131:
URL: https://github.com/apache/flink-kubernetes-operator/pull/131


   

