You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/05/04 13:57:01 UTC

[GitHub] [flink-kubernetes-operator] gyfora opened a new pull request, #195: [FLINK-27468] Recover missing deployments and other cancel/upgrade improvements for 1.15

gyfora opened a new pull request, #195:
URL: https://github.com/apache/flink-kubernetes-operator/pull/195

   This PR contains the following features improvements:
   
   - Add general ability to recover MISSING JobManager deployments (that were deleted by the user or otherwise disappeared )
   - Improve Job and JobManager deployment state tracking
   - Disable application cluster shutdown for 1.15 in case of failures and savepoint suspend operations
   - Improve test coverage by adding extra guards in TestingFlinkService and using the FlinkService implementation as much as possible
   - Introduce paremerized tests that cover 1.15 cluster shutdown behaviour
   
   TODO:
    - Add further deployment recovery tests
    - Make deployment recovery optional (default false for 1.14 and default true for 1.15)
    - Recover last checkpoint/savepoint information from cluster when trying to suspend a terminal job with savepoint strategy
   
   Follow up work not covered here:
    - Add events for terminal job states for Flink 1.15


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #195: [FLINK-27468] Recover missing deployments and other cancel/upgrade improvements for 1.15

Posted by GitBox <gi...@apache.org>.
gyfora commented on code in PR #195:
URL: https://github.com/apache/flink-kubernetes-operator/pull/195#discussion_r866941979


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractDeploymentObserver.java:
##########
@@ -211,6 +220,22 @@ protected boolean isSuspendedJob(FlinkDeployment deployment) {
                 && lastReconciledSpec.getJob().getState() == JobState.SUSPENDED;
     }
 
+    private void onMissingDeployment(FlinkDeployment deployment) {
+        String err = "Missing JobManager deployment";
+        logger.error(err);

Review Comment:
   will do



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] morhidi commented on a diff in pull request #195: [FLINK-27468] Recover missing deployments and other cancel/upgrade improvements for 1.15

Posted by GitBox <gi...@apache.org>.
morhidi commented on code in PR #195:
URL: https://github.com/apache/flink-kubernetes-operator/pull/195#discussion_r866939928


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java:
##########
@@ -112,30 +132,31 @@ protected FlinkConfigBuilder applyFlinkConfiguration() {
         }
 
         // Adapt default rest service type from 1.15+
