Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/10/06 10:13:42 UTC

[GitHub] [flink-kubernetes-operator] gaborgsomogyi opened a new pull request, #394: [FLINK-29394] Add observe Flink job health

gaborgsomogyi opened a new pull request, #394:
URL: https://github.com/apache/flink-kubernetes-operator/pull/394

   ## What is the purpose of the change
   
   Flink has its own restart strategies, which work fine. There are, however, circumstances in which Flink can get stuck in a restart loop. A good example is when checkpointing is enabled and no restart strategy has been configured: the fixed-delay strategy is then used with `Integer.MAX_VALUE` restart attempts. Even when the JobManager (JM from now on) is able to resolve its temporary issue, a permanent issue may appear on the TaskManager (TM from now on) side, for example a TM that has a memory leak and simply crashes. In such a case the Flink job requires a restart from the outside, which the Flink k8s operator can perform.
   
   In this PR I've added a job health check feature. Please be aware that the implementation is simple and has the following caveats:
   * The restart count is watched in a plain, non-sliding window
   * When the last valid observed health info timestamp is outside of the watched window, the algorithm assumes an even distribution of restarts
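
The two caveats above can be read as the following minimal sketch. It is an interpretation of the description, not the code in this PR: the class name, the method signature, and the choice of `<=` for the threshold comparison are all assumptions made for illustration.

```java
import java.time.Duration;

/** Hypothetical sketch of a fixed-window restart check; names are illustrative only. */
final class RestartWindowSketch {

    /**
     * Returns true when the restarts attributed to one window stay within the threshold.
     * Restart counts are cumulative; timestamps are epoch milliseconds.
     */
    static boolean isHealthy(
            long lastTimestamp, int lastRestarts,
            long nowTimestamp, int nowRestarts,
            Duration window, int threshold) {
        long elapsed = nowTimestamp - lastTimestamp;
        long restarts = (long) nowRestarts - lastRestarts;
        if (elapsed <= 0 || restarts <= 0) {
            // No time has passed or the counter did not grow: nothing to flag.
            return true;
        }
        long windowMillis = window.toMillis();
        if (elapsed > windowMillis) {
            // The last observation is older than the window: assume the restarts were
            // spread evenly over the elapsed time and keep only one window's share.
            restarts = restarts * windowMillis / elapsed;
        }
        return restarts <= threshold;
    }
}
```

With the defaults from the change log (2 minute window, threshold 64), 100 restarts observed over 4 minutes would be scaled to 50 for one window and still count as healthy under this reading.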
   
   ## Brief change log
   
   * Added `kubernetes.operator.job.health-check.enabled` config (default: false)
   * Added `kubernetes.operator.job.health-check.duration-window` config (default: 2 minutes)
   * Added `kubernetes.operator.job.health-check.threshold` config (default: 64)
   * Added `JobHealthInfo` field to `JobStatus` as string field (internal structure may change suddenly at this stage)
   * Added `JobHealthObserver`, which is responsible for fetching job health information from the submitted job
   * Added `JobHealthChecker`, which is responsible for deciding whether the job is healthy
   * Added job restart functionality (with the same spec) when the job is considered unhealthy
   * Added several unit tests
   * Some simplifications/refactors
   
   ## Verifying this change
   
   * Existing unit tests
   * Additional unit tests
   * Manually with my newly created [chaos monkey](https://github.com/gaborgsomogyi/flink-chaos-monkey-java-job) job
     * Job submitted with health check
     * Executed in the TM shell: `touch /tmp/throwExceptionInUDF`
     * Waited for job recovery
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): no
     - The public API, i.e., are there any changes to the `CustomResourceDescriptors`: yes, new configs added
     - Core observer or reconciler logic that is regularly executed: yes
   
   ## Documentation
   
     - Does this pull request introduce a new feature? yes
     - If yes, how is the feature documented? newly added configs documented
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] gaborgsomogyi commented on pull request #394: [FLINK-29394] Add observe Flink job health

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on PR #394:
URL: https://github.com/apache/flink-kubernetes-operator/pull/394#issuecomment-1269772358

   cc @gyfora @morhidi @mbalassi 




[GitHub] [flink-kubernetes-operator] gyfora merged pull request #394: [FLINK-29394] Add observe Flink job health

Posted by GitBox <gi...@apache.org>.
gyfora merged PR #394:
URL: https://github.com/apache/flink-kubernetes-operator/pull/394




[GitHub] [flink-kubernetes-operator] gaborgsomogyi commented on pull request #394: [FLINK-29394] Add observe Flink job health

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on PR #394:
URL: https://github.com/apache/flink-kubernetes-operator/pull/394#issuecomment-1274339184

   Just renamed all params from `...job...` to `...cluster...`




[GitHub] [flink-kubernetes-operator] gaborgsomogyi commented on a diff in pull request #394: [FLINK-29394] Add observe Flink job health

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on code in PR #394:
URL: https://github.com/apache/flink-kubernetes-operator/pull/394#discussion_r994249637


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserver.java:
##########
@@ -32,15 +33,21 @@
 
 import io.javaoperatorsdk.operator.api.reconciler.Context;
 
+import javax.annotation.Nullable;
+
 import java.util.List;
 import java.util.Optional;
 
+import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_CLUSTER_HEALTH_CHECK_ENABLED;
+
 /** The observer of {@link org.apache.flink.kubernetes.operator.config.Mode#APPLICATION} cluster. */
 public class ApplicationObserver extends AbstractDeploymentObserver {
 
     private final SavepointObserver<FlinkDeployment, FlinkDeploymentStatus> savepointObserver;
     private final JobStatusObserver<FlinkDeployment, ApplicationObserverContext> jobStatusObserver;
 
+    @Nullable private ClusterHealthObserver clusterHealthObserver;

Review Comment:
   Fixed.





[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #394: [FLINK-29394] Add observe Flink job health

Posted by GitBox <gi...@apache.org>.
gyfora commented on code in PR #394:
URL: https://github.com/apache/flink-kubernetes-operator/pull/394#discussion_r992133413


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java:
##########
@@ -272,6 +286,46 @@ private void recoverJmDeployment(
         restoreJob(deployment, specToRecover, deployment.getStatus(), ctx, observeConfig, true);
     }
 
+    private boolean shouldRestartJobBecauseUnhealthy(
+            FlinkDeployment deployment, Configuration observeConfig) {
+        if (observeConfig.getBoolean(OPERATOR_CLUSTER_HEALTH_CHECK_ENABLED)) {
+            if (jobHealthChecker == null) {
+                jobHealthChecker = new JobHealthChecker(Clock.systemDefaultZone());
+            }
+        } else {
+            if (jobHealthChecker != null) {
+                jobHealthChecker = null;
+            }
+        }
+
+        boolean result = false;
+
+        if (jobHealthChecker != null) {
+            final String clusterInfoKey = JobHealthInfo.class.getSimpleName();
+            if (deployment.getStatus().getClusterInfo().containsKey(clusterInfoKey)) {
+                LOG.debug("Cluster info contains job health info");
+                if (!jobHealthChecker.isJobHealthy(

Review Comment:
   I think there are all kinds of concurrency issues here. Your code seems to assume that there is one reconciler per resource, but the reconciler is used concurrently from multiple threads.
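
To illustrate the concern: a single mutable field on a shared reconciler (such as the `jobHealthChecker` field in the quoted diff) would mix up state belonging to different resources. One possible remedy, shown here only as a sketch, is to key the state by resource and rely on atomic map operations. The class name, the `namespace/name` key format, and the methods are assumptions for illustration; later in this thread the health info is moved into the `clusterInfo` map of the resource status instead.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical sketch: per-resource state instead of one mutable field on a shared object. */
final class PerResourceStateSketch {

    // Keyed by a resource identifier such as "namespace/name" (an assumption for this sketch).
    private final ConcurrentMap<String, Integer> lastRestartCount = new ConcurrentHashMap<>();

    /** Stores the new cumulative count and returns the restarts since the previous observation. */
    int recordAndDiff(String resourceKey, int numRestarts) {
        // put() is atomic and hands back the previous value, so concurrent callers
        // working on different resources never see each other's state.
        Integer previous = lastRestartCount.put(resourceKey, numRestarts);
        return previous == null ? 0 : numRestarts - previous;
    }

    /** Drops the state for a resource, for example after it has been deleted. */
    void forget(String resourceKey) {
        lastRestartCount.remove(resourceKey);
    }
}
```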





[GitHub] [flink-kubernetes-operator] gaborgsomogyi commented on pull request #394: [FLINK-29394] Add observe Flink job health

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on PR #394:
URL: https://github.com/apache/flink-kubernetes-operator/pull/394#issuecomment-1276402549

   Updated PR description to reflect latest code.




[GitHub] [flink-kubernetes-operator] gaborgsomogyi commented on a diff in pull request #394: [FLINK-29394] Add observe Flink job health

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on code in PR #394:
URL: https://github.com/apache/flink-kubernetes-operator/pull/394#discussion_r991269499


##########
docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html:
##########
@@ -80,6 +80,24 @@
             <td>Boolean</td>
             <td>Whether to enable recovery of missing/deleted jobmanager deployments.</td>
         </tr>
+        <tr>
+            <td><h5>kubernetes.operator.job.health-check.enabled</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Whether to enable health check for jobs.</td>
+        </tr>
+        <tr>
+            <td><h5>kubernetes.operator.job.restart-check.duration-window</h5></td>
+            <td style="word-wrap: break-word;">2 min</td>
+            <td>Duration</td>
+            <td>The duration of the time window where job restart count measured.</td>
+        </tr>
+        <tr>
+            <td><h5>kubernetes.operator.job.restart-check.threshold</h5></td>

Review Comment:
   Good point, renamed to:
   ```
   kubernetes.operator.job.health-check.enabled
   kubernetes.operator.job.health-check.restarts.window
   kubernetes.operator.job.health-check.restarts.threshold
   ```
   





[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #394: [FLINK-29394] Add observe Flink job health

Posted by GitBox <gi...@apache.org>.
gyfora commented on code in PR #394:
URL: https://github.com/apache/flink-kubernetes-operator/pull/394#discussion_r991284849


##########
docs/content/docs/custom-resource/reference.md:
##########
@@ -251,6 +261,7 @@ This page serves as a full reference for FlinkDeployment custom resource definit
 | startTime | java.lang.String | Start time of the job. |
 | updateTime | java.lang.String | Update time of the job. |
 | savepointInfo | org.apache.flink.kubernetes.operator.crd.status.SavepointInfo | Information about pending and last savepoint for the job. |
+| jobHealthInfo | java.lang.String | Information about job health. |

Review Comment:
   Well, we added this, so technically we are free to add anything to it. It currently contains version info, but we might as well put the health info there too.
   
   This approach carries no risk if we later want to change or remove the health info, whereas a new field introduced to the CRD is there to stay forever due to backward compatibility.





[GitHub] [flink-kubernetes-operator] gaborgsomogyi commented on a diff in pull request #394: [FLINK-29394] Add observe Flink job health

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on code in PR #394:
URL: https://github.com/apache/flink-kubernetes-operator/pull/394#discussion_r991399660


##########
docs/content/docs/custom-resource/reference.md:
##########
@@ -251,6 +261,7 @@ This page serves as a full reference for FlinkDeployment custom resource definit
 | startTime | java.lang.String | Start time of the job. |
 | updateTime | java.lang.String | Update time of the job. |
 | savepointInfo | org.apache.flink.kubernetes.operator.crd.status.SavepointInfo | Information about pending and last savepoint for the job. |
+| jobHealthInfo | java.lang.String | Information about job health. |

Review Comment:
   OK, moved the health info to the `clusterInfo` map.
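
As a rough sketch of what storing the health info in a free-form string map can look like: the quoted reconciler diff uses the class's simple name as the map key, and the health info class imports Jackson's `ObjectMapper` for serialization. The version below is dependency-free, so the comma-separated encoding, the `HealthInfo` stand-in class, and its field names are assumptions rather than the PR's actual format.

```java
import java.util.Map;

/** Hypothetical sketch of keeping health info inside a free-form string map. */
final class ClusterInfoStorageSketch {

    /** Stand-in for the health info class; field names are assumptions. */
    static final class HealthInfo {
        final long timeStamp;
        final int numRestarts;
        final boolean healthy;

        HealthInfo(long timeStamp, int numRestarts, boolean healthy) {
            this.timeStamp = timeStamp;
            this.numRestarts = numRestarts;
            this.healthy = healthy;
        }

        // A comma-separated string stands in for JSON to keep the sketch self-contained.
        String serialize() {
            return timeStamp + "," + numRestarts + "," + healthy;
        }

        static HealthInfo deserialize(String raw) {
            String[] parts = raw.split(",");
            return new HealthInfo(
                    Long.parseLong(parts[0]),
                    Integer.parseInt(parts[1]),
                    Boolean.parseBoolean(parts[2]));
        }
    }

    // Same idea as using the class's simple name as the key in the quoted diff.
    static final String KEY = HealthInfo.class.getSimpleName();

    /** Writes the health info next to whatever else the map already holds. */
    static void store(Map<String, String> clusterInfo, HealthInfo info) {
        clusterInfo.put(KEY, info.serialize());
    }

    /** Returns null when no health info has been stored yet. */
    static HealthInfo load(Map<String, String> clusterInfo) {
        String raw = clusterInfo.get(KEY);
        return raw == null ? null : HealthInfo.deserialize(raw);
    }
}
```

Because the value is an opaque string under a single key, its internal structure can change or disappear later without touching the CRD schema, which is the point made in the review comment above.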





[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #394: [FLINK-29394] Add observe Flink job health

Posted by GitBox <gi...@apache.org>.
gyfora commented on code in PR #394:
URL: https://github.com/apache/flink-kubernetes-operator/pull/394#discussion_r994209615


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserver.java:
##########
@@ -32,15 +33,21 @@
 
 import io.javaoperatorsdk.operator.api.reconciler.Context;
 
+import javax.annotation.Nullable;
+
 import java.util.List;
 import java.util.Optional;
 
+import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_CLUSTER_HEALTH_CHECK_ENABLED;
+
 /** The observer of {@link org.apache.flink.kubernetes.operator.config.Mode#APPLICATION} cluster. */
 public class ApplicationObserver extends AbstractDeploymentObserver {
 
     private final SavepointObserver<FlinkDeployment, FlinkDeploymentStatus> savepointObserver;
     private final JobStatusObserver<FlinkDeployment, ApplicationObserverContext> jobStatusObserver;
 
+    @Nullable private ClusterHealthObserver clusterHealthObserver;

Review Comment:
   Not `@Nullable` 



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/health/ClusterHealthInfo.java:
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.health;
+
+import org.apache.flink.annotation.Experimental;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.ToString;
+
+import java.time.Clock;
+
+/** Represents information about job health. */
+@Experimental
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+@ToString
+public class ClusterHealthInfo {

Review Comment:
   ```
   @Data
   @NoArgsConstructor
   ```
   should be enough; the rest comes from `@Data`.





[GitHub] [flink-kubernetes-operator] gaborgsomogyi commented on pull request #394: [FLINK-29394] Add observe Flink job health

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on PR #394:
URL: https://github.com/apache/flink-kubernetes-operator/pull/394#issuecomment-1273478676

   From a high-level perspective, we now fill `clusterInfo` from multiple places, but if we want to use it as custom, flexible storage then we need to go in this direction instead of having a single `flinkService.getClusterInfo`.




[GitHub] [flink-kubernetes-operator] gaborgsomogyi commented on a diff in pull request #394: [FLINK-29394] Add observe Flink job health

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on code in PR #394:
URL: https://github.com/apache/flink-kubernetes-operator/pull/394#discussion_r991261118


##########
docs/content/docs/custom-resource/reference.md:
##########
@@ -225,6 +225,16 @@ This page serves as a full reference for FlinkDeployment custom resource definit
 | error | java.lang.String | Error information about the FlinkDeployment/FlinkSessionJob. |
 | reconciliationStatus | org.apache.flink.kubernetes.operator.crd.status.FlinkSessionJobReconciliationStatus | Status of the last reconcile operation. |
 
+### JobHealthInfo
+**Class**: org.apache.flink.kubernetes.operator.crd.status.JobHealthInfo

Review Comment:
   Moved.





[GitHub] [flink-kubernetes-operator] gaborgsomogyi commented on a diff in pull request #394: [FLINK-29394] Add observe Flink job health

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on code in PR #394:
URL: https://github.com/apache/flink-kubernetes-operator/pull/394#discussion_r992148912


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java:
##########
@@ -315,4 +315,28 @@ public static String operatorConfigKey(String key) {
                     .intType()
                     .defaultValue(8085)
                     .withDescription("The port the health probe will use to expose the status.");
+
+    @Documentation.Section(SECTION_SYSTEM)

Review Comment:
   Fixed.





[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #394: [FLINK-29394] Add observe Flink job health

Posted by GitBox <gi...@apache.org>.
gyfora commented on code in PR #394:
URL: https://github.com/apache/flink-kubernetes-operator/pull/394#discussion_r992135797


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/JobHealthChecker.java:
##########
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler.deployment;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.health.JobHealthInfo;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.time.Clock;
+import java.time.Duration;
+
+import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_CLUSTER_HEALTH_CHECK_RESTARTS_THRESHOLD;
+import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_CLUSTER_HEALTH_CHECK_RESTARTS_WINDOW;
+
+/** Evaluates whether the job is healthy. */
+public class JobHealthChecker {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JobHealthChecker.class);
+
+    private final Clock clock;
+    @VisibleForTesting @Nullable JobHealthInfo lastValidJobHealthInfo;
+
+    public JobHealthChecker(Clock clock) {
+        this.clock = clock;
+    }
+
+    @VisibleForTesting
+    void setLastValidJobHealthInfo(JobHealthInfo lastValidJobHealthInfo) {
+        LOG.debug("Setting last valid health check info");
+        this.lastValidJobHealthInfo = lastValidJobHealthInfo;

Review Comment:
   If the `isHealthy` logic depends only on the last health info, it might be best to compute it when you observe the new one, before you put it in the status. At that point you have both the previous and the new health info.



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/JobHealthChecker.java:
##########
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler.deployment;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.health.JobHealthInfo;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.time.Clock;
+import java.time.Duration;
+
+import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_CLUSTER_HEALTH_CHECK_RESTARTS_THRESHOLD;
+import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_CLUSTER_HEALTH_CHECK_RESTARTS_WINDOW;
+
+/** Evaluates whether the job is healthy. */
+public class JobHealthChecker {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JobHealthChecker.class);
+
+    private final Clock clock;
+    @VisibleForTesting @Nullable JobHealthInfo lastValidJobHealthInfo;
+
+    public JobHealthChecker(Clock clock) {
+        this.clock = clock;
+    }
+
+    @VisibleForTesting
+    void setLastValidJobHealthInfo(JobHealthInfo lastValidJobHealthInfo) {
+        LOG.debug("Setting last valid health check info");
+        this.lastValidJobHealthInfo = lastValidJobHealthInfo;

Review Comment:
   This way you would also put the `isHealthy` info in the status, and the reconciler would not need to compute anything.
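
The suggestion in these two comments can be sketched roughly as follows: the observer derives the health flag while it still holds both the previous and the freshly observed info, and the reconciler only reads the stored flag. All names here are hypothetical, and the time window handling is deliberately left out to keep the example short.

```java
/** Hypothetical sketch: the observer decides health, the reconciler only reads the result. */
final class ObserveThenReconcileSketch {

    /** Stand-in for the health info kept in the status; field names are assumptions. */
    static final class HealthInfo {
        final long timeStamp;
        final int numRestarts;
        final boolean healthy;

        HealthInfo(long timeStamp, int numRestarts, boolean healthy) {
            this.timeStamp = timeStamp;
            this.numRestarts = numRestarts;
            this.healthy = healthy;
        }
    }

    /**
     * Observer side: compares the new restart count with the previous observation
     * and bakes the verdict into the info that will be written to the status.
     */
    static HealthInfo observe(HealthInfo previous, long nowMillis, int numRestarts, int threshold) {
        // Without a previous observation there is nothing to compare against.
        boolean healthy = previous == null || numRestarts - previous.numRestarts <= threshold;
        return new HealthInfo(nowMillis, numRestarts, healthy);
    }

    /** Reconciler side: no computation, just a read of the observed flag. */
    static boolean shouldRestart(HealthInfo observed) {
        return observed != null && !observed.healthy;
    }
}
```

Keeping the verdict in the status also means the reconciler needs no mutable state of its own, which sidesteps the concurrency concern raised on the reconciler diff.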





[GitHub] [flink-kubernetes-operator] gaborgsomogyi commented on a diff in pull request #394: [FLINK-29394] Add observe Flink job health

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on code in PR #394:
URL: https://github.com/apache/flink-kubernetes-operator/pull/394#discussion_r991457409


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractDeploymentObserver.java:
##########
@@ -139,8 +135,6 @@ protected void observeJmDeployment(
             return;
         }
 
-        flinkApp.getStatus().setClusterInfo(new HashMap<>());

Review Comment:
   This prevented the reconciler from getting valid health info when the cluster was in a restarting state.





[GitHub] [flink-kubernetes-operator] gaborgsomogyi commented on a diff in pull request #394: [FLINK-29394] Add observe Flink job health

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on code in PR #394:
URL: https://github.com/apache/flink-kubernetes-operator/pull/394#discussion_r994261269


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/health/ClusterHealthInfo.java:
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.health;
+
+import org.apache.flink.annotation.Experimental;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.ToString;
+
+import java.time.Clock;
+
+/** Represents information about job health. */
+@Experimental
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+@ToString
+public class ClusterHealthInfo {

Review Comment:
   `@ToString` dropped, the printout is the same.
   `@AllArgsConstructor` is a must, because otherwise `new ClusterHealthInfo(clock.millis(), numRestarts, true);` fails to compile.
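
For context on why both constructor annotations are needed: Lombok's `@Data` bundles getters, setters, `equals`/`hashCode`, `toString`, and a required-args constructor, and that constructor only covers `final` and `@NonNull` fields. Assuming the fields are non-final, a three-argument call therefore needs `@AllArgsConstructor`. A hand-written plain Java equivalent of `@Data` + `@NoArgsConstructor` + `@AllArgsConstructor` looks roughly like the sketch below; the class name and field names are assumptions inferred from the constructor call quoted above.

```java
import java.util.Objects;

/** Hand-written approximation of what Lombok would generate; field names are assumptions. */
class ClusterHealthInfoSketch {
    private long timeStamp;
    private int numRestarts;
    private boolean healthy;

    // @NoArgsConstructor: needed for deserialization frameworks such as Jackson.
    ClusterHealthInfoSketch() {}

    // @AllArgsConstructor: enables new ClusterHealthInfoSketch(millis, numRestarts, true).
    ClusterHealthInfoSketch(long timeStamp, int numRestarts, boolean healthy) {
        this.timeStamp = timeStamp;
        this.numRestarts = numRestarts;
        this.healthy = healthy;
    }

    // @Data: getters and setters for every non-final field.
    long getTimeStamp() { return timeStamp; }
    void setTimeStamp(long timeStamp) { this.timeStamp = timeStamp; }
    int getNumRestarts() { return numRestarts; }
    void setNumRestarts(int numRestarts) { this.numRestarts = numRestarts; }
    boolean isHealthy() { return healthy; }
    void setHealthy(boolean healthy) { this.healthy = healthy; }

    // @Data: value-based equals and hashCode over all fields.
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof ClusterHealthInfoSketch)) return false;
        ClusterHealthInfoSketch other = (ClusterHealthInfoSketch) o;
        return timeStamp == other.timeStamp
                && numRestarts == other.numRestarts
                && healthy == other.healthy;
    }

    @Override
    public int hashCode() {
        return Objects.hash(timeStamp, numRestarts, healthy);
    }

    // @Data already includes a toString, which is why a separate @ToString is redundant.
    @Override
    public String toString() {
        return "ClusterHealthInfoSketch(timeStamp=" + timeStamp
                + ", numRestarts=" + numRestarts
                + ", healthy=" + healthy + ")";
    }
}
```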





[GitHub] [flink-kubernetes-operator] gaborgsomogyi commented on a diff in pull request #394: [FLINK-29394] Add observe Flink job health

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on code in PR #394:
URL: https://github.com/apache/flink-kubernetes-operator/pull/394#discussion_r993616745


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/JobHealthChecker.java:
##########
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler.deployment;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.health.JobHealthInfo;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.time.Clock;
+import java.time.Duration;
+
+import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_CLUSTER_HEALTH_CHECK_RESTARTS_THRESHOLD;
+import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_CLUSTER_HEALTH_CHECK_RESTARTS_WINDOW;
+
+/** Evaluates whether the job is healthy. */
+public class JobHealthChecker {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JobHealthChecker.class);
+
+    private final Clock clock;
+    @VisibleForTesting @Nullable JobHealthInfo lastValidJobHealthInfo;
+
+    public JobHealthChecker(Clock clock) {
+        this.clock = clock;
+    }
+
+    @VisibleForTesting
+    void setLastValidJobHealthInfo(JobHealthInfo lastValidJobHealthInfo) {
+        LOG.debug("Setting last valid health check info");
+        this.lastValidJobHealthInfo = lastValidJobHealthInfo;

Review Comment:
   Refactored fully.



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java:
##########
@@ -272,6 +286,46 @@ private void recoverJmDeployment(
         restoreJob(deployment, specToRecover, deployment.getStatus(), ctx, observeConfig, true);
     }
 
+    private boolean shouldRestartJobBecauseUnhealthy(
+            FlinkDeployment deployment, Configuration observeConfig) {
+        if (observeConfig.getBoolean(OPERATOR_CLUSTER_HEALTH_CHECK_ENABLED)) {
+            if (jobHealthChecker == null) {
+                jobHealthChecker = new JobHealthChecker(Clock.systemDefaultZone());
+            }
+        } else {
+            if (jobHealthChecker != null) {
+                jobHealthChecker = null;
+            }
+        }
+
+        boolean result = false;
+
+        if (jobHealthChecker != null) {
+            final String clusterInfoKey = JobHealthInfo.class.getSimpleName();
+            if (deployment.getStatus().getClusterInfo().containsKey(clusterInfoKey)) {
+                LOG.debug("Cluster info contains job health info");
+                if (!jobHealthChecker.isJobHealthy(

Review Comment:
   Refactored fully.





[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #394: [FLINK-29394] Add observe Flink job health

Posted by GitBox <gi...@apache.org>.
gyfora commented on code in PR #394:
URL: https://github.com/apache/flink-kubernetes-operator/pull/394#discussion_r990795976


##########
docs/content/docs/custom-resource/reference.md:
##########
@@ -251,6 +261,7 @@ This page serves as a full reference for FlinkDeployment custom resource definit
 | startTime | java.lang.String | Start time of the job. |
 | updateTime | java.lang.String | Update time of the job. |
 | savepointInfo | org.apache.flink.kubernetes.operator.crd.status.SavepointInfo | Information about pending and last savepoint for the job. |
+| jobHealthInfo | java.lang.String | Information about job health. |

Review Comment:
   I have been thinking a bit about this. We currently have a `clusterInfo` field in `FlinkDeploymentStatus`; we could simply use that map to store this information.
   
   I think it was originally designed with some flexibility in mind, so that we can store different metadata. cc @morhidi 



##########
docs/content/docs/custom-resource/reference.md:
##########
@@ -225,6 +225,16 @@ This page serves as a full reference for FlinkDeployment custom resource definit
 | error | java.lang.String | Error information about the FlinkDeployment/FlinkSessionJob. |
 | reconciliationStatus | org.apache.flink.kubernetes.operator.crd.status.FlinkSessionJobReconciliationStatus | Status of the last reconcile operation. |
 
+### JobHealthInfo
+**Class**: org.apache.flink.kubernetes.operator.crd.status.JobHealthInfo

Review Comment:
   We should move this class to another package, so that the reference generator does not pick it up. This object is not directly part of the status; it is only stored there in serialized form.



##########
docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html:
##########
@@ -80,6 +80,24 @@
             <td>Boolean</td>
             <td>Whether to enable recovery of missing/deleted jobmanager deployments.</td>
         </tr>
+        <tr>
+            <td><h5>kubernetes.operator.job.health-check.enabled</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Whether to enable health check for jobs.</td>
+        </tr>
+        <tr>
+            <td><h5>kubernetes.operator.job.restart-check.duration-window</h5></td>
+            <td style="word-wrap: break-word;">2 min</td>
+            <td>Duration</td>
+            <td>The duration of the time window where job restart count measured.</td>
+        </tr>
+        <tr>
+            <td><h5>kubernetes.operator.job.restart-check.threshold</h5></td>

Review Comment:
   The naming here is inconsistent: `health-check` vs `restart-check`.
   Maybe it could be:
   ```
   kubernetes.operator.job.health-check.enabled
   kubernetes.operator.job.health-check.window
   kubernetes.operator.job.health-check.restarts.threshold / kubernetes.operator.job.health-check.num-restarts.threshold
   ```
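
For context, the window and threshold settings discussed in this comment suggest a check along the following lines. This is only a sketch under stated assumptions: the class `RestartWindowCheck` and its method are hypothetical and are not the PR's `JobHealthChecker`. It uses a plain non-sliding window, and when the baseline observation is older than the window it assumes the restarts were evenly distributed over the elapsed time, as the PR description says. Whether the real check compares with `>` or `>=` is not stated in the thread; `>` is assumed here.

```java
import java.time.Duration;

/** Hypothetical sketch of a restart-count check; not the operator's actual JobHealthChecker. */
final class RestartWindowCheck {
    private final long windowMillis;
    private final int threshold;

    private boolean hasBaseline;
    private long baselineMillis;
    private int baselineRestarts;

    RestartWindowCheck(Duration window, int threshold) {
        this.windowMillis = window.toMillis();
        this.threshold = threshold;
    }

    /**
     * @param nowMillis observation time, for example the value of Clock#millis()
     * @param totalRestarts restart counter reported by the job
     * @return false if the restarts attributed to one window exceed the threshold (assumed ">")
     */
    boolean isHealthy(long nowMillis, int totalRestarts) {
        if (!hasBaseline || totalRestarts < baselineRestarts) {
            // First observation, or the counter went backwards (e.g. a fresh job): start a new window.
            resetBaseline(nowMillis, totalRestarts);
            return true;
        }
        long elapsedMillis = nowMillis - baselineMillis;
        long restarts = totalRestarts - baselineRestarts;
        if (elapsedMillis > windowMillis) {
            // Baseline is older than the window: assume the restarts were evenly distributed
            // over the elapsed time, scale them to one window, then start a new window.
            restarts = restarts * windowMillis / elapsedMillis;
            resetBaseline(nowMillis, totalRestarts);
        }
        return restarts <= threshold;
    }

    private void resetBaseline(long nowMillis, int totalRestarts) {
        hasBaseline = true;
        baselineMillis = nowMillis;
        baselineRestarts = totalRestarts;
    }
}
```

With the defaults from the PR description (a 2 minute window and a threshold of 64), 100 restarts observed 90 seconds after the baseline would be reported as unhealthy, while 100 restarts spread over 4 minutes would be scaled to 50 per window and reported as healthy.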





[GitHub] [flink-kubernetes-operator] gaborgsomogyi commented on pull request #394: [FLINK-29394] Add observe Flink job health

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on PR #394:
URL: https://github.com/apache/flink-kubernetes-operator/pull/394#issuecomment-1277476839

   Added doc + controller test
   <img width="903" alt="Screen Shot 2022-10-13 at 13 40 57" src="https://user-images.githubusercontent.com/18561820/195587001-4d37c607-6b22-472e-917c-0b8c56b453ee.png">
   




[GitHub] [flink-kubernetes-operator] gaborgsomogyi commented on pull request #394: [FLINK-29394] Add observe Flink job health

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on PR #394:
URL: https://github.com/apache/flink-kubernetes-operator/pull/394#issuecomment-1276398075

   I've retested it manually and it works fine.




[GitHub] [flink-kubernetes-operator] gaborgsomogyi commented on a diff in pull request #394: [FLINK-29394] Add observe Flink job health

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on code in PR #394:
URL: https://github.com/apache/flink-kubernetes-operator/pull/394#discussion_r993617128


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java:
##########
@@ -272,6 +286,46 @@ private void recoverJmDeployment(
         restoreJob(deployment, specToRecover, deployment.getStatus(), ctx, observeConfig, true);
     }
 
+    private boolean shouldRestartJobBecauseUnhealthy(
+            FlinkDeployment deployment, Configuration observeConfig) {
+        if (observeConfig.getBoolean(OPERATOR_CLUSTER_HEALTH_CHECK_ENABLED)) {
+            if (jobHealthChecker == null) {
+                jobHealthChecker = new JobHealthChecker(Clock.systemDefaultZone());
+            }
+        } else {
+            if (jobHealthChecker != null) {
+                jobHealthChecker = null;
+            }
+        }

Review Comment:
   Refactored fully.





[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #394: [FLINK-29394] Add observe Flink job health

Posted by GitBox <gi...@apache.org>.
gyfora commented on code in PR #394:
URL: https://github.com/apache/flink-kubernetes-operator/pull/394#discussion_r992133413


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java:
##########
@@ -272,6 +286,46 @@ private void recoverJmDeployment(
         restoreJob(deployment, specToRecover, deployment.getStatus(), ctx, observeConfig, true);
     }
 
+    private boolean shouldRestartJobBecauseUnhealthy(
+            FlinkDeployment deployment, Configuration observeConfig) {
+        if (observeConfig.getBoolean(OPERATOR_CLUSTER_HEALTH_CHECK_ENABLED)) {
+            if (jobHealthChecker == null) {
+                jobHealthChecker = new JobHealthChecker(Clock.systemDefaultZone());
+            }
+        } else {
+            if (jobHealthChecker != null) {
+                jobHealthChecker = null;
+            }
+        }
+
+        boolean result = false;
+
+        if (jobHealthChecker != null) {
+            final String clusterInfoKey = JobHealthInfo.class.getSimpleName();
+            if (deployment.getStatus().getClusterInfo().containsKey(clusterInfoKey)) {
+                LOG.debug("Cluster info contains job health info");
+                if (!jobHealthChecker.isJobHealthy(

Review Comment:
   I think there are all kinds of concurrency issues here. Your code seems to assume that there is one reconciler per resource, but the reconciler is used concurrently from multiple threads.
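
To illustrate the concern: a single mutable field on a shared reconciler is overwritten by whichever resource is reconciled last. One possible alternative, sketched below, keeps any per-resource state in a concurrent map keyed by the resource and reads the enabled flag on every call instead of caching it in a field. All names here (`PerResourceRestartTracker`, `recordAndGetDelta`, the `"namespace/name"` key format) are hypothetical and do not come from the PR, and this is not necessarily the approach the PR ended up taking.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sketch: per-resource state that is safe to touch from multiple threads. */
final class PerResourceRestartTracker {
    // One entry per resource, keyed by an assumed "namespace/name" identifier.
    private final Map<String, Integer> lastSeenRestarts = new ConcurrentHashMap<>();

    /**
     * Records the latest restart count for one resource and returns the increase since the
     * previous observation of that same resource. The enabled flag is passed in per call,
     * so resources with different configs do not affect each other.
     */
    int recordAndGetDelta(String resourceId, boolean healthCheckEnabled, int totalRestarts) {
        if (!healthCheckEnabled) {
            // Forget state for this resource only; other resources are untouched.
            lastSeenRestarts.remove(resourceId);
            return 0;
        }
        // put() is atomic on ConcurrentHashMap and returns the previous value, if any.
        Integer previous = lastSeenRestarts.put(resourceId, totalRestarts);
        return previous == null ? 0 : Math.max(0, totalRestarts - previous);
    }
}
```

Because the state is keyed by resource, observations of `ns/a` and `ns/b` never overwrite each other, regardless of which thread handles which resource.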



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java:
##########
@@ -315,4 +315,28 @@ public static String operatorConfigKey(String key) {
                     .intType()
                     .defaultValue(8085)
                     .withDescription("The port the health probe will use to expose the status.");
+
+    @Documentation.Section(SECTION_SYSTEM)

Review Comment:
   I think these configs should be in `SECTION_DYNAMIC`, as they can be defined at the per-resource level by the user.



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java:
##########
@@ -272,6 +286,46 @@ private void recoverJmDeployment(
         restoreJob(deployment, specToRecover, deployment.getStatus(), ctx, observeConfig, true);
     }
 
+    private boolean shouldRestartJobBecauseUnhealthy(
+            FlinkDeployment deployment, Configuration observeConfig) {
+        if (observeConfig.getBoolean(OPERATOR_CLUSTER_HEALTH_CHECK_ENABLED)) {
+            if (jobHealthChecker == null) {
+                jobHealthChecker = new JobHealthChecker(Clock.systemDefaultZone());
+            }
+        } else {
+            if (jobHealthChecker != null) {
+                jobHealthChecker = null;
+            }
+        }

Review Comment:
   It doesn't look correct to manipulate fields based on resource-specific configs, such as whether the health check is enabled.





[GitHub] [flink-kubernetes-operator] gaborgsomogyi commented on a diff in pull request #394: [FLINK-29394] Add observe Flink job health

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on code in PR #394:
URL: https://github.com/apache/flink-kubernetes-operator/pull/394#discussion_r991401325


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractDeploymentObserver.java:
##########
@@ -116,13 +116,10 @@ public void observe(FlinkDeployment flinkApp, Context<?> context) {
     }
 
     private void observeClusterInfo(FlinkDeployment flinkApp, Configuration configuration) {
-        if (!flinkApp.getStatus().getClusterInfo().isEmpty()) {

Review Comment:
   Important change: we now fetch `ClusterInfo` every time (`ClusterInfo` now contains other things, such as health info).





[GitHub] [flink-kubernetes-operator] gaborgsomogyi commented on a diff in pull request #394: [FLINK-29394] Add observe Flink job health

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on code in PR #394:
URL: https://github.com/apache/flink-kubernetes-operator/pull/394#discussion_r991402891


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractDeploymentObserver.java:
##########
@@ -116,13 +116,10 @@ public void observe(FlinkDeployment flinkApp, Context<?> context) {
     }
 
     private void observeClusterInfo(FlinkDeployment flinkApp, Configuration configuration) {
-        if (!flinkApp.getStatus().getClusterInfo().isEmpty()) {
-            return;
-        }
         try {
             Map<String, String> clusterInfo = flinkService.getClusterInfo(configuration);
-            flinkApp.getStatus().setClusterInfo(clusterInfo);
-            logger.debug("ClusterInfo: {}", clusterInfo);
+            flinkApp.getStatus().getClusterInfo().putAll(clusterInfo);

Review Comment:
   Important change: we now merge the fetched info into the existing map, in order to keep previously set health info and anything else that may be stored there later.
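
The difference between the old `setClusterInfo(...)` and the new `putAll(...)` can be shown with plain maps. The sketch below is illustrative only: the class `ClusterInfoMergeSketch` is hypothetical, and the keys and values used in the usage note are placeholders rather than values taken from a real cluster.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch contrasting replace-style and merge-style updates of a status map. */
final class ClusterInfoMergeSketch {

    /** Replace-style update: whatever was stored before is lost. */
    static Map<String, String> replace(Map<String, String> stored, Map<String, String> fetched) {
        return new HashMap<>(fetched);
    }

    /** Merge-style update: fetched entries overwrite same-named keys, all other keys survive. */
    static Map<String, String> merge(Map<String, String> stored, Map<String, String> fetched) {
        Map<String, String> merged = new HashMap<>(stored);
        merged.putAll(fetched);
        return merged;
    }
}
```

For example, if the stored map holds a `JobHealthInfo` entry and the fetched map holds only a `flink-version` entry, `replace` returns a map without `JobHealthInfo`, while `merge` returns a map with both keys.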





[GitHub] [flink-kubernetes-operator] gaborgsomogyi commented on a diff in pull request #394: [FLINK-29394] Add observe Flink job health

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on code in PR #394:
URL: https://github.com/apache/flink-kubernetes-operator/pull/394#discussion_r991281075


##########
docs/content/docs/custom-resource/reference.md:
##########
@@ -251,6 +261,7 @@ This page serves as a full reference for FlinkDeployment custom resource definit
 | startTime | java.lang.String | Start time of the job. |
 | updateTime | java.lang.String | Update time of the job. |
 | savepointInfo | org.apache.flink.kubernetes.operator.crd.status.SavepointInfo | Information about pending and last savepoint for the job. |
+| jobHealthInfo | java.lang.String | Information about job health. |

Review Comment:
   That's a key-value store which is designed to store config info:
   ```
       /** Config information from running clusters. */
       private Map<String, String> clusterInfo = new HashMap<>();
   ```
   For example, one key is `DashboardConfiguration.FIELD_NAME_FLINK_VERSION`.
   I can put it there, but at first glance it's not intended to store structures like this.
   Are we sure that we want to do that?
   





[GitHub] [flink-kubernetes-operator] gaborgsomogyi commented on pull request #394: [FLINK-29394] Add observe Flink job health

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on PR #394:
URL: https://github.com/apache/flink-kubernetes-operator/pull/394#issuecomment-1269750124

   Example operator log attached:
   [job_health.log](https://github.com/apache/flink-kubernetes-operator/files/9723875/job_health.log)
   

