Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/10/22 07:54:35 UTC

[GitHub] [flink-kubernetes-operator] pvary opened a new pull request, #407: [FLINK-29713] Kubernetes operator should restart failed jobs

pvary opened a new pull request, #407:
URL: https://github.com/apache/flink-kubernetes-operator/pull/407

   ## What is the purpose of the change
   
   It would be good to be able to restart the Flink Application when it enters the `FAILED` state.
   This could be used to restart and reconfigure the job dynamically in the application `main` method if the current application cannot handle the incoming data.
   
   ## Brief change log
   
     - Creates a new configuration `cluster.restart.failed` with the default `false`
     - If the Application state is `FAILED` and the configuration is set to `true` then the application is removed and redeployed
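
The two change-log items above can be read as a single decision rule. The following is a minimal, self-contained sketch of that rule, not the operator's code: the class name, the `Action` enum, and the use of a plain `Map` for configuration are all hypothetical, and the full key (the `kubernetes.operator.` prefix plus `cluster.restart.failed`) is taken from the PR description and docs diff in this thread, where a reviewer also asks for it to be renamed.

```java
import java.util.Map;

// Illustrative sketch only; none of these names exist in the operator.
final class RestartFailedSketch {
    // Key as named in the PR description; a review comment in this thread asks to rename it.
    static final String RESTART_FAILED_KEY = "kubernetes.operator.cluster.restart.failed";

    enum Action { REDEPLOY, NONE }

    // Redeploy only when the job is FAILED and the option is explicitly enabled.
    // A missing key falls back to "false", matching the stated default.
    static Action decide(String jobState, Map<String, String> config) {
        boolean enabled =
                Boolean.parseBoolean(config.getOrDefault(RESTART_FAILED_KEY, "false"));
        return ("FAILED".equals(jobState) && enabled) ? Action.REDEPLOY : Action.NONE;
    }

    public static void main(String[] args) {
        System.out.println(decide("FAILED", Map.of(RESTART_FAILED_KEY, "true")));  // REDEPLOY
        System.out.println(decide("FAILED", Map.of()));                            // NONE
        System.out.println(decide("RUNNING", Map.of(RESTART_FAILED_KEY, "true"))); // NONE
    }
}
```

Keeping the default at `false` means existing deployments behave as before unless the option is turned on.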
   
   ## Verifying this change
   This change added tests and can be verified as follows:
     - Added tests where the configuration is turned on, and the application is redeployed
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): no
     - The public API, i.e., are there any changes to the `CustomResourceDescriptors`: no
     - Core observer or reconciler logic that is regularly executed: yes
   
   ## Documentation
   
     - Does this pull request introduce a new feature? yes
     - If yes, how is the feature documented? docs
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] pvary commented on a diff in pull request #407: [FLINK-29713] Kubernetes operator should restart failed jobs

Posted by GitBox <gi...@apache.org>.
pvary commented on code in PR #407:
URL: https://github.com/apache/flink-kubernetes-operator/pull/407#discussion_r1002978995


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java:
##########
@@ -339,4 +339,11 @@ public static String operatorConfigKey(String key) {
                     .withDescription(
                             "The threshold which is checked against job restart count within a configured window. "
                                     + "If the restart count is reaching the threshold then full cluster restart is initiated.");
+
+    @Documentation.Section(SECTION_DYNAMIC)
+    public static final ConfigOption<Boolean> OPERATOR_CLUSTER_RESTART_FAILED =
+            operatorConfig("cluster.restart.failed")

Review Comment:
   Done



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java:
##########
@@ -262,26 +264,44 @@ public boolean reconcileOtherChanges(
             return true;
         }
 
+        if (JobStatus.valueOf(deployment.getStatus().getJobStatus().getState()) == JobStatus.FAILED

Review Comment:
   Done





[GitHub] [flink-kubernetes-operator] gyfora merged pull request #407: [FLINK-29713] Kubernetes operator should restart failed jobs

Posted by GitBox <gi...@apache.org>.
gyfora merged PR #407:
URL: https://github.com/apache/flink-kubernetes-operator/pull/407




[GitHub] [flink-kubernetes-operator] jeesmon commented on pull request #407: [FLINK-29713] Kubernetes operator should restart failed jobs

Posted by GitBox <gi...@apache.org>.
jeesmon commented on PR #407:
URL: https://github.com/apache/flink-kubernetes-operator/pull/407#issuecomment-1302013578

   @pvary Can this be configured per FlinkDeployment or is it global config for all Deployments managed by the operator? Thanks!




[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #407: [FLINK-29713] Kubernetes operator should restart failed jobs

Posted by GitBox <gi...@apache.org>.
gyfora commented on code in PR #407:
URL: https://github.com/apache/flink-kubernetes-operator/pull/407#discussion_r1003023775


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java:
##########
@@ -238,8 +240,34 @@ protected void rollback(CR resource, Context<?> ctx, Configuration observeConfig
     @Override
     public boolean reconcileOtherChanges(
             CR resource, Context<?> context, Configuration observeConfig) throws Exception {
-        return SavepointUtils.triggerSavepointIfNeeded(
-                getFlinkService(resource, context), resource, observeConfig);
+        var jobStatus =
+                org.apache.flink.api.common.JobStatus.valueOf(
+                        resource.getStatus().getJobStatus().getState());
+        if (jobStatus == org.apache.flink.api.common.JobStatus.FAILED
+                && observeConfig.getBoolean(OPERATOR_JOB_RESTART_FAILED)) {
+            LOG.info("Stopping failed Flink Cluster deployment...");
+            cancelJob(resource, context, UpgradeMode.LAST_STATE, observeConfig);
+            resource.getStatus().setError("");
+            resubmitJmDeployment(resource, context, observeConfig, false);
+            return true;
+        } else {
+            return SavepointUtils.triggerSavepointIfNeeded(
+                    getFlinkService(resource, context), resource, observeConfig);
+        }
+    }
+
+    protected void resubmitJmDeployment(
+            CR deployment, Context<?> ctx, Configuration observeConfig, boolean requireHaMetadata)
+            throws Exception {
+        LOG.info("Resubmitting Flink Cluster deployment...");
+        SPEC specToRecover = ReconciliationUtils.getDeployedSpec(deployment);
+        restoreJob(
+                deployment,
+                specToRecover,
+                deployment.getStatus(),
+                ctx,
+                observeConfig,
+                requireHaMetadata);

Review Comment:
   Or just rename it to `resubmitJob` and modify the logging accordingly





[GitHub] [flink-kubernetes-operator] pvary commented on a diff in pull request #407: [FLINK-29713] Kubernetes operator should restart failed jobs

Posted by GitBox <gi...@apache.org>.
pvary commented on code in PR #407:
URL: https://github.com/apache/flink-kubernetes-operator/pull/407#discussion_r1002975203


##########
docs/content/docs/custom-resource/job-management.md:
##########
@@ -241,6 +241,21 @@ In order this feature to work one must enable [recovery of missing job deploymen
 At the moment deployment is considered unhealthy when Flink's restarts count reaches `kubernetes.operator.cluster.health-check.restarts.threshold` (default: `64`)
 within time window of `kubernetes.operator.cluster.health-check.restarts.window` (default: 2 minutes).
 
+## Restart failed job deployments
+
+The operator can restart a failed Flink cluster deployment. This could be useful in cases when the job main task is
+able to reconfigure the job to handle these failures.
+
+For example a job could dynamically create the DAG based on some job configuration which job configuration could
+change over time. When a task detects a record which could not be handled with the current configuration then the task
+should throw a `SuppressRestartsException` to fail the job. If `kubernetes.operator.cluster.restart.failed` is set to 
+`true` (default: `false`) then the operator detects the failed job and restarts it. When the job restarts then it reads
+the new job configuration and creates the new DAG based on this new configuration. The new deployment could handle the
+incoming records and no manual intervention is needed.

Review Comment:
   Done





[GitHub] [flink-kubernetes-operator] pvary commented on a diff in pull request #407: [FLINK-29713] Kubernetes operator should restart failed jobs

Posted by GitBox <gi...@apache.org>.
pvary commented on code in PR #407:
URL: https://github.com/apache/flink-kubernetes-operator/pull/407#discussion_r1005314178


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java:
##########
@@ -253,4 +281,14 @@ public boolean reconcileOtherChanges(
     protected abstract void cancelJob(
             CR resource, Context<?> ctx, UpgradeMode upgradeMode, Configuration observeConfig)
             throws Exception;
+
+    /**
+     * Removes a failed job.
+     *
+     * @param resource The failed job.
+     * @param observeConfig Observe configuration.
+     * @throws Exception Error during cancellation.
+     */
+    protected abstract void removeFailedJob(
+            CR resource, Context<?> ctx, Configuration observeConfig) throws Exception;

Review Comment:
   Done





[GitHub] [flink-kubernetes-operator] pvary commented on a diff in pull request #407: [FLINK-29713] Kubernetes operator should restart failed jobs

Posted by GitBox <gi...@apache.org>.
pvary commented on code in PR #407:
URL: https://github.com/apache/flink-kubernetes-operator/pull/407#discussion_r1003055992


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java:
##########
@@ -238,8 +240,34 @@ protected void rollback(CR resource, Context<?> ctx, Configuration observeConfig
     @Override
     public boolean reconcileOtherChanges(
             CR resource, Context<?> context, Configuration observeConfig) throws Exception {
-        return SavepointUtils.triggerSavepointIfNeeded(
-                getFlinkService(resource, context), resource, observeConfig);
+        var jobStatus =
+                org.apache.flink.api.common.JobStatus.valueOf(
+                        resource.getStatus().getJobStatus().getState());
+        if (jobStatus == org.apache.flink.api.common.JobStatus.FAILED
+                && observeConfig.getBoolean(OPERATOR_JOB_RESTART_FAILED)) {
+            LOG.info("Stopping failed Flink Cluster deployment...");
+            cancelJob(resource, context, UpgradeMode.LAST_STATE, observeConfig);
+            resource.getStatus().setError("");
+            resubmitJmDeployment(resource, context, observeConfig, false);
+            return true;
+        } else {
+            return SavepointUtils.triggerSavepointIfNeeded(
+                    getFlinkService(resource, context), resource, observeConfig);
+        }
+    }
+
+    protected void resubmitJmDeployment(
+            CR deployment, Context<?> ctx, Configuration observeConfig, boolean requireHaMetadata)
+            throws Exception {
+        LOG.info("Resubmitting Flink Cluster deployment...");
+        SPEC specToRecover = ReconciliationUtils.getDeployedSpec(deployment);
+        restoreJob(
+                deployment,
+                specToRecover,
+                deployment.getStatus(),
+                ctx,
+                observeConfig,
+                requireHaMetadata);

Review Comment:
   I like this. Renamed to `resubmitJob`, and modified the log message accordingly





[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #407: [FLINK-29713] Kubernetes operator should restart failed jobs

Posted by GitBox <gi...@apache.org>.
gyfora commented on code in PR #407:
URL: https://github.com/apache/flink-kubernetes-operator/pull/407#discussion_r1005299938


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java:
##########
@@ -253,4 +281,14 @@ public boolean reconcileOtherChanges(
     protected abstract void cancelJob(
             CR resource, Context<?> ctx, UpgradeMode upgradeMode, Configuration observeConfig)
             throws Exception;
+
+    /**
+     * Removes a failed job.
+     *
+     * @param resource The failed job.
+     * @param observeConfig Observe configuration.
+     * @throws Exception Error during cancellation.
+     */
+    protected abstract void removeFailedJob(
+            CR resource, Context<?> ctx, Configuration observeConfig) throws Exception;

Review Comment:
   I suggest we rename this to `cleanupAfterFailedJob`





[GitHub] [flink-kubernetes-operator] jeesmon commented on pull request #407: [FLINK-29713] Kubernetes operator should restart failed jobs

Posted by GitBox <gi...@apache.org>.
jeesmon commented on PR #407:
URL: https://github.com/apache/flink-kubernetes-operator/pull/407#issuecomment-1302049967

   @gyfora Just to confirm: I only need to add it to `spec.flinkConfiguration` if I want to enable it for a specific `FlinkDeployment`?




[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #407: [FLINK-29713] Kubernetes operator should restart failed jobs

Posted by GitBox <gi...@apache.org>.
gyfora commented on PR #407:
URL: https://github.com/apache/flink-kubernetes-operator/pull/407#issuecomment-1302066565

   @jeesmon yes, same as with any other config 
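
The per-resource behaviour confirmed above can be pictured as an overlay of the resource's `spec.flinkConfiguration` on top of the operator-wide defaults. The sketch below is only a model of that idea under stated assumptions: the class and method names are hypothetical, plain `Map`s stand in for the real configuration objects, the key `kubernetes.operator.job.restart.failed` assumes the rename requested in review was applied, and "resource value wins" is an assumption about precedence rather than something this thread spells out.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch only; not the operator's configuration handling.
final class PerResourceConfigSketch {
    // Assumed key name, based on the rename suggested in the review comments.
    static final String KEY = "kubernetes.operator.job.restart.failed";

    // Start from the operator-wide defaults, then let the values from the
    // resource's spec.flinkConfiguration override them (assumed precedence).
    static Map<String, String> effectiveConfig(
            Map<String, String> operatorDefaults, Map<String, String> specFlinkConfiguration) {
        Map<String, String> merged = new HashMap<>(operatorDefaults);
        merged.putAll(specFlinkConfiguration);
        return merged;
    }

    public static void main(String[] args) {
        Map<String, String> defaults = Map.of(KEY, "false");
        // One deployment opts in; another leaves the option unset.
        System.out.println(effectiveConfig(defaults, Map.of(KEY, "true")).get(KEY)); // true
        System.out.println(effectiveConfig(defaults, Map.of()).get(KEY));            // false
    }
}
```

Under these assumptions, enabling the option on one `FlinkDeployment` leaves every other resource on the operator default.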




[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #407: [FLINK-29713] Kubernetes operator should restart failed jobs

Posted by GitBox <gi...@apache.org>.
gyfora commented on code in PR #407:
URL: https://github.com/apache/flink-kubernetes-operator/pull/407#discussion_r1003022765


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java:
##########
@@ -238,8 +240,34 @@ protected void rollback(CR resource, Context<?> ctx, Configuration observeConfig
     @Override
     public boolean reconcileOtherChanges(
             CR resource, Context<?> context, Configuration observeConfig) throws Exception {
-        return SavepointUtils.triggerSavepointIfNeeded(
-                getFlinkService(resource, context), resource, observeConfig);
+        var jobStatus =
+                org.apache.flink.api.common.JobStatus.valueOf(
+                        resource.getStatus().getJobStatus().getState());
+        if (jobStatus == org.apache.flink.api.common.JobStatus.FAILED
+                && observeConfig.getBoolean(OPERATOR_JOB_RESTART_FAILED)) {
+            LOG.info("Stopping failed Flink Cluster deployment...");
+            cancelJob(resource, context, UpgradeMode.LAST_STATE, observeConfig);
+            resource.getStatus().setError("");
+            resubmitJmDeployment(resource, context, observeConfig, false);
+            return true;
+        } else {
+            return SavepointUtils.triggerSavepointIfNeeded(
+                    getFlinkService(resource, context), resource, observeConfig);
+        }
+    }
+
+    protected void resubmitJmDeployment(
+            CR deployment, Context<?> ctx, Configuration observeConfig, boolean requireHaMetadata)
+            throws Exception {
+        LOG.info("Resubmitting Flink Cluster deployment...");
+        SPEC specToRecover = ReconciliationUtils.getDeployedSpec(deployment);
+        restoreJob(
+                deployment,
+                specToRecover,
+                deployment.getStatus(),
+                ctx,
+                observeConfig,
+                requireHaMetadata);

Review Comment:
   The `resubmitJmDeployment` name is not correct here. Maybe it's better to simply inline the `restoreJob` call in the if branch if it's only called in one place





[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #407: [FLINK-29713] Kubernetes operator should restart failed jobs

Posted by GitBox <gi...@apache.org>.
gyfora commented on code in PR #407:
URL: https://github.com/apache/flink-kubernetes-operator/pull/407#discussion_r1002922589


##########
docs/content/docs/custom-resource/job-management.md:
##########
@@ -241,6 +241,21 @@ In order this feature to work one must enable [recovery of missing job deploymen
 At the moment deployment is considered unhealthy when Flink's restarts count reaches `kubernetes.operator.cluster.health-check.restarts.threshold` (default: `64`)
 within time window of `kubernetes.operator.cluster.health-check.restarts.window` (default: 2 minutes).
 
+## Restart failed job deployments
+
+The operator can restart a failed Flink cluster deployment. This could be useful in cases when the job main task is
+able to reconfigure the job to handle these failures.
+
+For example a job could dynamically create the DAG based on some job configuration which job configuration could
+change over time. When a task detects a record which could not be handled with the current configuration then the task
+should throw a `SuppressRestartsException` to fail the job. If `kubernetes.operator.cluster.restart.failed` is set to 
+`true` (default: `false`) then the operator detects the failed job and restarts it. When the job restarts then it reads
+the new job configuration and creates the new DAG based on this new configuration. The new deployment could handle the
+incoming records and no manual intervention is needed.

Review Comment:
   Let's remove this paragraph, this is very use-case specific.



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java:
##########
@@ -339,4 +339,11 @@ public static String operatorConfigKey(String key) {
                     .withDescription(
                             "The threshold which is checked against job restart count within a configured window. "
                                     + "If the restart count is reaching the threshold then full cluster restart is initiated.");
+
+    @Documentation.Section(SECTION_DYNAMIC)
+    public static final ConfigOption<Boolean> OPERATOR_CLUSTER_RESTART_FAILED =
+            operatorConfig("cluster.restart.failed")

Review Comment:
   this should be:
   
   ```
   job.restart.failed
   ```



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java:
##########
@@ -262,26 +264,44 @@ public boolean reconcileOtherChanges(
             return true;
         }
 
+        if (JobStatus.valueOf(deployment.getStatus().getJobStatus().getState()) == JobStatus.FAILED

Review Comment:
   Maybe we could move this logic up one level to the `AbstractJobReconciler`. This logic and config make sense for both Applications and SessionJobs.





[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #407: [FLINK-29713] Kubernetes operator should restart failed jobs

Posted by GitBox <gi...@apache.org>.
gyfora commented on PR #407:
URL: https://github.com/apache/flink-kubernetes-operator/pull/407#issuecomment-1302024161

   > @pvary Can this be configured per FlinkDeployment or is it global config for all Deployments managed by the operator? Thanks!
   
   @jeesmon this can be configured on a per-CR basis (it is in the dynamic section, that's the hint)