-        if (!effectiveConfig.contains(REST_SERVICE_EXPOSED_TYPE)) {
-            effectiveConfig.set(
-                    REST_SERVICE_EXPOSED_TYPE,
-                    KubernetesConfigOptions.ServiceExposedType.ClusterIP);
-        }
+        setDefaultConf(
+                REST_SERVICE_EXPOSED_TYPE, KubernetesConfigOptions.ServiceExposedType.ClusterIP);
 
         if (spec.getJob() != null) {
-            if (!effectiveConfig.contains(CANCEL_ENABLE)) {
-                // Set 'web.cancel.enable' to false for application deployments to avoid users
-                // accidentally cancelling jobs.
-                effectiveConfig.set(CANCEL_ENABLE, false);
-            }
+            // Set 'web.cancel.enable' to false for application deployments to avoid users
+            // accidentally cancelling jobs.
+            setDefaultConf(CANCEL_ENABLE, false);
+
             // With last-state upgrade mode, set the default value of
             // 'execution.checkpointing.interval'
             // to 5 minutes when HA is enabled.
-            if (spec.getJob().getUpgradeMode() == UpgradeMode.LAST_STATE
-                    && !effectiveConfig.contains(
-                            ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL)) {
-                effectiveConfig.set(
+            if (spec.getJob().getUpgradeMode() == UpgradeMode.LAST_STATE) {
+                setDefaultConf(
                         ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,
                         DEFAULT_CHECKPOINTING_INTERVAL);
             }
+
+            // We need to keep the application clusters around for proper operator behaviour
+            effectiveConfig.set(SHUTDOWN_ON_APPLICATION_FINISH, false);
+            if (HighAvailabilityMode.isHighAvailabilityModeActivated(effectiveConfig)) {

Review Comment:
   ```Caused by: org.apache.flink.client.deployment.application.ApplicationExecutionException: Submission of failed job in case of an application error ('execution.submit-failed-job-on-applica │
   │ tion-error') is not supported in non-HA setups. ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #195: [FLINK-27468] Recover missing deployments and other cancel/upgrade improvements for 1.15

Posted by GitBox <gi...@apache.org>.
gyfora commented on code in PR #195:
URL: https://github.com/apache/flink-kubernetes-operator/pull/195#discussion_r866934861


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java:
##########
@@ -112,30 +132,31 @@ protected FlinkConfigBuilder applyFlinkConfiguration() {
         }
 
         // Adapt default rest service type from 1.15+
-        if (!effectiveConfig.contains(REST_SERVICE_EXPOSED_TYPE)) {
-            effectiveConfig.set(
-                    REST_SERVICE_EXPOSED_TYPE,
-                    KubernetesConfigOptions.ServiceExposedType.ClusterIP);
-        }
+        setDefaultConf(
+                REST_SERVICE_EXPOSED_TYPE, KubernetesConfigOptions.ServiceExposedType.ClusterIP);
 
         if (spec.getJob() != null) {
-            if (!effectiveConfig.contains(CANCEL_ENABLE)) {
-                // Set 'web.cancel.enable' to false for application deployments to avoid users
-                // accidentally cancelling jobs.
-                effectiveConfig.set(CANCEL_ENABLE, false);
-            }
+            // Set 'web.cancel.enable' to false for application deployments to avoid users
+            // accidentally cancelling jobs.
+            setDefaultConf(CANCEL_ENABLE, false);
+
             // With last-state upgrade mode, set the default value of
             // 'execution.checkpointing.interval'
             // to 5 minutes when HA is enabled.
-            if (spec.getJob().getUpgradeMode() == UpgradeMode.LAST_STATE
-                    && !effectiveConfig.contains(
-                            ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL)) {
-                effectiveConfig.set(
+            if (spec.getJob().getUpgradeMode() == UpgradeMode.LAST_STATE) {
+                setDefaultConf(
                         ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,
                         DEFAULT_CHECKPOINTING_INTERVAL);
             }
+
+            // We need to keep the application clusters around for proper operator behaviour
+            effectiveConfig.set(SHUTDOWN_ON_APPLICATION_FINISH, false);
+            if (HighAvailabilityMode.isHighAvailabilityModeActivated(effectiveConfig)) {

Review Comment:
   This feature is only enabled when HA is on. setting this config gives an exception otherwise on submission



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #195: [FLINK-27468] Recover missing deployments and other cancel/upgrade improvements for 1.15

Posted by GitBox <gi...@apache.org>.
gyfora commented on code in PR #195:
URL: https://github.com/apache/flink-kubernetes-operator/pull/195#discussion_r866940778


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java:
##########
@@ -57,7 +55,7 @@ public boolean observe(JobStatus jobStatus, Configuration deployedConfig, CTX ct
             clusterJobStatuses = new ArrayList<>(flinkService.listJobs(deployedConfig));
         } catch (Exception e) {
             LOG.error("Exception while listing jobs", e);
-            jobStatus.setState(JOB_STATE_UNKNOWN);
+            jobStatus.setState(org.apache.flink.api.common.JobStatus.RECONCILING.name());

Review Comment:
   There are benefits to using one of the Flink JobStatus.states. We don't really care whether the Flink job is actually reconciling (whatever that means) or the operator is reconciling is the flink job that's why I chose it. The name fits :)
   
   We can change this later if we feel like it. We could also use simply `null` in these cases.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] wangyang0918 commented on pull request #195: [FLINK-27468] Recover missing deployments and other cancel/upgrade improvements for 1.15

Posted by GitBox <gi...@apache.org>.
wangyang0918 commented on PR #195:
URL: https://github.com/apache/flink-kubernetes-operator/pull/195#issuecomment-1120146932

   Your explanation makes sense to me. Given that it is the designed behavior, I have no more question now.
   
   I just feels like if the job finished(e.g. batch jobs) successfully or suspended manually, we do not need to have the retained jobmanager deployment. Anyway, the users could delete the CR to trigger the deletion.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #195: [FLINK-27468] Recover missing deployments and other cancel/upgrade improvements for 1.15

Posted by GitBox <gi...@apache.org>.
gyfora commented on code in PR #195:
URL: https://github.com/apache/flink-kubernetes-operator/pull/195#discussion_r865820606


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java:
##########
@@ -255,4 +259,45 @@ public static boolean shouldRollBack(
                 .minus(readinessTimeout)
                 .isAfter(Instant.ofEpochMilli(reconciliationStatus.getReconciliationTimestamp()));
     }
+
+    public static boolean shouldRecoverJmDeployment(
+            FlinkDeploymentStatus status, Configuration conf) {
+        boolean enabled =
+                conf.getOptional(
+                                KubernetesOperatorConfigOptions
+                                        .OPERATOR_RECOVER_JM_DEPLOYMENT_ENABLED)
+                        .orElse(
+                                conf.get(FlinkConfigBuilder.FLINK_VERSION)
+                                                .isNewerVersionThan(FlinkVersion.v1_14)
+                                        ? true
+                                        : false);
+
+        return enabled

Review Comment:
   Not in 1.15 as the cluster won't shut down (so the jobmanager deployment is not lost). In flink 1.14 if you enable this it would redeploy the batch job, this is exactly the reason why this is disabled in 1.14 by default.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] Aitozi commented on a diff in pull request #195: [FLINK-27468] Recover missing deployments and other cancel/upgrade improvements for 1.15

Posted by GitBox <gi...@apache.org>.
Aitozi commented on code in PR #195:
URL: https://github.com/apache/flink-kubernetes-operator/pull/195#discussion_r865857546


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java:
##########
@@ -255,4 +259,45 @@ public static boolean shouldRollBack(
                 .minus(readinessTimeout)
                 .isAfter(Instant.ofEpochMilli(reconciliationStatus.getReconciliationTimestamp()));
     }
+
+    public static boolean shouldRecoverJmDeployment(
+            FlinkDeploymentStatus status, Configuration conf) {
+        boolean enabled =
+                conf.getOptional(
+                                KubernetesOperatorConfigOptions
+                                        .OPERATOR_RECOVER_JM_DEPLOYMENT_ENABLED)
+                        .orElse(
+                                conf.get(FlinkConfigBuilder.FLINK_VERSION)
+                                                .isNewerVersionThan(FlinkVersion.v1_14)
+                                        ? true
+                                        : false);
+
+        return enabled

Review Comment:
   Get it



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractDeploymentObserver.java:
##########
@@ -211,6 +219,22 @@ protected boolean isSuspendedJob(FlinkDeployment deployment) {
                 && lastReconciledSpec.getJob().getState() == JobState.SUSPENDED;
     }
 
+    private void onMissingDeployment(FlinkDeployment deployment) {

Review Comment:
   Can we distinguish the `onMissingDeployment` cause here (like by judge whether there is an on-going upgrade). If it caused by Operator, we can continue reconcile. Otherwise we can just clean up the `FlinkDeployment` which means an external operation or internal finish/failed happened.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] Aitozi commented on a diff in pull request #195: [FLINK-27468] Recover missing deployments and other cancel/upgrade improvements for 1.15

Posted by GitBox <gi...@apache.org>.
Aitozi commented on code in PR #195:
URL: https://github.com/apache/flink-kubernetes-operator/pull/195#discussion_r865857417


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractDeploymentObserver.java:
##########
@@ -211,6 +219,22 @@ protected boolean isSuspendedJob(FlinkDeployment deployment) {
                 && lastReconciledSpec.getJob().getState() == JobState.SUSPENDED;
     }
 
+    private void onMissingDeployment(FlinkDeployment deployment) {

Review Comment:
   Can we distinguish the `onMissingDeployment` cause here (like by judge whether there is an on-going upgrade). If it caused by Operator, we can continue reconcile. Otherwise we can just clean up the `FlinkDeployment` which means an external operation or internal finish/failed happened. This will also solve the problem for job before 1.15 I think.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] wangyang0918 commented on pull request #195: [FLINK-27468] Recover missing deployments and other cancel/upgrade improvements for 1.15

Posted by GitBox <gi...@apache.org>.
wangyang0918 commented on PR #195:
URL: https://github.com/apache/flink-kubernetes-operator/pull/195#issuecomment-1119769786

   If you do not mind, I will do some manual tests tomorrow and share the feedback then.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #195: [FLINK-27468] Recover missing deployments and other cancel/upgrade improvements for 1.15

Posted by GitBox <gi...@apache.org>.
gyfora commented on code in PR #195:
URL: https://github.com/apache/flink-kubernetes-operator/pull/195#discussion_r865863929


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractDeploymentObserver.java:
##########
@@ -211,6 +219,22 @@ protected boolean isSuspendedJob(FlinkDeployment deployment) {
                 && lastReconciledSpec.getJob().getState() == JobState.SUSPENDED;
     }
 
+    private void onMissingDeployment(FlinkDeployment deployment) {

Review Comment:
   On-going upgrade would never trigger this , when the job is in suspended state with missing deployment we won't try to observe it. 
   
   Even if we trigger there is a problem we should never clean up the deployment, as the user probably needs to manually recover the job. Cleanup is reserved for deletions.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] morhidi commented on a diff in pull request #195: [FLINK-27468] Recover missing deployments and other cancel/upgrade improvements for 1.15

Posted by GitBox <gi...@apache.org>.
morhidi commented on code in PR #195:
URL: https://github.com/apache/flink-kubernetes-operator/pull/195#discussion_r866939928


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java:
##########
@@ -112,30 +132,31 @@ protected FlinkConfigBuilder applyFlinkConfiguration() {
         }
 
         // Adapt default rest service type from 1.15+
-        if (!effectiveConfig.contains(REST_SERVICE_EXPOSED_TYPE)) {
-            effectiveConfig.set(
-                    REST_SERVICE_EXPOSED_TYPE,
-                    KubernetesConfigOptions.ServiceExposedType.ClusterIP);
-        }
+        setDefaultConf(
+                REST_SERVICE_EXPOSED_TYPE, KubernetesConfigOptions.ServiceExposedType.ClusterIP);
 
         if (spec.getJob() != null) {
-            if (!effectiveConfig.contains(CANCEL_ENABLE)) {
-                // Set 'web.cancel.enable' to false for application deployments to avoid users
-                // accidentally cancelling jobs.
-                effectiveConfig.set(CANCEL_ENABLE, false);
-            }
+            // Set 'web.cancel.enable' to false for application deployments to avoid users
+            // accidentally cancelling jobs.
+            setDefaultConf(CANCEL_ENABLE, false);
+
             // With last-state upgrade mode, set the default value of
             // 'execution.checkpointing.interval'
             // to 5 minutes when HA is enabled.
-            if (spec.getJob().getUpgradeMode() == UpgradeMode.LAST_STATE
-                    && !effectiveConfig.contains(
-                            ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL)) {
-                effectiveConfig.set(
+            if (spec.getJob().getUpgradeMode() == UpgradeMode.LAST_STATE) {
+                setDefaultConf(
                         ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,
                         DEFAULT_CHECKPOINTING_INTERVAL);
             }
+
+            // We need to keep the application clusters around for proper operator behaviour
+            effectiveConfig.set(SHUTDOWN_ON_APPLICATION_FINISH, false);
+            if (HighAvailabilityMode.isHighAvailabilityModeActivated(effectiveConfig)) {

Review Comment:
   ```Caused by: org.apache.flink.client.deployment.application.ApplicationExecutionException: Submission of failed job in case of an application error ('execution.submit-failed-job-on-application-error') is not supported in non-HA setups. ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] morhidi commented on pull request #195: [FLINK-27468] Recover missing deployments and other cancel/upgrade improvements for 1.15

Posted by GitBox <gi...@apache.org>.
morhidi commented on PR #195:
URL: https://github.com/apache/flink-kubernetes-operator/pull/195#issuecomment-1119509735

   I tested the basic functionality on my Minikube, feels very K8s native :) Great feature!
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] morhidi commented on a diff in pull request #195: [FLINK-27468] Recover missing deployments and other cancel/upgrade improvements for 1.15

Posted by GitBox <gi...@apache.org>.
morhidi commented on code in PR #195:
URL: https://github.com/apache/flink-kubernetes-operator/pull/195#discussion_r866938596


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java:
##########
@@ -173,6 +174,16 @@ private void rollbackApplication(FlinkDeployment flinkApp) throws Exception {
                 flinkApp.getMetadata(), rollbackSpec, rollbackConfig, kubernetesClient);
     }
 
