Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/10/24 06:42:24 UTC

[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #407: [FLINK-29713] Kubernetes operator should restart failed jobs

gyfora commented on code in PR #407:
URL: https://github.com/apache/flink-kubernetes-operator/pull/407#discussion_r1002922589


##########
docs/content/docs/custom-resource/job-management.md:
##########
@@ -241,6 +241,21 @@ In order this feature to work one must enable [recovery of missing job deploymen
 At the moment deployment is considered unhealthy when Flink's restarts count reaches `kubernetes.operator.cluster.health-check.restarts.threshold` (default: `64`)
 within time window of `kubernetes.operator.cluster.health-check.restarts.window` (default: 2 minutes).
 
+## Restart failed job deployments
+
+The operator can restart a failed Flink cluster deployment. This could be useful in cases when the job main task is
+able to reconfigure the job to handle these failures.
+
+For example a job could dynamically create the DAG based on some job configuration which job configuration could
+change over time. When a task detects a record which could not be handled with the current configuration then the task
+should throw a `SuppressRestartsException` to fail the job. If `kubernetes.operator.cluster.restart.failed` is set to 
+`true` (default: `false`) then the operator detects the failed job and restarts it. When the job restarts then it reads
+the new job configuration and creates the new DAG based on this new configuration. The new deployment could handle the
+incoming records and no manual intervention is needed.

Review Comment:
   Let's remove this paragraph; it is very use-case specific.
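The context lines of this hunk describe the existing health check: a deployment is treated as unhealthy when Flink's restart count reaches `kubernetes.operator.cluster.health-check.restarts.threshold` within `kubernetes.operator.cluster.health-check.restarts.window`. Below is a minimal sketch of that rule. It assumes the operator periodically samples a cumulative restart counter. The class and method names are hypothetical, and this is not the operator's actual implementation:

```java
import java.util.ArrayDeque;
import java.util.Deque;

/**
 * Hypothetical sketch of a windowed restart-count health check: the job is
 * reported unhealthy when its cumulative restart counter grew by at least
 * `threshold` between the oldest sample still inside the window and now.
 */
class RestartWindowCheck {
    private final long windowMillis;
    private final long threshold;
    // Samples of {timestampMillis, cumulativeRestartCount}, oldest first.
    private final Deque<long[]> samples = new ArrayDeque<>();

    RestartWindowCheck(long windowMillis, long threshold) {
        this.windowMillis = windowMillis;
        this.threshold = threshold;
    }

    /** Records a sample and returns true if the restart rate looks unhealthy. */
    boolean observe(long nowMillis, long restartCount) {
        samples.addLast(new long[] {nowMillis, restartCount});
        // Evict samples older than the window; the sample just added always stays.
        while (samples.peekFirst()[0] < nowMillis - windowMillis) {
            samples.removeFirst();
        }
        long oldestCountInWindow = samples.peekFirst()[1];
        return restartCount - oldestCountInWindow >= threshold;
    }
}
```

With the documented defaults, this would be constructed as `new RestartWindowCheck(120_000L, 64L)`.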



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java:
##########
@@ -339,4 +339,11 @@ public static String operatorConfigKey(String key) {
                     .withDescription(
                             "The threshold which is checked against job restart count within a configured window. "
                                     + "If the restart count is reaching the threshold then full cluster restart is initiated.");
+
+    @Documentation.Section(SECTION_DYNAMIC)
+    public static final ConfigOption<Boolean> OPERATOR_CLUSTER_RESTART_FAILED =
+            operatorConfig("cluster.restart.failed")

Review Comment:
   this should be:
   
   ```
   job.restart.failed
   ```
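Assume `operatorConfig(...)` prefixes keys with `kubernetes.operator.`, as the documented keys such as `kubernetes.operator.cluster.health-check.restarts.threshold` suggest. Under that assumption, the suggested rename would expose the flag as `kubernetes.operator.job.restart.failed`. The sketch below is a hypothetical stand-in for the lookup, not Flink's `ConfigOption` API. It only illustrates the resulting key and the documented default of `false`:

```java
import java.util.Map;

/**
 * Hypothetical stand-in for the operator's config lookup. It mirrors only the
 * key naming convention visible in the docs and the documented default (false).
 */
final class RestartFailedOption {
    static final String OPERATOR_PREFIX = "kubernetes.operator.";
    // Suffix suggested in the review, replacing "cluster.restart.failed".
    static final String KEY = OPERATOR_PREFIX + "job.restart.failed";
    static final boolean DEFAULT = false;

    private RestartFailedOption() {}

    /** Reads the flag from a flat key/value config, falling back to the default. */
    static boolean isEnabled(Map<String, String> conf) {
        String raw = conf.get(KEY);
        return raw == null ? DEFAULT : Boolean.parseBoolean(raw.trim());
    }
}
```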



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java:
##########
@@ -262,26 +264,44 @@ public boolean reconcileOtherChanges(
             return true;
         }
 
+        if (JobStatus.valueOf(deployment.getStatus().getJobStatus().getState()) == JobStatus.FAILED

Review Comment:
   Maybe we could move this logic up one level to the `AbstractJobReconciler`; this logic and config make sense for both Applications and SessionJobs.
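The following is a rough sketch of the suggested refactoring. It uses simplified, hypothetical types: `JobState` stands in for Flink's `JobStatus`, and the `*Sketch` classes stand in for `AbstractJobReconciler`, `ApplicationReconciler` and the session job reconciler. The FAILED-plus-flag check lives once in the base class, and each subclass only supplies how its own resource is restarted:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Flink's JobStatus (only the values used here).
enum JobState { RUNNING, FAILED, FINISHED }

/** Hypothetical base class holding the shared "restart failed job" decision. */
abstract class AbstractJobReconcilerSketch {
    private final boolean restartFailedEnabled;
    // Records restart actions so the behaviour is observable in this sketch.
    final List<String> actions = new ArrayList<>();

    AbstractJobReconcilerSketch(boolean restartFailedEnabled) {
        this.restartFailedEnabled = restartFailedEnabled;
    }

    /** Restarts only when the job FAILED and the flag is on; true if acted. */
    final boolean reconcileFailedJob(JobState state) {
        if (state != JobState.FAILED || !restartFailedEnabled) {
            return false;
        }
        restartJob();
        return true;
    }

    /** Resource-specific restart mechanics. */
    protected abstract void restartJob();
}

class ApplicationReconcilerSketch extends AbstractJobReconcilerSketch {
    ApplicationReconcilerSketch(boolean restartFailedEnabled) {
        super(restartFailedEnabled);
    }

    @Override
    protected void restartJob() {
        actions.add("redeploy application cluster");
    }
}

class SessionJobReconcilerSketch extends AbstractJobReconcilerSketch {
    SessionJobReconcilerSketch(boolean restartFailedEnabled) {
        super(restartFailedEnabled);
    }

    @Override
    protected void restartJob() {
        actions.add("resubmit job to session cluster");
    }
}
```

Keeping the condition in a `final` base-class method means Applications and SessionJobs cannot drift apart in *when* they restart; only *how* they restart differs.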