+    private void recoverJmDeployment(FlinkDeployment deployment) throws Exception {
+        LOG.info("Missing Flink Cluster deployment, trying to recover...");

Review Comment:
   MDC adds this I guess, or you meant something else?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] Aitozi commented on a diff in pull request #195: [FLINK-27468] Recover missing deployments and other cancel/upgrade improvements for 1.15

Posted by GitBox <gi...@apache.org>.
Aitozi commented on code in PR #195:
URL: https://github.com/apache/flink-kubernetes-operator/pull/195#discussion_r865857417


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractDeploymentObserver.java:
##########
@@ -211,6 +219,22 @@ protected boolean isSuspendedJob(FlinkDeployment deployment) {
                 && lastReconciledSpec.getJob().getState() == JobState.SUSPENDED;
     }
 
+    private void onMissingDeployment(FlinkDeployment deployment) {

Review Comment:
   Can we distinguish the `onMissingDeployment` cause here (like by judge whether there is an on-going upgrade). If it caused by Operator, we can continue reconcile. Otherwise we can just clean up the `FlinkDeployment` which means an external operation or internal finish/failed happened. This will also solve the problem for job before 1.14 I think.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] Aitozi commented on a diff in pull request #195: [FLINK-27468] Recover missing deployments and other cancel/upgrade improvements for 1.15

Posted by GitBox <gi...@apache.org>.
Aitozi commented on code in PR #195:
URL: https://github.com/apache/flink-kubernetes-operator/pull/195#discussion_r865858737


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java:
##########
@@ -255,4 +259,45 @@ public static boolean shouldRollBack(
                 .minus(readinessTimeout)
                 .isAfter(Instant.ofEpochMilli(reconciliationStatus.getReconciliationTimestamp()));
     }
+
+    public static boolean shouldRecoverJmDeployment(
+            FlinkDeploymentStatus status, Configuration conf) {
+        boolean enabled =
+                conf.getOptional(
+                                KubernetesOperatorConfigOptions
+                                        .OPERATOR_RECOVER_JM_DEPLOYMENT_ENABLED)
+                        .orElse(
+                                conf.get(FlinkConfigBuilder.FLINK_VERSION)
+                                                .isNewerVersionThan(FlinkVersion.v1_14)
+                                        ? true
+                                        : false);
+
+        return enabled

Review Comment:
   Get it



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #195: [FLINK-27468] Recover missing deployments and other cancel/upgrade improvements for 1.15

Posted by GitBox <gi...@apache.org>.
gyfora commented on PR #195:
URL: https://github.com/apache/flink-kubernetes-operator/pull/195#issuecomment-1117602335

   We could add a config option to blacklist FlinkDeployments that the operator should not touch, that would solve this in a more general way, but it still feels a bit strange to do this. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #195: [FLINK-27468] Recover missing deployments and other cancel/upgrade improvements for 1.15

Posted by GitBox <gi...@apache.org>.
gyfora commented on PR #195:
URL: https://github.com/apache/flink-kubernetes-operator/pull/195#issuecomment-1119384598

   @wangyang0918 let me know if you have any questions, it's not a trivial change but there are a lot of big improvements here I believe 😄 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #195: [FLINK-27468] Recover missing deployments and other cancel/upgrade improvements for 1.15

Posted by GitBox <gi...@apache.org>.
gyfora commented on code in PR #195:
URL: https://github.com/apache/flink-kubernetes-operator/pull/195#discussion_r866937604


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java:
##########
@@ -255,4 +259,40 @@ public static boolean shouldRollBack(
                 .minus(readinessTimeout)
                 .isAfter(Instant.ofEpochMilli(reconciliationStatus.getReconciliationTimestamp()));
     }
+
+    public static boolean deploymentRecoveryEnabled(Configuration conf) {
+        return conf.getOptional(
+                        KubernetesOperatorConfigOptions.OPERATOR_RECOVER_JM_DEPLOYMENT_ENABLED)
+                .orElse(
+                        conf.get(FlinkConfigBuilder.FLINK_VERSION)
+                                        .isNewerVersionThan(FlinkVersion.v1_14)
+                                ? true
+                                : false);
+    }
+
+    public static boolean jobManagerMissingForRunningDeployment(FlinkDeploymentStatus status) {
+        return status.getReconciliationStatus().deserializeLastReconciledSpec().getJob().getState()
+                        == JobState.RUNNING
+                && status.getJobManagerDeploymentStatus() == JobManagerDeploymentStatus.MISSING;
+    }
+
+    public static boolean isJobInTerminalState(FlinkDeploymentStatus status) {
+        JobManagerDeploymentStatus deploymentStatus = status.getJobManagerDeploymentStatus();
+        if (deploymentStatus == JobManagerDeploymentStatus.MISSING) {
+            return true;
+        }
+
+        String jobState = status.getJobStatus().getState();
+
+        return deploymentStatus == JobManagerDeploymentStatus.READY
+                && org.apache.flink.api.common.JobStatus.valueOf(jobState).isTerminalState();

Review Comment:
   The only LOCALLY terminal state is SUSPENDED. Which we can also use for updates. So I think `isTerminalState` is correct here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] wangyang0918 commented on a diff in pull request #195: [FLINK-27468] Recover missing deployments and other cancel/upgrade improvements for 1.15

Posted by GitBox <gi...@apache.org>.
wangyang0918 commented on code in PR #195:
URL: https://github.com/apache/flink-kubernetes-operator/pull/195#discussion_r866945827


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java:
##########
@@ -255,4 +259,40 @@ public static boolean shouldRollBack(
                 .minus(readinessTimeout)
                 .isAfter(Instant.ofEpochMilli(reconciliationStatus.getReconciliationTimestamp()));
     }
+
+    public static boolean deploymentRecoveryEnabled(Configuration conf) {
+        return conf.getOptional(
+                        KubernetesOperatorConfigOptions.OPERATOR_RECOVER_JM_DEPLOYMENT_ENABLED)
+                .orElse(
+                        conf.get(FlinkConfigBuilder.FLINK_VERSION)
+                                        .isNewerVersionThan(FlinkVersion.v1_14)
+                                ? true
+                                : false);
+    }
+
+    public static boolean jobManagerMissingForRunningDeployment(FlinkDeploymentStatus status) {
+        return status.getReconciliationStatus().deserializeLastReconciledSpec().getJob().getState()
+                        == JobState.RUNNING
+                && status.getJobManagerDeploymentStatus() == JobManagerDeploymentStatus.MISSING;
+    }
+
+    public static boolean isJobInTerminalState(FlinkDeploymentStatus status) {
+        JobManagerDeploymentStatus deploymentStatus = status.getJobManagerDeploymentStatus();
+        if (deploymentStatus == JobManagerDeploymentStatus.MISSING) {
+            return true;
+        }
+
+        String jobState = status.getJobStatus().getState();
+
+        return deploymentStatus == JobManagerDeploymentStatus.READY
+                && org.apache.flink.api.common.JobStatus.valueOf(jobState).isTerminalState();

Review Comment:
   IIUC, the `SUSPENDED` is not a persistent state and it will transit to other states(e.g. `RUNNING`, `FAILED) finally. For example, the JobManager lost leadership and gain back.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #195: [FLINK-27468] Recover missing deployments and other cancel/upgrade improvements for 1.15

Posted by GitBox <gi...@apache.org>.
gyfora commented on PR #195:
URL: https://github.com/apache/flink-kubernetes-operator/pull/195#issuecomment-1119777605

   @wangyang0918 of course, I really appreciate it


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] wangyang0918 commented on a diff in pull request #195: [FLINK-27468] Recover missing deployments and other cancel/upgrade improvements for 1.15

Posted by GitBox <gi...@apache.org>.
wangyang0918 commented on code in PR #195:
URL: https://github.com/apache/flink-kubernetes-operator/pull/195#discussion_r867299875


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java:
##########
@@ -255,4 +259,50 @@ public static boolean shouldRollBack(
                 .minus(readinessTimeout)
                 .isAfter(Instant.ofEpochMilli(reconciliationStatus.getReconciliationTimestamp()));
     }
+
+    public static boolean deploymentRecoveryEnabled(Configuration conf) {
+        return conf.getOptional(
+                        KubernetesOperatorConfigOptions.OPERATOR_RECOVER_JM_DEPLOYMENT_ENABLED)
+                .orElse(
+                        conf.get(FlinkConfigBuilder.FLINK_VERSION)

Review Comment:
   nit: `? true : false` is unnecessary.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] wangyang0918 commented on a diff in pull request #195: [FLINK-27468] Recover missing deployments and other cancel/upgrade improvements for 1.15

Posted by GitBox <gi...@apache.org>.
wangyang0918 commented on code in PR #195:
URL: https://github.com/apache/flink-kubernetes-operator/pull/195#discussion_r867299797


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractDeploymentObserver.java:
##########
@@ -211,6 +220,22 @@ protected boolean isSuspendedJob(FlinkDeployment deployment) {
                 && lastReconciledSpec.getJob().getState() == JobState.SUSPENDED;
     }
 
+    private void onMissingDeployment(FlinkDeployment deployment) {
+        String err = "Missing JobManager deployment";
+        logger.error(err);

Review Comment:
   My bad. We might not need to add namespace and name here. MDC add this. We also do not need this for the event. It is duplicated.
   
   ```
   wangyang-pc:flink-kubernetes-operator danrtsey.wy$ kubectl get events | grep 'flinkdeployment/flink-example-statemachine'
   11h         Error     Missing JobManager deployment for default/flink-example-statemachine   flinkdeployment/flink-example-statemachine         Missing JobManager deployment for default/flink-example-statemachine
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #195: [FLINK-27468] Recover missing deployments and other cancel/upgrade improvements for 1.15

Posted by GitBox <gi...@apache.org>.
gyfora commented on PR #195:
URL: https://github.com/apache/flink-kubernetes-operator/pull/195#issuecomment-1120145624

   @wangyang0918 If I understand correctly, in these cases only the jobmanager deployment remains but the taskmanagers are released.
   The jobmanager deployment will be deleted either on next upgrade or when the user deletes the FlinkDeployment itself.
   
   I think this works quite well, it allows users to go to the Flink cluster, inspect logs, configs, metrics etc. And since the heavy resources are already gone it won't put a strain on the cluster.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] Aitozi commented on a diff in pull request #195: [FLINK-27468] Recover missing deployments and other cancel/upgrade improvements for 1.15

Posted by GitBox <gi...@apache.org>.
Aitozi commented on code in PR #195:
URL: https://github.com/apache/flink-kubernetes-operator/pull/195#discussion_r865815970


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java:
##########
@@ -255,4 +259,45 @@ public static boolean shouldRollBack(
                 .minus(readinessTimeout)
                 .isAfter(Instant.ofEpochMilli(reconciliationStatus.getReconciliationTimestamp()));
     }
+
+    public static boolean shouldRecoverJmDeployment(
+            FlinkDeploymentStatus status, Configuration conf) {
+        boolean enabled =
+                conf.getOptional(
+                                KubernetesOperatorConfigOptions
+                                        .OPERATOR_RECOVER_JM_DEPLOYMENT_ENABLED)
+                        .orElse(
+                                conf.get(FlinkConfigBuilder.FLINK_VERSION)
+                                                .isNewerVersionThan(FlinkVersion.v1_14)
+                                        ? true
+                                        : false);
+
+        return enabled

Review Comment:
   Will this also trigger a redeployment for a batch job finished ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #195: [FLINK-27468] Recover missing deployments and other cancel/upgrade improvements for 1.15

Posted by GitBox <gi...@apache.org>.
gyfora commented on PR #195:
URL: https://github.com/apache/flink-kubernetes-operator/pull/195#issuecomment-1117346590

   cc @tweise @Aitozi @wangyang0918 @morhidi 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] tweise commented on pull request #195: [FLINK-27468] Recover missing deployments and other cancel/upgrade improvements for 1.15

Posted by GitBox <gi...@apache.org>.
tweise commented on PR #195:
URL: https://github.com/apache/flink-kubernetes-operator/pull/195#issuecomment-1117586637

   @gyfora one question regarding the motivations to delete a deployment externally: This could be used as escape hatch by users that want to term a misbehaving deployment and have problems removing the CR (which we discussed in the past). I wonder if by default we should or should not try to bring back deployments? And if the default is to try that, what do we advise in the case where a CR update isn't possible (for example due to incompatible CRD update or issue with the operator)?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] morhidi commented on pull request #195: [FLINK-27468] Recover missing deployments and other cancel/upgrade improvements for 1.15

Posted by GitBox <gi...@apache.org>.
morhidi commented on PR #195:
URL: https://github.com/apache/flink-kubernetes-operator/pull/195#issuecomment-1119424745

   nit: Job status successfully updated from RECONCILING to Optional[RUNNING] can you please fix the logging and get the value from the Optional?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] wangyang0918 commented on a diff in pull request #195: [FLINK-27468] Recover missing deployments and other cancel/upgrade improvements for 1.15

Posted by GitBox <gi...@apache.org>.
wangyang0918 commented on code in PR #195:
URL: https://github.com/apache/flink-kubernetes-operator/pull/195#discussion_r866948431


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java:
##########
@@ -112,30 +132,31 @@ protected FlinkConfigBuilder applyFlinkConfiguration() {
         }
 
         // Adapt default rest service type from 1.15+
-        if (!effectiveConfig.contains(REST_SERVICE_EXPOSED_TYPE)) {
-            effectiveConfig.set(
-                    REST_SERVICE_EXPOSED_TYPE,
-                    KubernetesConfigOptions.ServiceExposedType.ClusterIP);
-        }
+        setDefaultConf(
+                REST_SERVICE_EXPOSED_TYPE, KubernetesConfigOptions.ServiceExposedType.ClusterIP);
 
         if (spec.getJob() != null) {
-            if (!effectiveConfig.contains(CANCEL_ENABLE)) {
-                // Set 'web.cancel.enable' to false for application deployments to avoid users
-                // accidentally cancelling jobs.
-                effectiveConfig.set(CANCEL_ENABLE, false);
-            }
+            // Set 'web.cancel.enable' to false for application deployments to avoid users
+            // accidentally cancelling jobs.
+            setDefaultConf(CANCEL_ENABLE, false);
+
             // With last-state upgrade mode, set the default value of
             // 'execution.checkpointing.interval'
             // to 5 minutes when HA is enabled.
-            if (spec.getJob().getUpgradeMode() == UpgradeMode.LAST_STATE
-                    && !effectiveConfig.contains(
-                            ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL)) {
-                effectiveConfig.set(
+            if (spec.getJob().getUpgradeMode() == UpgradeMode.LAST_STATE) {
+                setDefaultConf(
                         ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,
                         DEFAULT_CHECKPOINTING_INTERVAL);
             }
+
+            // We need to keep the application clusters around for proper operator behaviour
+            effectiveConfig.set(SHUTDOWN_ON_APPLICATION_FINISH, false);
+            if (HighAvailabilityMode.isHighAvailabilityModeActivated(effectiveConfig)) {

Review Comment:
   I got it. Multiple jobs could be submitted when HA disabled.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #195: [FLINK-27468] Recover missing deployments and other cancel/upgrade improvements for 1.15

Posted by GitBox <gi...@apache.org>.
gyfora commented on code in PR #195:
URL: https://github.com/apache/flink-kubernetes-operator/pull/195#discussion_r866941717


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java:
##########
@@ -173,6 +174,16 @@ private void rollbackApplication(FlinkDeployment flinkApp) throws Exception {
                 flinkApp.getMetadata(), rollbackSpec, rollbackConfig, kubernetesClient);
     }
 
+    private void recoverJmDeployment(FlinkDeployment deployment) throws Exception {
+        LOG.info("Missing Flink Cluster deployment, trying to recover...");

Review Comment:
   the name is included in the MDC already



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] wangyang0918 commented on a diff in pull request #195: [FLINK-27468] Recover missing deployments and other cancel/upgrade improvements for 1.15

Posted by GitBox <gi...@apache.org>.
wangyang0918 commented on code in PR #195:
URL: https://github.com/apache/flink-kubernetes-operator/pull/195#discussion_r866949459


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java:
##########
@@ -57,7 +55,7 @@ public boolean observe(JobStatus jobStatus, Configuration deployedConfig, CTX ct
             clusterJobStatuses = new ArrayList<>(flinkService.listJobs(deployedConfig));
         } catch (Exception e) {
             LOG.error("Exception while listing jobs", e);
-            jobStatus.setState(JOB_STATE_UNKNOWN);
+            jobStatus.setState(org.apache.flink.api.common.JobStatus.RECONCILING.name());

Review Comment:
   We could keep it as now and change later if necessary.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] morhidi commented on pull request #195: [FLINK-27468] Recover missing deployments and other cancel/upgrade improvements for 1.15

Posted by GitBox <gi...@apache.org>.
morhidi commented on PR #195:
URL: https://github.com/apache/flink-kubernetes-operator/pull/195#issuecomment-1118240587

   How about adding some retry count as a guard which is cleared when the user changes the CR?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] Aitozi commented on a diff in pull request #195: [FLINK-27468] Recover missing deployments and other cancel/upgrade improvements for 1.15

Posted by GitBox <gi...@apache.org>.
Aitozi commented on code in PR #195:
URL: https://github.com/apache/flink-kubernetes-operator/pull/195#discussion_r865815970


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java:
##########
@@ -255,4 +259,45 @@ public static boolean shouldRollBack(
                 .minus(readinessTimeout)
                 .isAfter(Instant.ofEpochMilli(reconciliationStatus.getReconciliationTimestamp()));
     }
+
+    public static boolean shouldRecoverJmDeployment(
+            FlinkDeploymentStatus status, Configuration conf) {
+        boolean enabled =
+                conf.getOptional(
+                                KubernetesOperatorConfigOptions
+                                        .OPERATOR_RECOVER_JM_DEPLOYMENT_ENABLED)
+                        .orElse(
+                                conf.get(FlinkConfigBuilder.FLINK_VERSION)
+                                                .isNewerVersionThan(FlinkVersion.v1_14)
+                                        ? true
+                                        : false);
+
+        return enabled

Review Comment:
   Will this also trigger a redeployment after a batch job finished ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] wangyang0918 commented on pull request #195: [FLINK-27468] Recover missing deployments and other cancel/upgrade improvements for 1.15

Posted by GitBox <gi...@apache.org>.
wangyang0918 commented on PR #195:
URL: https://github.com/apache/flink-kubernetes-operator/pull/195#issuecomment-1120140585

   I have done some manual tests. And it works well for recover the missing deployment. I have last one question which need to be discussed more.
   
   When `SHUTDOWN_ON_APPLICATION_FINISH` is configured to false, then it is the operator's responsibility to clean up the K8s resources. However, I do not find such logics. It means that if the job reached a globally terminal state(e.g. failed, finished) or suspended manually(changing the CR `.spec.job.state`), we will have the residual K8s resources. In my opinion, the operator needs to do the clean-up after fetching the job status and store it into the CR status successfully.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #195: [FLINK-27468] Recover missing deployments and other cancel/upgrade improvements for 1.15

Posted by GitBox <gi...@apache.org>.
gyfora commented on PR #195:
URL: https://github.com/apache/flink-kubernetes-operator/pull/195#issuecomment-1118364736

   @tweise 
   After discussing this in detail with @morhidi we came to the following conclusions:
   
   If there is a CR incompatibility issue, the operator won't be able to reconcile it anyways so the recovery logic (or anything else in the controller) won't take effect so the user can simply delete the jobmanager deployment.
   
   Otherwise if the user wants to stop the job they simply need to suspend the FlinkDeployment. The operator should do anything in it's power to make sure that the Flink/Cluster state reflects what's in the FlinkDeployment, therefore automatic jm deployment recovery should be the default in 1.15 and later.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #195: [FLINK-27468] Recover missing deployments and other cancel/upgrade improvements for 1.15

Posted by GitBox <gi...@apache.org>.
gyfora commented on code in PR #195:
URL: https://github.com/apache/flink-kubernetes-operator/pull/195#discussion_r867148302


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractDeploymentObserver.java:
##########
@@ -211,6 +220,22 @@ protected boolean isSuspendedJob(FlinkDeployment deployment) {
                 && lastReconciledSpec.getJob().getState() == JobState.SUSPENDED;
     }
 
+    private void onMissingDeployment(FlinkDeployment deployment) {
+        String err = "Missing JobManager deployment";
+        logger.error(err);
+        Event event =
+                DeploymentFailedException.asEvent(
+                        new DeploymentFailedException(
+                                DeploymentFailedException.COMPONENT_JOBMANAGER, "Error", err),
+                        deployment);
+        kubernetesClient

Review Comment:
   I am not really sure how to fix this one



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #195: [FLINK-27468] Recover missing deployments and other cancel/upgrade improvements for 1.15

Posted by GitBox <gi...@apache.org>.
gyfora commented on PR #195:
URL: https://github.com/apache/flink-kubernetes-operator/pull/195#issuecomment-1117600634

   @tweise hm, thats a good question. 
   
   In general my logic for making the default behaviour to recover (at least from Flink 1.15 onward), is this is how operators usually work. If you delete a resource managed by an operator it is usually brought back up for you. I would consider the jobmanager deployment a managed resource so we should try to do everything to recover if possible.
   
   The only reason I made this default off in Flink 1.14 is because we cannot keep the clusters around after the job terminated which produces a bunch of annoying cases for failed/finished jobs.
   
   As for what to do if your FlinkDeployment CR gets stuck and you cannot remove it, I don't really know :/ 
   We should make sure that we do not introduce breaking changes 😄 
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] wangyang0918 commented on a diff in pull request #195: [FLINK-27468] Recover missing deployments and other cancel/upgrade improvements for 1.15

Posted by GitBox <gi...@apache.org>.
wangyang0918 commented on code in PR #195:
URL: https://github.com/apache/flink-kubernetes-operator/pull/195#discussion_r866602275


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java:
##########
@@ -57,7 +55,7 @@ public boolean observe(JobStatus jobStatus, Configuration deployedConfig, CTX ct
             clusterJobStatuses = new ArrayList<>(flinkService.listJobs(deployedConfig));
         } catch (Exception e) {
             LOG.error("Exception while listing jobs", e);
-            jobStatus.setState(JOB_STATE_UNKNOWN);
+            jobStatus.setState(org.apache.flink.api.common.JobStatus.RECONCILING.name());

Review Comment:
   I still hesitate to replace the `UNKNOWN` with `RECONCILING` since we could not differentiate whether the job is really reconciling or just failing to get the job status.



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java:
##########
@@ -173,6 +174,16 @@ private void rollbackApplication(FlinkDeployment flinkApp) throws Exception {
                 flinkApp.getMetadata(), rollbackSpec, rollbackConfig, kubernetesClient);
     }
 
+    private void recoverJmDeployment(FlinkDeployment deployment) throws Exception {
+        LOG.info("Missing Flink Cluster deployment, trying to recover...");

Review Comment:
   We might also need to print the FlinkDeployment name here.



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java:
##########
@@ -112,30 +132,31 @@ protected FlinkConfigBuilder applyFlinkConfiguration() {
         }
 
         // Adapt default rest service type from 1.15+
-        if (!effectiveConfig.contains(REST_SERVICE_EXPOSED_TYPE)) {
-            effectiveConfig.set(
-                    REST_SERVICE_EXPOSED_TYPE,
-                    KubernetesConfigOptions.ServiceExposedType.ClusterIP);
-        }
+        setDefaultConf(
+                REST_SERVICE_EXPOSED_TYPE, KubernetesConfigOptions.ServiceExposedType.ClusterIP);
 
         if (spec.getJob() != null) {
-            if (!effectiveConfig.contains(CANCEL_ENABLE)) {
-                // Set 'web.cancel.enable' to false for application deployments to avoid users
-                // accidentally cancelling jobs.
-                effectiveConfig.set(CANCEL_ENABLE, false);
-            }
+            // Set 'web.cancel.enable' to false for application deployments to avoid users
+            // accidentally cancelling jobs.
+            setDefaultConf(CANCEL_ENABLE, false);
+
             // With last-state upgrade mode, set the default value of
             // 'execution.checkpointing.interval'
             // to 5 minutes when HA is enabled.
-            if (spec.getJob().getUpgradeMode() == UpgradeMode.LAST_STATE
-                    && !effectiveConfig.contains(
-                            ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL)) {
-                effectiveConfig.set(
+            if (spec.getJob().getUpgradeMode() == UpgradeMode.LAST_STATE) {
+                setDefaultConf(
                         ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,
                         DEFAULT_CHECKPOINTING_INTERVAL);
             }
+
+            // We need to keep the application clusters around for proper operator behaviour
+            effectiveConfig.set(SHUTDOWN_ON_APPLICATION_FINISH, false);
+            if (HighAvailabilityMode.isHighAvailabilityModeActivated(effectiveConfig)) {

Review Comment:
   Why do we not need to submit a failed job when HA disabled?



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractDeploymentObserver.java:
##########
@@ -211,6 +220,22 @@ protected boolean isSuspendedJob(FlinkDeployment deployment) {
                 && lastReconciledSpec.getJob().getState() == JobState.SUSPENDED;
     }
 
+    private void onMissingDeployment(FlinkDeployment deployment) {
+        String err = "Missing JobManager deployment";
+        logger.error(err);
+        Event event =
+                DeploymentFailedException.asEvent(
+                        new DeploymentFailedException(
+                                DeploymentFailedException.COMPONENT_JOBMANAGER, "Error", err),
+                        deployment);
+        kubernetesClient

Review Comment:
   The age of the missing deloyment event is unknown.
   
   ```
   Events:
     Type     Reason                         Age        From                  Message
     ----     ------                         ----       ----                  -------
     Error    Missing JobManager deployment  <unknown>  JobManagerDeployment  Missing JobManager deployment
   ```



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractDeploymentObserver.java:
##########
@@ -211,6 +220,22 @@ protected boolean isSuspendedJob(FlinkDeployment deployment) {
                 && lastReconciledSpec.getJob().getState() == JobState.SUSPENDED;
     }
 
+    private void onMissingDeployment(FlinkDeployment deployment) {
+        String err = "Missing JobManager deployment";
+        logger.error(err);

Review Comment:
   It will be great if we also print the missing FlinkDeployment name.



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java:
##########
@@ -255,4 +259,40 @@ public static boolean shouldRollBack(
                 .minus(readinessTimeout)
                 .isAfter(Instant.ofEpochMilli(reconciliationStatus.getReconciliationTimestamp()));
     }
+
+    public static boolean deploymentRecoveryEnabled(Configuration conf) {
+        return conf.getOptional(
+                        KubernetesOperatorConfigOptions.OPERATOR_RECOVER_JM_DEPLOYMENT_ENABLED)
+                .orElse(
+                        conf.get(FlinkConfigBuilder.FLINK_VERSION)
+                                        .isNewerVersionThan(FlinkVersion.v1_14)
+                                ? true
+                                : false);
+    }
+
+    public static boolean jobManagerMissingForRunningDeployment(FlinkDeploymentStatus status) {
+        return status.getReconciliationStatus().deserializeLastReconciledSpec().getJob().getState()
+                        == JobState.RUNNING
+                && status.getJobManagerDeploymentStatus() == JobManagerDeploymentStatus.MISSING;
+    }
+
+    public static boolean isJobInTerminalState(FlinkDeploymentStatus status) {
+        JobManagerDeploymentStatus deploymentStatus = status.getJobManagerDeploymentStatus();
+        if (deploymentStatus == JobManagerDeploymentStatus.MISSING) {
+            return true;
+        }
+
+        String jobState = status.getJobStatus().getState();
+
+        return deploymentStatus == JobManagerDeploymentStatus.READY
+                && org.apache.flink.api.common.JobStatus.valueOf(jobState).isTerminalState();

Review Comment:
   I am afraid we might need to use `isGloballyTerminalState` here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] morhidi commented on a diff in pull request #195: [FLINK-27468] Recover missing deployments and other cancel/upgrade improvements for 1.15

Posted by GitBox <gi...@apache.org>.
morhidi commented on code in PR #195:
URL: https://github.com/apache/flink-kubernetes-operator/pull/195#discussion_r866938313


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractDeploymentObserver.java:
##########
@@ -211,6 +220,22 @@ protected boolean isSuspendedJob(FlinkDeployment deployment) {
                 && lastReconciledSpec.getJob().getState() == JobState.SUSPENDED;
     }
 
+    private void onMissingDeployment(FlinkDeployment deployment) {
+        String err = "Missing JobManager deployment";
+        logger.error(err);

Review Comment:
   MDC adds this I guess, or you meant something else?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #195: [FLINK-27468] Recover missing deployments and other cancel/upgrade improvements for 1.15

Posted by GitBox <gi...@apache.org>.
gyfora commented on code in PR #195:
URL: https://github.com/apache/flink-kubernetes-operator/pull/195#discussion_r866948228


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java:
##########
@@ -255,4 +259,40 @@ public static boolean shouldRollBack(
                 .minus(readinessTimeout)
                 .isAfter(Instant.ofEpochMilli(reconciliationStatus.getReconciliationTimestamp()));
     }
+
+    public static boolean deploymentRecoveryEnabled(Configuration conf) {
+        return conf.getOptional(
+                        KubernetesOperatorConfigOptions.OPERATOR_RECOVER_JM_DEPLOYMENT_ENABLED)
+                .orElse(
+                        conf.get(FlinkConfigBuilder.FLINK_VERSION)
+                                        .isNewerVersionThan(FlinkVersion.v1_14)
+                                ? true
+                                : false);
+    }
+
+    public static boolean jobManagerMissingForRunningDeployment(FlinkDeploymentStatus status) {
+        return status.getReconciliationStatus().deserializeLastReconciledSpec().getJob().getState()
+                        == JobState.RUNNING
+                && status.getJobManagerDeploymentStatus() == JobManagerDeploymentStatus.MISSING;
+    }
+
+    public static boolean isJobInTerminalState(FlinkDeploymentStatus status) {
+        JobManagerDeploymentStatus deploymentStatus = status.getJobManagerDeploymentStatus();
+        if (deploymentStatus == JobManagerDeploymentStatus.MISSING) {
+            return true;
+        }
+
+        String jobState = status.getJobStatus().getState();
+
+        return deploymentStatus == JobManagerDeploymentStatus.READY
+                && org.apache.flink.api.common.JobStatus.valueOf(jobState).isTerminalState();

Review Comment:
   Hm I see, I wasn't sure about that. I can change to globallyTerminal



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] gyfora merged pull request #195: [FLINK-27468] Recover missing deployments and other cancel/upgrade improvements for 1.15

Posted by GitBox <gi...@apache.org>.
gyfora merged PR #195:
URL: https://github.com/apache/flink-kubernetes-operator/pull/195


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] wangyang0918 commented on a diff in pull request #195: [FLINK-27468] Recover missing deployments and other cancel/upgrade improvements for 1.15

Posted by GitBox <gi...@apache.org>.
wangyang0918 commented on code in PR #195:
URL: https://github.com/apache/flink-kubernetes-operator/pull/195#discussion_r867299527


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractDeploymentObserver.java:
##########
@@ -211,6 +220,22 @@ protected boolean isSuspendedJob(FlinkDeployment deployment) {
                 && lastReconciledSpec.getJob().getState() == JobState.SUSPENDED;
     }
 
+    private void onMissingDeployment(FlinkDeployment deployment) {
+        String err = "Missing JobManager deployment";
+        logger.error(err);
+        Event event =
+                DeploymentFailedException.asEvent(
+                        new DeploymentFailedException(
+                                DeploymentFailedException.COMPONENT_JOBMANAGER, "Error", err),
+                        deployment);
+        kubernetesClient

Review Comment:
   I think it is due to `lastTransitionTime` and `lastUpdateTime` is null.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #195: [FLINK-27468] Recover missing deployments and other cancel/upgrade improvements for 1.15

Posted by GitBox <gi...@apache.org>.
gyfora commented on code in PR #195:
URL: https://github.com/apache/flink-kubernetes-operator/pull/195#discussion_r867317503


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractDeploymentObserver.java:
##########
@@ -211,6 +220,22 @@ protected boolean isSuspendedJob(FlinkDeployment deployment) {
                 && lastReconciledSpec.getJob().getState() == JobState.SUSPENDED;
     }
 
+    private void onMissingDeployment(FlinkDeployment deployment) {
+        String err = "Missing JobManager deployment";
+        logger.error(err);
+        Event event =
+                DeploymentFailedException.asEvent(
+                        new DeploymentFailedException(
+                                DeploymentFailedException.COMPONENT_JOBMANAGER, "Error", err),
+                        deployment);
+        kubernetesClient

Review Comment:
   Pushed a fix for this:
   <img width="608" alt="image" src="https://user-images.githubusercontent.com/5880972/167243744-4aa470d0-03ab-4366-9d7d-a571fecf88ae.png">
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] wangyang0918 commented on pull request #195: [FLINK-27468] Recover missing deployments and other cancel/upgrade improvements for 1.15

Posted by GitBox <gi...@apache.org>.
wangyang0918 commented on PR #195:
URL: https://github.com/apache/flink-kubernetes-operator/pull/195#issuecomment-1119380750

   I am getting to this PR and try to give it a pass very soon.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org