Posted to commits@flink.apache.org by GitBox <gi...@apache.org> on 2022/03/15 04:19:19 UTC

[GitHub] [flink-kubernetes-operator] bgeng777 opened a new pull request #62: [FLINK-26605] Check if JM can serve rest api calls every time before reconcile

bgeng777 opened a new pull request #62:
URL: https://github.com/apache/flink-kubernetes-operator/pull/62


   - Add an `isJobManagerServing()` method that uses a REST call (i.e. `/jobs/overview`) to check whether the JM can actually serve requests after it is deployed.
   - Add this check to the `BaseObserver` logic so that it runs every time before reconcile when the JM state is `READY`.
   - Enhance `SessionObserverTest` and `FlinkDeploymentControllerTest` to include the case where the JM port is ready but the REST service is not available.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink-kubernetes-operator] SteNicholas commented on a change in pull request #62: [FLINK-26605] Check if JM can serve rest api calls every time before reconcile

SteNicholas commented on a change in pull request #62:
URL: https://github.com/apache/flink-kubernetes-operator/pull/62#discussion_r827018115



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
##########
@@ -129,6 +129,16 @@ public boolean isJobManagerPortReady(Configuration config) {
         return false;
     }
 
+    public boolean isJobManagerServing(Configuration config) {
+        try (ClusterClient<String> clusterClient = getClusterClient(config)) {
+            clusterClient.listJobs().get(10, TimeUnit.SECONDS);
+            return true;

Review comment:
       @bgeng777, what about using `clusterClient#getWebInterfaceURL` to check whether the JM is serving?







[GitHub] [flink-kubernetes-operator] bgeng777 commented on a change in pull request #62: [FLINK-26605] Check if JM can serve rest api calls every time before reconcile

bgeng777 commented on a change in pull request #62:
URL: https://github.com/apache/flink-kubernetes-operator/pull/62#discussion_r827068023



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
##########
@@ -38,7 +38,7 @@ public static FlinkOperatorConfiguration fromConfiguration(Configuration operato
 
         int restApiReadyDelaySeconds =
                 operatorConfig.getInteger(
-                        OperatorConfigOptions.OPERATOR_OBSERVER_REST_READY_DELAY_IN_SEC);
+                        OperatorConfigOptions.OPERATOR_OBSERVER_PROGRESS_CHECK_INTERVAL_IN_SEC);

Review comment:
       `restApiReadyDelaySeconds` removed.







[GitHub] [flink-kubernetes-operator] bgeng777 commented on a change in pull request #62: [FLINK-26605] Check if JM can serve rest api calls every time before reconcile

bgeng777 commented on a change in pull request #62:
URL: https://github.com/apache/flink-kubernetes-operator/pull/62#discussion_r827651048



##########
File path: .github/workflows/ci.yml
##########
@@ -52,7 +52,7 @@ jobs:
           export SHELL=/bin/bash
           export DOCKER_BUILDKIT=1
           eval $(minikube -p minikube docker-env)
-          docker build -f ./Dockerfile -t flink-kubernetes-operator:ci-latest .
+          docker build --progress=plain --no-cache -f ./Dockerfile -t flink-kubernetes-operator:ci-latest .

Review comment:
       You are right, these changes are unrelated; see the commit history. It is a hotfix to help debug the GitHub CI problem. Yang and I think it may be useful for further development, so I left it here. Do you have any suggestions?







[GitHub] [flink-kubernetes-operator] gyfora commented on a change in pull request #62: [FLINK-26605] Check if JM can serve rest api calls every time before reconcile

gyfora commented on a change in pull request #62:
URL: https://github.com/apache/flink-kubernetes-operator/pull/62#discussion_r826781766



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
##########
@@ -38,7 +38,7 @@ public static FlinkOperatorConfiguration fromConfiguration(Configuration operato
 
         int restApiReadyDelaySeconds =
                 operatorConfig.getInteger(
-                        OperatorConfigOptions.OPERATOR_OBSERVER_REST_READY_DELAY_IN_SEC);
+                        OperatorConfigOptions.OPERATOR_OBSERVER_PROGRESS_CHECK_INTERVAL_IN_SEC);

Review comment:
       We can simply use `progressCheckIntervalSeconds` and remove `restApiReadyDelaySeconds` from here.







[GitHub] [flink-kubernetes-operator] bgeng777 commented on a change in pull request #62: [FLINK-26605] Check if JM can serve rest api calls every time before reconcile

bgeng777 commented on a change in pull request #62:
URL: https://github.com/apache/flink-kubernetes-operator/pull/62#discussion_r829170241



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/BaseObserver.java
##########
@@ -62,8 +62,22 @@ protected void observeJmDeployment(
         JobManagerDeploymentStatus previousJmStatus =
                 deploymentStatus.getJobManagerDeploymentStatus();
 
+        if (JobManagerDeploymentStatus.READY == previousJmStatus) {
+            if (!flinkService.isJobManagerServing(effectiveConfig)) {
+                deploymentStatus.setJobManagerDeploymentStatus(
+                        JobManagerDeploymentStatus.DEPLOYED_NOT_READY);
+                logger.warn("The job manager is currently not ready for serving.");
+            }
+            return;
+        }
+
         if (JobManagerDeploymentStatus.DEPLOYED_NOT_READY == previousJmStatus) {
-            deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
+            // check if the JM is ready for accepting job submission
+            if (flinkService.isJobManagerServing(effectiveConfig)) {
+                deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
+            } else {
+                logger.info("The job manager has not been ready for serving yet...");
+            }

Review comment:
       I have made the above changes and enhanced the current tests to cover the case where the JM deployment check fails.
   One thing I am not sure about is that I reuse `OPERATOR_OBSERVER_PROGRESS_CHECK_INTERVAL_IN_SEC` as the timeout of `clusterClient.listJobs()`. AFAIK, the REST call is also a kind of "progress" check. Is there a strong reason I am missing to introduce a new config for the REST call timeout?
    cc @gyfora @wangyang0918 @tweise
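The two branches in the `BaseObserver` diff above can be read as a small state machine. The sketch below is a simplified stand-in, not operator code: `JmStatus` only mirrors the two `JobManagerDeploymentStatus` values used in that hunk, and the `BooleanSupplier` replaces `flinkService.isJobManagerServing(effectiveConfig)`. Replaying one probe result per reconcile loop shows the scenario the enhanced tests target: the port is up but REST is not serving yet, then it serves, then it stops again.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BooleanSupplier;

final class JmReadinessTransitions {

    // Simplified stand-in for JobManagerDeploymentStatus (only the two states used in the diff).
    enum JmStatus { DEPLOYED_NOT_READY, READY }

    // Downgrade a READY JobManager that stops serving; promote a DEPLOYED_NOT_READY
    // JobManager only once its REST endpoint answers.
    static JmStatus observe(JmStatus previous, BooleanSupplier isJobManagerServing) {
        boolean serving = isJobManagerServing.getAsBoolean();
        if (previous == JmStatus.READY && !serving) {
            return JmStatus.DEPLOYED_NOT_READY; // logged as a warning in the diff
        }
        if (previous == JmStatus.DEPLOYED_NOT_READY && serving) {
            return JmStatus.READY;
        }
        return previous; // no transition
    }

    // Replays one probe result per reconcile loop, starting from DEPLOYED_NOT_READY.
    static List<JmStatus> replay(boolean... probeResults) {
        List<JmStatus> history = new ArrayList<>();
        JmStatus status = JmStatus.DEPLOYED_NOT_READY;
        for (boolean result : probeResults) {
            status = observe(status, () -> result);
            history.add(status);
        }
        return history;
    }

    public static void main(String[] args) {
        // REST not serving yet, then serving twice, then unavailable again.
        System.out.println(replay(false, true, true, false));
        // prints [DEPLOYED_NOT_READY, READY, READY, DEPLOYED_NOT_READY]
    }
}
```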
   







[GitHub] [flink-kubernetes-operator] gyfora commented on a change in pull request #62: [FLINK-26605] Check if JM can serve rest api calls every time before reconcile

gyfora commented on a change in pull request #62:
URL: https://github.com/apache/flink-kubernetes-operator/pull/62#discussion_r828993595



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/BaseObserver.java
##########
@@ -62,8 +62,22 @@ protected void observeJmDeployment(
         JobManagerDeploymentStatus previousJmStatus =
                 deploymentStatus.getJobManagerDeploymentStatus();
 
+        if (JobManagerDeploymentStatus.READY == previousJmStatus) {
+            if (!flinkService.isJobManagerServing(effectiveConfig)) {
+                deploymentStatus.setJobManagerDeploymentStatus(
+                        JobManagerDeploymentStatus.DEPLOYED_NOT_READY);
+                logger.warn("The job manager is currently not ready for serving.");
+            }
+            return;
+        }

Review comment:
       This is not really your fault, but now the `JobObserver` won't call observe if the job is already in a ready state (while the `SessionObserver` always does). I think we need to remove the if logic in the `JobObserver` and just keep your change.







[GitHub] [flink-kubernetes-operator] bgeng777 commented on a change in pull request #62: [FLINK-26605] Check if JM can serve rest api calls every time before reconcile

bgeng777 commented on a change in pull request #62:
URL: https://github.com/apache/flink-kubernetes-operator/pull/62#discussion_r829006214



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/BaseObserver.java
##########
@@ -62,8 +62,22 @@ protected void observeJmDeployment(
         JobManagerDeploymentStatus previousJmStatus =
                 deploymentStatus.getJobManagerDeploymentStatus();
 
+        if (JobManagerDeploymentStatus.READY == previousJmStatus) {
+            if (!flinkService.isJobManagerServing(effectiveConfig)) {
+                deploymentStatus.setJobManagerDeploymentStatus(
+                        JobManagerDeploymentStatus.DEPLOYED_NOT_READY);
+                logger.warn("The job manager is currently not ready for serving.");
+            }
+            return;
+        }

Review comment:
       Got it. Will add it now.







[GitHub] [flink-kubernetes-operator] wangyang0918 commented on a change in pull request #62: [FLINK-26605] Check if JM can serve rest api calls every time before reconcile

wangyang0918 commented on a change in pull request #62:
URL: https://github.com/apache/flink-kubernetes-operator/pull/62#discussion_r829012183



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/BaseObserver.java
##########
@@ -62,8 +62,22 @@ protected void observeJmDeployment(
         JobManagerDeploymentStatus previousJmStatus =
                 deploymentStatus.getJobManagerDeploymentStatus();
 
+        if (JobManagerDeploymentStatus.READY == previousJmStatus) {
+            if (!flinkService.isJobManagerServing(effectiveConfig)) {
+                deploymentStatus.setJobManagerDeploymentStatus(
+                        JobManagerDeploymentStatus.DEPLOYED_NOT_READY);
+                logger.warn("The job manager is currently not ready for serving.");
+            }
+            return;
+        }

Review comment:
       +1 for adding a test to cover this.







[GitHub] [flink-kubernetes-operator] tweise commented on a change in pull request #62: [FLINK-26605] Check if JM can serve rest api calls every time before reconcile

tweise commented on a change in pull request #62:
URL: https://github.com/apache/flink-kubernetes-operator/pull/62#discussion_r829532492



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/BaseObserver.java
##########
@@ -62,9 +62,22 @@ protected void observeJmDeployment(
         JobManagerDeploymentStatus previousJmStatus =
                 deploymentStatus.getJobManagerDeploymentStatus();
 
+        if (JobManagerDeploymentStatus.READY == previousJmStatus) {
+            if (flinkService.isJobManagerServing(effectiveConfig)) {

Review comment:
       Since we are already listing jobs when the deployment is ready, do we really need this? I would prefer we avoid redundant calls to the REST API. The previous delay was there to avoid running into the `listJobs` error.
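The concern here is that a separate readiness probe doubles the REST traffic. One hypothetical alternative, sketched below and not what the PR implemented: make a single `listJobs`-style call per reconcile and derive readiness from its outcome, so an empty `Optional` means "not serving" and a present value carries the job list. The `CompletableFuture` parameter is an assumed stand-in for `clusterClient.listJobs()`, and the class name is illustrative.

```java
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

final class SingleCallObservation {

    // One REST round trip: empty means "not serving"; a value carries the job list.
    static Optional<Collection<String>> tryListJobs(
            CompletableFuture<Collection<String>> response, Duration timeout) {
        try {
            return Optional.of(response.get(timeout.toMillis(), TimeUnit.MILLISECONDS));
        } catch (InterruptedException e) {
            // Keep the interrupt flag so the caller can still see it.
            Thread.currentThread().interrupt();
            return Optional.empty();
        } catch (Exception e) {
            // ExecutionException (request failed) or TimeoutException (no answer in time).
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        Optional<Collection<String>> jobs = tryListJobs(
                CompletableFuture.<Collection<String>>completedFuture(List.of("job-1")),
                Duration.ofSeconds(1));
        boolean serving = jobs.isPresent(); // readiness derived from the same call
        System.out.println(serving + " " + jobs.orElse(List.of()));
    }
}
```

The design choice is that the job observer and the readiness check share one response, so there is no second request that could disagree with the first.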







[GitHub] [flink-kubernetes-operator] bgeng777 commented on pull request #62: [FLINK-26605] Check if JM can serve rest api calls every time before reconcile

bgeng777 commented on pull request #62:
URL: https://github.com/apache/flink-kubernetes-operator/pull/62#issuecomment-1067558879


   cc @gyfora @wangyang0918 





[GitHub] [flink-kubernetes-operator] tweise commented on a change in pull request #62: [FLINK-26605] Check if JM can serve rest api calls every time before reconcile

tweise commented on a change in pull request #62:
URL: https://github.com/apache/flink-kubernetes-operator/pull/62#discussion_r829480545



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
##########
@@ -129,6 +129,16 @@ public boolean isJobManagerPortReady(Configuration config) {
         return false;
     }
 
+    public boolean isJobManagerServing(Configuration config) {
+        try (ClusterClient<String> clusterClient = getClusterClient(config)) {
+            clusterClient.listJobs().get(10, TimeUnit.SECONDS);
+            return true;
+        } catch (Exception ignored) {
+        }

Review comment:
       I don't think this is correct and we need to add test coverage. We need to cover this: https://github.com/apache/flink-kubernetes-operator/pull/56/files#diff-1d1ff01af8cbc1561adcc4e62e4f8f4280c7aa8c7b3f0624eb5f27f09aa9e96fR73

##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
##########
@@ -129,6 +129,16 @@ public boolean isJobManagerPortReady(Configuration config) {
         return false;
     }
 
+    public boolean isJobManagerServing(Configuration config) {
+        try (ClusterClient<String> clusterClient = getClusterClient(config)) {
+            clusterClient.listJobs().get(10, TimeUnit.SECONDS);
+            return true;
+        } catch (Exception ignored) {
+        }

Review comment:
       It might be better to leave this logic in the observer. 

##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/BaseObserver.java
##########
@@ -62,9 +62,22 @@ protected void observeJmDeployment(
         JobManagerDeploymentStatus previousJmStatus =
                 deploymentStatus.getJobManagerDeploymentStatus();
 
+        if (JobManagerDeploymentStatus.READY == previousJmStatus) {
+            if (flinkService.isJobManagerServing(effectiveConfig)) {

Review comment:
       Since we are already listing jobs when the deployment is ready, do we really need this? I would prefer we avoid redundant calls to the REST API. The previous delay was there to avoid running into the `listJobs` error.

##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
##########
@@ -129,6 +129,16 @@ public boolean isJobManagerPortReady(Configuration config) {
         return false;
     }
 
+    public boolean isJobManagerServing(Configuration config) {
+        try (ClusterClient<String> clusterClient = getClusterClient(config)) {
+            clusterClient.listJobs().get(10, TimeUnit.SECONDS);
+            return true;
+        } catch (Exception ignored) {
+        }

Review comment:
       I will try to cover the listJobs error scenario in a separate PR soon.

##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
##########
@@ -129,6 +129,16 @@ public boolean isJobManagerPortReady(Configuration config) {
         return false;
     }
 
+    public boolean isJobManagerServing(Configuration config) {
+        try (ClusterClient<String> clusterClient = getClusterClient(config)) {
+            clusterClient.listJobs().get(10, TimeUnit.SECONDS);
+            return true;
+        } catch (Exception ignored) {
+        }

Review comment:
       @gyfora yes, I will try to cover the listJobs error scenario in a separate PR soon. However, I'm not convinced we should introduce this method here either.







[GitHub] [flink-kubernetes-operator] gyfora commented on a change in pull request #62: [FLINK-26605] Check if JM can serve rest api calls every time before reconcile

gyfora commented on a change in pull request #62:
URL: https://github.com/apache/flink-kubernetes-operator/pull/62#discussion_r829516911



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
##########
@@ -129,6 +129,16 @@ public boolean isJobManagerPortReady(Configuration config) {
         return false;
     }
 
+    public boolean isJobManagerServing(Configuration config) {
+        try (ClusterClient<String> clusterClient = getClusterClient(config)) {
+            clusterClient.listJobs().get(10, TimeUnit.SECONDS);
+            return true;
+        } catch (Exception ignored) {
+        }

Review comment:
       @tweise I don't really understand your comment; it seems to refer to changes you introduced in your PR.







[GitHub] [flink-kubernetes-operator] wangyang0918 commented on a change in pull request #62: [FLINK-26605] Check if JM can serve rest api calls every time before reconcile

wangyang0918 commented on a change in pull request #62:
URL: https://github.com/apache/flink-kubernetes-operator/pull/62#discussion_r826783907



##########
File path: .github/workflows/ci.yml
##########
@@ -52,7 +52,7 @@ jobs:
           export SHELL=/bin/bash
           export DOCKER_BUILDKIT=1
           eval $(minikube -p minikube docker-env)
-          docker build -f ./Dockerfile -t flink-kubernetes-operator:ci-latest .
+          docker build -f ./Dockerfile -t flink-kubernetes-operator:ci-latest --progress plain .

Review comment:
       We could keep this to make debugging failed CI runs easier.







[GitHub] [flink-kubernetes-operator] bgeng777 commented on a change in pull request #62: [FLINK-26605] Check if JM can serve rest api calls every time before reconcile

bgeng777 commented on a change in pull request #62:
URL: https://github.com/apache/flink-kubernetes-operator/pull/62#discussion_r828779865



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
##########
@@ -129,6 +129,16 @@ public boolean isJobManagerPortReady(Configuration config) {
         return false;
     }
 
+    public boolean isJobManagerServing(Configuration config) {
+        try (ClusterClient<String> clusterClient = getClusterClient(config)) {
+            clusterClient.listJobs().get(10, TimeUnit.SECONDS);
+            return true;

Review comment:
       Hi @tweise, I tried the following steps to test the actual behavior of `clusterClient.listJobs()`:
   1. Apply the `basic-session-example` YAML and wait until everything is running.
   2. Run `kubectl scale deployment basic-session-example --replicas=0` so that both the JM port and the REST call are unavailable.
   3. Quickly scale the replicas back to 1 while `isJobManagerServing` is waiting.
   I kept monitoring the operator log (with some local debug output added):
   [Screenshot of the operator log: https://user-images.githubusercontent.com/80749729/158746332-dff95fec-2701-4b35-b39e-3eb60d647c64.png]
   
   From the log, the REST call is sent at 13:56:05 (05:56:05 in the k8s log, due to the time zone difference) and keeps waiting. The JM starts at 13:56:07, everything is healthy again around 13:56:13, and the REST call then succeeds. So I believe the answer to "will the list call still fail after waiting for 10s?" is no: it does not fail, thanks to the client's retries.
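The observation above suggests the client keeps retrying within the `get(10, TimeUnit.SECONDS)` window rather than failing on the first refused connection. The sketch below is a simplified, hypothetical model of that behavior, not Flink's `RestClusterClient` retry logic (whose policy and defaults are not reproduced here): it retries a call with a fixed delay until it succeeds or a deadline passes. `RetryUntilDeadline` and its parameters are illustrative names.

```java
import java.time.Duration;
import java.util.function.Supplier;

final class RetryUntilDeadline {

    // Calls `attempt` repeatedly, sleeping `delay` after each failure, until it succeeds
    // or `timeout` has elapsed; then the last failure is rethrown.
    static <T> T call(Supplier<T> attempt, Duration delay, Duration timeout) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            try {
                return attempt.get();
            } catch (RuntimeException e) {
                if (System.nanoTime() + delay.toNanos() > deadline) {
                    throw e; // no time left for another attempt
                }
                try {
                    Thread.sleep(delay.toMillis());
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw e;
                }
            }
        }
    }

    public static void main(String[] args) {
        // Simulate a JobManager whose REST endpoint comes back ~300 ms after the first request.
        long upAt = System.nanoTime() + Duration.ofMillis(300).toNanos();
        String result = call(() -> {
            if (System.nanoTime() < upAt) {
                throw new IllegalStateException("connection refused");
            }
            return "jobs overview";
        }, Duration.ofMillis(50), Duration.ofSeconds(2));
        System.out.println(result);
    }
}
```

In this model the outer timeout only fails the call if the endpoint stays down for the whole window, which matches the log: the JM came back within the 10 seconds.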







[GitHub] [flink-kubernetes-operator] bgeng777 commented on a change in pull request #62: [FLINK-26605] Check if JM can serve rest api calls every time before reconcile

bgeng777 commented on a change in pull request #62:
URL: https://github.com/apache/flink-kubernetes-operator/pull/62#discussion_r826683535



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
##########
@@ -129,6 +132,26 @@ public boolean isJobManagerPortReady(Configuration config) {
         return false;
     }
 
+    public boolean isJobManagerServing(Configuration config) {
+        try (ClusterClient<String> clusterClient = getClusterClient(config)) {
+            clusterClient.getWebInterfaceURL();
+            URL url =
+                    new URL(
+                            clusterClient.getWebInterfaceURL()
+                                    + JobsOverviewHeaders.getInstance().getTargetRestEndpointURL());
+            // TODO: add support for https if necessary
+            HttpURLConnection connection = (HttpURLConnection) url.openConnection();
+            connection.setRequestMethod("GET");
+            connection.connect();
+            if (connection.getResponseCode() == HttpURLConnection.HTTP_OK) {
+                return true;
+            }

Review comment:
       Thanks for the catch. I overlooked that `clusterClient.listJobs()` actually sends the same request as this raw blocking HTTP call. I have updated the code to use the `clusterClient`.







[GitHub] [flink-kubernetes-operator] bgeng777 commented on pull request #62: [FLINK-26605] Check if JM can serve rest api calls every time before reconcile

bgeng777 commented on pull request #62:
URL: https://github.com/apache/flink-kubernetes-operator/pull/62#issuecomment-1067741866


   
   > I think we do not need `OPERATOR_OBSERVER_REST_READY_DELAY_IN_SEC` now. Right?
   
   That's right. IIUC, the reason behind this PR is to ensure the JM can serve REST calls before taking other actions. From this point of view, `OPERATOR_OBSERVER_REST_READY_DELAY_IN_SEC` is redundant, as we will do the check every time before reconcile. But currently, `OPERATOR_OBSERVER_REST_READY_DELAY_IN_SEC` is used to set `rescheduleAfterSec` when the JM status is `DEPLOYED_NOT_READY`. If we remove `OPERATOR_OBSERVER_REST_READY_DELAY_IN_SEC`, do you mean we should use a fixed value (e.g. 10s) for the reschedule interval?
   
   BTW, another thought during the implementation: it seems that the two states (`JM is launched but port is not ready` and `JM is launched, port is ready but rest service is not ready`) can be merged, as what we really care about is whether the JM can serve, not whether the JM port is ready.
   
   





[GitHub] [flink-kubernetes-operator] gyfora commented on a change in pull request #62: [FLINK-26605] Check if JM can serve rest api calls every time before reconcile

gyfora commented on a change in pull request #62:
URL: https://github.com/apache/flink-kubernetes-operator/pull/62#discussion_r829077192



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/BaseObserver.java
##########
@@ -62,8 +62,22 @@ protected void observeJmDeployment(
         JobManagerDeploymentStatus previousJmStatus =
                 deploymentStatus.getJobManagerDeploymentStatus();
 
+        if (JobManagerDeploymentStatus.READY == previousJmStatus) {
+            if (!flinkService.isJobManagerServing(effectiveConfig)) {
+                deploymentStatus.setJobManagerDeploymentStatus(
+                        JobManagerDeploymentStatus.DEPLOYED_NOT_READY);
+                logger.warn("The job manager is currently not ready for serving.");
+            }
+            return;
+        }
+
         if (JobManagerDeploymentStatus.DEPLOYED_NOT_READY == previousJmStatus) {
-            deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
+            // check if the JM is ready for accepting job submission
+            if (flinkService.isJobManagerServing(effectiveConfig)) {
+                deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
+            } else {
+                logger.info("The job manager has not been ready for serving yet...");
+            }

Review comment:
       Thank you @bgeng777 for adding the extra tests. Before merging this, I would like to clean up the check flow a little, because what you and @tweise have done conflicts a little.
   
   I think we should not `return` from the observe method if `flinkService.isJobManagerServing(effectiveConfig)` is false. In those cases we should always check the deployment too, which happens later in this method.
   
   This is the whole idea behind @tweise's recent change here: https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobObserver.java#L74
   
   With your change @bgeng777 we can probably get rid of the observe call there, but we still need to validate the deployment if the REST API does not respond.
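A minimal sketch of the flow suggested here, under stated assumptions: only a `READY` JobManager that answers the REST probe ends the observation early, and any failed probe falls through to validating the Kubernetes deployment instead of returning. `JmStatus` (including `MISSING`, meaning "deployment not found") and the `deploymentExists` check are hypothetical simplifications, not the operator's actual types or methods.

```java
import java.util.function.BooleanSupplier;

final class RevisedObserveFlow {

    // Hypothetical, simplified JobManager states; MISSING stands for "deployment not found".
    enum JmStatus { MISSING, DEPLOYED_NOT_READY, READY }

    static JmStatus observe(JmStatus previous,
                            BooleanSupplier isJobManagerServing,
                            BooleanSupplier deploymentExists) {
        // Only a READY JobManager that answers the REST probe skips the deployment check.
        if (previous == JmStatus.READY && isJobManagerServing.getAsBoolean()) {
            return JmStatus.READY;
        }
        // Otherwise (no REST answer, or not READY before), always validate the deployment.
        if (!deploymentExists.getAsBoolean()) {
            return JmStatus.MISSING;
        }
        if (previous == JmStatus.READY) {
            // Deployment is there but REST did not answer: downgrade instead of returning early.
            return JmStatus.DEPLOYED_NOT_READY;
        }
        // Deployment present and JM not READY before: promote once REST answers.
        return isJobManagerServing.getAsBoolean() ? JmStatus.READY : JmStatus.DEPLOYED_NOT_READY;
    }

    public static void main(String[] args) {
        System.out.println(observe(JmStatus.READY, () -> false, () -> false));            // MISSING
        System.out.println(observe(JmStatus.READY, () -> false, () -> true));             // DEPLOYED_NOT_READY
        System.out.println(observe(JmStatus.DEPLOYED_NOT_READY, () -> true, () -> true)); // READY
    }
}
```

The key difference from an early `return` is the second branch: a silent REST endpoint no longer hides a deleted deployment.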

##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/BaseObserver.java
##########
@@ -62,8 +62,22 @@ protected void observeJmDeployment(
         JobManagerDeploymentStatus previousJmStatus =
                 deploymentStatus.getJobManagerDeploymentStatus();
 
+        if (JobManagerDeploymentStatus.READY == previousJmStatus) {
+            if (!flinkService.isJobManagerServing(effectiveConfig)) {
+                deploymentStatus.setJobManagerDeploymentStatus(
+                        JobManagerDeploymentStatus.DEPLOYED_NOT_READY);
+                logger.warn("The job manager is currently not ready for serving.");
+            }
+            return;
+        }
+
         if (JobManagerDeploymentStatus.DEPLOYED_NOT_READY == previousJmStatus) {
-            deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
+            // check if the JM is ready for accepting job submission
+            if (flinkService.isJobManagerServing(effectiveConfig)) {
+                deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
+            } else {
+                logger.info("The job manager has not been ready for serving yet...");
+            }

Review comment:
       Also, we should use the same **configurable** timeout in both `flinkService.listJobs` and `flinkService.isJobManagerServing` (probably `isJobManagerServing` could call `listJobs` internally), because if the hardcoded 10 seconds turn out to be too short for any reason, the whole logic breaks.
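One way to read this suggestion, sketched in plain Java without Flink dependencies: a single configurable timeout feeds `listJobs`, and `isJobManagerServing` is implemented by calling `listJobs`, so the two can never disagree on the timeout. The `Supplier` of a `CompletableFuture` is a hypothetical stand-in for `clusterClient.listJobs()`; class and field names are illustrative.

```java
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

final class SharedTimeoutService {

    private final Supplier<CompletableFuture<Collection<String>>> listJobsCall; // hypothetical async client call
    private final Duration clientTimeout; // one configurable value instead of a hardcoded 10 s

    SharedTimeoutService(Supplier<CompletableFuture<Collection<String>>> listJobsCall, Duration clientTimeout) {
        this.listJobsCall = listJobsCall;
        this.clientTimeout = clientTimeout;
    }

    Collection<String> listJobs() throws Exception {
        return listJobsCall.get().get(clientTimeout.toMillis(), TimeUnit.MILLISECONDS);
    }

    // The readiness probe delegates to listJobs, so both always use the same timeout.
    boolean isJobManagerServing() {
        try {
            listJobs();
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } catch (Exception e) {
            return false; // failed request or timeout
        }
    }

    public static void main(String[] args) {
        SharedTimeoutService up = new SharedTimeoutService(
                () -> CompletableFuture.<Collection<String>>completedFuture(List.of("job-1")),
                Duration.ofSeconds(1));
        SharedTimeoutService hanging = new SharedTimeoutService(CompletableFuture::new, Duration.ofMillis(100));
        System.out.println(up.isJobManagerServing());      // true
        System.out.println(hanging.isJobManagerServing()); // false, after ~100 ms
    }
}
```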

##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/BaseObserver.java
##########
@@ -62,8 +62,22 @@ protected void observeJmDeployment(
         JobManagerDeploymentStatus previousJmStatus =
                 deploymentStatus.getJobManagerDeploymentStatus();
 
+        if (JobManagerDeploymentStatus.READY == previousJmStatus) {
+            if (!flinkService.isJobManagerServing(effectiveConfig)) {
+                deploymentStatus.setJobManagerDeploymentStatus(
+                        JobManagerDeploymentStatus.DEPLOYED_NOT_READY);
+                logger.warn("The job manager is currently not ready for serving.");
+            }
+            return;
+        }
+
         if (JobManagerDeploymentStatus.DEPLOYED_NOT_READY == previousJmStatus) {
-            deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
+            // check if the JM is ready for accepting job submission
+            if (flinkService.isJobManagerServing(effectiveConfig)) {
+                deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
+            } else {
+                logger.info("The job manager has not been ready for serving yet...");
+            }

Review comment:
       I think we are pretty close to getting this right, let's cover these few corner cases and we are going to be in a very good shape :)

##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
##########
@@ -136,9 +136,20 @@ public boolean isJobManagerPortReady(Configuration config) {
         return false;
     }
 
+    public boolean isJobManagerServing(Configuration config) {
+        try {
+            this.listJobs(config);
+            return true;
+        } catch (Exception ignored) {
+            return false;
+        }
+    }
+
     public Collection<JobStatusMessage> listJobs(Configuration conf) throws Exception {
         try (ClusterClient<String> clusterClient = getClusterClient(conf)) {
-            return clusterClient.listJobs().get(10, TimeUnit.SECONDS);
+            return clusterClient
+                    .listJobs()
+                    .get(operatorConfiguration.getProgressCheckIntervalSeconds(), TimeUnit.SECONDS);

Review comment:
       I think we need a new config in the OperatorConfiguration for this. We could call it `flinkClientTimeoutSeconds` -> `flink.client.timeout.seconds`

##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
##########
@@ -136,9 +136,20 @@ public boolean isJobManagerPortReady(Configuration config) {
         return false;
     }
 
+    public boolean isJobManagerServing(Configuration config) {
+        try {
+            this.listJobs(config);
+            return true;
+        } catch (Exception ignored) {
+            return false;
+        }
+    }
+
     public Collection<JobStatusMessage> listJobs(Configuration conf) throws Exception {
         try (ClusterClient<String> clusterClient = getClusterClient(conf)) {
-            return clusterClient.listJobs().get(10, TimeUnit.SECONDS);
+            return clusterClient
+                    .listJobs()
+                    .get(operatorConfiguration.getProgressCheckIntervalSeconds(), TimeUnit.SECONDS);

Review comment:
       I think we need a new config in the OperatorConfiguration for this. We could call it `flinkClientTimeoutSeconds` -> `flink.client.timeout.sec`
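Here is a sketch of the proposed option. It assumes the value is read from a plain `java.util.Properties` rather than Flink's `Configuration`/`ConfigOptions`. The class name `OperatorConfigSketch` and the 10-second default are illustrative assumptions, with the default chosen to match the old hard-coded value:

```java
import java.time.Duration;
import java.util.Properties;

// Hypothetical sketch; a real operator would more likely declare a Flink ConfigOption.
final class OperatorConfigSketch {
    static final String FLINK_CLIENT_TIMEOUT_KEY = "flink.client.timeout.sec";
    // Assumed default, equal to the previously hard-coded 10 seconds.
    static final long DEFAULT_FLINK_CLIENT_TIMEOUT_SEC = 10L;

    private final Duration flinkClientTimeout;

    private OperatorConfigSketch(Duration flinkClientTimeout) {
        this.flinkClientTimeout = flinkClientTimeout;
    }

    static OperatorConfigSketch fromProperties(Properties props) {
        String raw = props.getProperty(FLINK_CLIENT_TIMEOUT_KEY);
        long seconds = raw == null ? DEFAULT_FLINK_CLIENT_TIMEOUT_SEC : Long.parseLong(raw.trim());
        if (seconds <= 0) {
            throw new IllegalArgumentException(
                    FLINK_CLIENT_TIMEOUT_KEY + " must be positive, got " + seconds);
        }
        return new OperatorConfigSketch(Duration.ofSeconds(seconds));
    }

    long getFlinkClientTimeoutSeconds() {
        return flinkClientTimeout.getSeconds();
    }

    Duration getFlinkClientTimeout() {
        return flinkClientTimeout;
    }
}
```

Both `listJobs` and the readiness probe would then read their timeout from this one field.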

##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
##########
@@ -129,6 +129,16 @@ public boolean isJobManagerPortReady(Configuration config) {
         return false;
     }
 
+    public boolean isJobManagerServing(Configuration config) {
+        try (ClusterClient<String> clusterClient = getClusterClient(config)) {
+            clusterClient.listJobs().get(10, TimeUnit.SECONDS);
+            return true;
+        } catch (Exception ignored) {
+        }

Review comment:
       @tweise I don't really understand your comment; it seems to refer to changes you introduced in your PR.

##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
##########
@@ -129,6 +129,16 @@ public boolean isJobManagerPortReady(Configuration config) {
         return false;
     }
 
+    public boolean isJobManagerServing(Configuration config) {
+        try (ClusterClient<String> clusterClient = getClusterClient(config)) {
+            clusterClient.listJobs().get(10, TimeUnit.SECONDS);
+            return true;
+        } catch (Exception ignored) {
+        }

Review comment:
       It would be very nice of @bgeng777 to add test coverage to guard the current behaviour, but it's not easy to understand all these corner cases, which should already have been covered by tests. Let's test and review this carefully, and then take the time to add the missing test coverage for the older features.

##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/BaseObserver.java
##########
@@ -62,9 +62,22 @@ protected void observeJmDeployment(
         JobManagerDeploymentStatus previousJmStatus =
                 deploymentStatus.getJobManagerDeploymentStatus();
 
+        if (JobManagerDeploymentStatus.READY == previousJmStatus) {
+            if (flinkService.isJobManagerServing(effectiveConfig)) {

Review comment:
       What we could do, @bgeng777, is pass a flag to the JM observer indicating whether to check the REST API when the deployment is READY. As you explained, this should be false for the JobObserver, as it checks the REST API anyway. I think for the `DEPLOYED_NOT_READY` state we always need to check, so that we have a reliable way of telling when it becomes ready.
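Below is a hypothetical sketch of that flag. A simplified `JmStatus` enum stands in for `JobManagerDeploymentStatus`, and a `BooleanSupplier` stands in for the REST probe. `DEPLOYED_NOT_READY` always probes. `READY` probes only when the caller asks for it, for example the session observer but not the job observer:

```java
import java.util.function.BooleanSupplier;

// Simplified, hypothetical stand-in for JobManagerDeploymentStatus.
enum JmStatus { DEPLOYING, DEPLOYED_NOT_READY, READY }

final class JmObserverSketch {
    /**
     * Computes the next JM status.
     *
     * @param checkRestWhenReady false for the job observer (it calls listJobs itself),
     *                           true for the session observer
     * @param restServing        probe that returns true if a REST call succeeds
     */
    static JmStatus next(JmStatus previous, boolean checkRestWhenReady, BooleanSupplier restServing) {
        switch (previous) {
            case DEPLOYED_NOT_READY:
                // Always probe: this is how we tell that the JM has become ready.
                return restServing.getAsBoolean() ? JmStatus.READY : JmStatus.DEPLOYED_NOT_READY;
            case READY:
                // Only probe when no other component already exercises the REST API.
                if (checkRestWhenReady && !restServing.getAsBoolean()) {
                    return JmStatus.DEPLOYED_NOT_READY;
                }
                return JmStatus.READY;
            default:
                // DEPLOYING is driven by the Kubernetes deployment check, omitted here.
                return previous;
        }
    }
}
```

Passing the probe as a supplier keeps the transition logic testable without a running JobManager.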

##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/BaseObserver.java
##########
@@ -62,9 +62,22 @@ protected void observeJmDeployment(
         JobManagerDeploymentStatus previousJmStatus =
                 deploymentStatus.getJobManagerDeploymentStatus();
 
+        if (JobManagerDeploymentStatus.READY == previousJmStatus) {
+            if (flinkService.isJobManagerServing(effectiveConfig)) {

Review comment:
       Or we could just keep the if check before observing in the job observer like it was before :)





[GitHub] [flink-kubernetes-operator] gyfora merged pull request #62: [FLINK-26605] Check if JM can serve rest api calls every time before reconcile

Posted by GitBox <gi...@apache.org>.
gyfora merged pull request #62:
URL: https://github.com/apache/flink-kubernetes-operator/pull/62


   



[GitHub] [flink-kubernetes-operator] gyfora commented on a change in pull request #62: [FLINK-26605] Check if JM can serve rest api calls every time before reconcile

Posted by GitBox <gi...@apache.org>.
gyfora commented on a change in pull request #62:
URL: https://github.com/apache/flink-kubernetes-operator/pull/62#discussion_r829704839



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/BaseObserver.java
##########
@@ -62,9 +62,22 @@ protected void observeJmDeployment(
         JobManagerDeploymentStatus previousJmStatus =
                 deploymentStatus.getJobManagerDeploymentStatus();
 
+        if (JobManagerDeploymentStatus.READY == previousJmStatus) {
+            if (flinkService.isJobManagerServing(effectiveConfig)) {

Review comment:
       What we could do, @bgeng777, is pass a flag to the JM observer indicating whether to check the REST API when the deployment is READY. As you explained, this should be false for the JobObserver, as it checks the REST API anyway. I think for the `DEPLOYED_NOT_READY` state we always need to check, so that we have a reliable way of telling when it becomes ready.





[GitHub] [flink-kubernetes-operator] bgeng777 commented on a change in pull request #62: [FLINK-26605] Check if JM can serve rest api calls every time before reconcile

Posted by GitBox <gi...@apache.org>.
bgeng777 commented on a change in pull request #62:
URL: https://github.com/apache/flink-kubernetes-operator/pull/62#discussion_r829038732



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/BaseObserver.java
##########
@@ -62,8 +62,22 @@ protected void observeJmDeployment(
         JobManagerDeploymentStatus previousJmStatus =
                 deploymentStatus.getJobManagerDeploymentStatus();
 
+        if (JobManagerDeploymentStatus.READY == previousJmStatus) {
+            if (!flinkService.isJobManagerServing(effectiveConfig)) {
+                deploymentStatus.setJobManagerDeploymentStatus(
+                        JobManagerDeploymentStatus.DEPLOYED_NOT_READY);
+                logger.warn("The job manager is currently not ready for serving.");
+            }
+            return;
+        }

Review comment:
       Added tests for `observeApplicationCluster()` to cover the case where the JM is ready, later becomes unavailable, and finally recovers. Let me know if that is enough.

##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/BaseObserver.java
##########
@@ -62,8 +62,22 @@ protected void observeJmDeployment(
         JobManagerDeploymentStatus previousJmStatus =
                 deploymentStatus.getJobManagerDeploymentStatus();
 
+        if (JobManagerDeploymentStatus.READY == previousJmStatus) {
+            if (!flinkService.isJobManagerServing(effectiveConfig)) {
+                deploymentStatus.setJobManagerDeploymentStatus(
+                        JobManagerDeploymentStatus.DEPLOYED_NOT_READY);
+                logger.warn("The job manager is currently not ready for serving.");
+            }
+            return;
+        }
+
         if (JobManagerDeploymentStatus.DEPLOYED_NOT_READY == previousJmStatus) {
-            deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
+            // check if the JM is ready for accepting job submission
+            if (flinkService.isJobManagerServing(effectiveConfig)) {
+                deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
+            } else {
+                logger.info("The job manager has not been ready for serving yet...");
+            }

Review comment:
       Thanks a lot for the detailed explanation, @gyfora!
   1. After checking [FLINK-26473](https://issues.apache.org/jira/browse/FLINK-26473) again, I agree with removing the `return` so that it falls back to the deployment check. When the REST service is unavailable, it is reasonable to check the deployment to find out whether anything bad has happened. I will update the logic here.
   
   2. Adding a configurable timeout, as @tweise and you suggest, makes sense to me. Since we have reached consensus on using the same config for both methods, reusing the same logic is definitely better.

##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/BaseObserver.java
##########
@@ -62,8 +62,22 @@ protected void observeJmDeployment(
         JobManagerDeploymentStatus previousJmStatus =
                 deploymentStatus.getJobManagerDeploymentStatus();
 
+        if (JobManagerDeploymentStatus.READY == previousJmStatus) {
+            if (!flinkService.isJobManagerServing(effectiveConfig)) {
+                deploymentStatus.setJobManagerDeploymentStatus(
+                        JobManagerDeploymentStatus.DEPLOYED_NOT_READY);
+                logger.warn("The job manager is currently not ready for serving.");
+            }
+            return;
+        }
+
         if (JobManagerDeploymentStatus.DEPLOYED_NOT_READY == previousJmStatus) {
-            deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
+            // check if the JM is ready for accepting job submission
+            if (flinkService.isJobManagerServing(effectiveConfig)) {
+                deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
+            } else {
+                logger.info("The job manager has not been ready for serving yet...");
+            }

Review comment:
       I have made the changes above and enhanced the current tests to cover the case of falling back to checking the JM deployment.
   One thing I am not so sure about: I reuse `OPERATOR_OBSERVER_PROGRESS_CHECK_INTERVAL_IN_SEC` as the timeout for `clusterClient.listJobs()`. AFAIK, the REST call is also a kind of 'progress' check. Is there a strong reason I'm missing for introducing a new config for the REST call timeout?
    cc @gyfora @wangyang0918 @tweise
   

##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/BaseObserver.java
##########
@@ -62,9 +62,22 @@ protected void observeJmDeployment(
         JobManagerDeploymentStatus previousJmStatus =
                 deploymentStatus.getJobManagerDeploymentStatus();
 
+        if (JobManagerDeploymentStatus.READY == previousJmStatus) {
+            if (flinkService.isJobManagerServing(effectiveConfig)) {

Review comment:
       For the JobObserver, I agree; it is somewhat redundant when the deployment is ready. For a job, we first call `observeJmDeployment` (1 call of `listJobs()`) and then `observeFlinkJobStatus()` (1 or 2 calls of `listJobs()`). We could save at least 1 extra REST call with some refactoring.
   But I think we still need this check here for the SessionObserver. For a session cluster, we only call `observeJmDeployment`. Without this check, we would always fall through to the deployment check, and with the current logic in the `main` branch we would also change the JobManagerDeploymentStatus to `DEPLOYED_NOT_READY`, which is not desired.
   
   

##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
##########
@@ -136,9 +136,20 @@ public boolean isJobManagerPortReady(Configuration config) {
         return false;
     }
 
+    public boolean isJobManagerServing(Configuration config) {
+        try {
+            this.listJobs(config);
+            return true;
+        } catch (Exception ignored) {
+            return false;
+        }
+    }
+
     public Collection<JobStatusMessage> listJobs(Configuration conf) throws Exception {
         try (ClusterClient<String> clusterClient = getClusterClient(conf)) {
-            return clusterClient.listJobs().get(10, TimeUnit.SECONDS);
+            return clusterClient
+                    .listJobs()
+                    .get(operatorConfiguration.getProgressCheckIntervalSeconds(), TimeUnit.SECONDS);

Review comment:
       updated.





[GitHub] [flink-kubernetes-operator] gyfora commented on a change in pull request #62: [FLINK-26605] Check if JM can serve rest api calls every time before reconcile

Posted by GitBox <gi...@apache.org>.
gyfora commented on a change in pull request #62:
URL: https://github.com/apache/flink-kubernetes-operator/pull/62#discussion_r829083913



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/BaseObserver.java
##########
@@ -62,8 +62,22 @@ protected void observeJmDeployment(
         JobManagerDeploymentStatus previousJmStatus =
                 deploymentStatus.getJobManagerDeploymentStatus();
 
+        if (JobManagerDeploymentStatus.READY == previousJmStatus) {
+            if (!flinkService.isJobManagerServing(effectiveConfig)) {
+                deploymentStatus.setJobManagerDeploymentStatus(
+                        JobManagerDeploymentStatus.DEPLOYED_NOT_READY);
+                logger.warn("The job manager is currently not ready for serving.");
+            }
+            return;
+        }
+
         if (JobManagerDeploymentStatus.DEPLOYED_NOT_READY == previousJmStatus) {
-            deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
+            // check if the JM is ready for accepting job submission
+            if (flinkService.isJobManagerServing(effectiveConfig)) {
+                deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
+            } else {
+                logger.info("The job manager has not been ready for serving yet...");
+            }

Review comment:
       I think we are pretty close to getting this right, let's cover these few corner cases and we are going to be in a very good shape :)





[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #62: [FLINK-26605] Check if JM can serve rest api calls every time before reconcile

Posted by GitBox <gi...@apache.org>.
gyfora commented on pull request #62:
URL: https://github.com/apache/flink-kubernetes-operator/pull/62#issuecomment-1067746483


   @bgeng777 I think if we remove OPERATOR_OBSERVER_REST_READY_DELAY_IN_SEC we can simply use the `progressCheckIntervalSeconds` which would be the same interval as for savepoints/deploying state.



[GitHub] [flink-kubernetes-operator] bgeng777 edited a comment on pull request #62: [FLINK-26605] Check if JM can serve rest api calls every time before reconcile

Posted by GitBox <gi...@apache.org>.
bgeng777 edited a comment on pull request #62:
URL: https://github.com/apache/flink-kubernetes-operator/pull/62#issuecomment-1072102050


   Hi @wangyang0918 @gyfora, thanks a lot for your reply.
   My bad for not paying enough attention to @tweise's recent [PR](https://github.com/apache/flink-kubernetes-operator/pull/56) for [FLINK-26473](https://issues.apache.org/jira/browse/FLINK-26473), as these 2 PRs were developed at the same time. As Yang said, the goal of [FLINK-26605](https://issues.apache.org/jira/browse/FLINK-26605) is to solve 2 problems:
   1. Check whether the JobManager is really ready to serve when the state is READY.
   2. Handle the case where the JobManager is in crash-loop backoff.
   I agree that #56 has already solved the above problems.
   
   This PR tries to solve the above 2 problems in a somewhat different way, which leads to confusion. As Gyula said, this PR should switch its target to improving the session case. In fact, due to the logic in the main branch, the session observer keeps switching between READY and DEPLOYED_NOT_READY, which I believe is a bug.
   
   So, after the discussion above, I would like to clarify that the goals of this PR are:
   1. Refine the control flow in `observeJmDeployment`, based on the current main branch, to improve the logic of moving from DEPLOYED_NOT_READY to READY and fix the session observer bug.
   2. Introduce a new config `operator.observer.flink.client.timeout.sec` to control the waiting time of `listJobs`.
   3. Modify the Dockerfile to output more meaningful messages in the CI.
   
   What I will do to roll back the extra changes:
   1. Remove the `isJobManagerServing` method. Instead, I will keep using `isJobManagerPortReady` to detect whether the JM is ready to work in the session case. The reason is that the timeout-based solution for a REST call is expensive.
   2. Keep `operator.observer.rest-ready.delay.sec` as it is, following our current logic of waiting some seconds and then setting the JM status to READY.
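Here is a rough sketch of the rule in point 2, under stated assumptions. Judging from the diff at the top of this thread, the existing code moves `DEPLOYED_NOT_READY` to `READY` unconditionally on the next observation, and the wait is governed by `operator.observer.rest-ready.delay.sec`. The sketch expresses that wait as a timestamp comparison. The method name and the `portReadySince` timestamp are hypothetical:

```java
import java.time.Duration;
import java.time.Instant;

final class RestReadyDelaySketch {
    /**
     * Hypothetical rule: mark the JM READY only once its port is reachable and
     * restReadyDelay (cf. operator.observer.rest-ready.delay.sec) has elapsed since
     * the port was first seen reachable.
     */
    static boolean shouldMarkReady(
            boolean portReady, Instant portReadySince, Instant now, Duration restReadyDelay) {
        if (!portReady || portReadySince == null) {
            return false; // port not reachable yet: stay DEPLOYED_NOT_READY
        }
        return !now.isBefore(portReadySince.plus(restReadyDelay));
    }
}
```

Compared with a `listJobs` probe, this only costs a TCP connect and a clock read, which matches the cost argument in point 1. The trade-off is that it never actually exercises the REST handlers.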
   
   I will finish the above changes today. Please let me know if there are unwanted changes or if I have missed some cases.
   
   



[GitHub] [flink-kubernetes-operator] wangyang0918 commented on a change in pull request #62: [FLINK-26605] Check if JM can serve rest api calls every time before reconcile

Posted by GitBox <gi...@apache.org>.
wangyang0918 commented on a change in pull request #62:
URL: https://github.com/apache/flink-kubernetes-operator/pull/62#discussion_r830146506



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SessionObserver.java
##########
@@ -35,6 +37,19 @@ public SessionObserver(
 
     @Override
     public void observe(FlinkDeployment flinkApp, Context context, Configuration effectiveConfig) {

Review comment:
       Sorry for the late response. It seems that the current implementation could be optimized with logic similar to `JobObserver#observe()`, because the `JobManagerDeploymentStatus` can be set to `READY` in `observeJmDeployment`.
   
   ```java
   if (!isClusterReady(flinkApp)) {
       observeJmDeployment(flinkApp, context, effectiveConfig);
   }
   
   if (isClusterReady(flinkApp)) {
       // Check if session cluster can serve rest calls following our practice in JobObserver
       try {
           flinkService.listJobs(effectiveConfig);
       } catch (Exception e) {
           logger.error("REST service in session cluster is bad now", e);
           if (e instanceof TimeoutException) {
               // check for problems with the underlying deployment
               observeJmDeployment(flinkApp, context, effectiveConfig);
           }
       }
   }
   ```





[GitHub] [flink-kubernetes-operator] bgeng777 commented on pull request #62: [FLINK-26605] Check if JM can serve rest api calls every time before reconcile

Posted by GitBox <gi...@apache.org>.
bgeng777 commented on pull request #62:
URL: https://github.com/apache/flink-kubernetes-operator/pull/62#issuecomment-1072384721


   @gyfora I was focusing mostly on the bug fix before. Now the same tests have been added for SessionObserver :)



[GitHub] [flink-kubernetes-operator] bgeng777 commented on a change in pull request #62: [FLINK-26605] Check if JM can serve rest api calls every time before reconcile

Posted by GitBox <gi...@apache.org>.
bgeng777 commented on a change in pull request #62:
URL: https://github.com/apache/flink-kubernetes-operator/pull/62#discussion_r827092937



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
##########
@@ -129,6 +129,16 @@ public boolean isJobManagerPortReady(Configuration config) {
         return false;
     }
 
+    public boolean isJobManagerServing(Configuration config) {
+        try (ClusterClient<String> clusterClient = getClusterClient(config)) {
+            clusterClient.listJobs().get(10, TimeUnit.SECONDS);
+            return true;

Review comment:
       `getWebInterfaceURL` is neat, but one drawback of `clusterClient#getWebInterfaceURL` is that it only returns the current web URL (i.e. the address of the JM leader) and does not guarantee that the JM can actually work. In `isJobManagerPortReady`, we already use `getWebInterfaceURL` and 'telnet' the port in the URL. In the JIRA, we thought this may not be enough, which is why I finally chose the `listJobs` call.
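A reachable port is a weaker signal than a successful REST call. To illustrate why, here is a minimal TCP probe roughly in the spirit of the 'telnet' check described above. It is a sketch, not the operator's actual `isJobManagerPortReady`. A successful `connect` only shows that something accepts connections on that port. It says nothing about whether REST handlers such as `/jobs/overview` respond:

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

final class PortProbeSketch {
    /** Returns true if a TCP connection to host:port can be opened within timeoutMillis. */
    static boolean isPortReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true; // something accepted the connection; REST handlers were NOT exercised
        } catch (IOException e) {
            return false; // refused, unreachable, or timed out
        }
    }
}
```

A bare `java.net.ServerSocket` that never reads a byte still passes this probe, which is exactly the gap the `listJobs` check is meant to close.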





[GitHub] [flink-kubernetes-operator] tweise commented on a change in pull request #62: [FLINK-26605] Check if JM can serve rest api calls every time before reconcile

Posted by GitBox <gi...@apache.org>.
tweise commented on a change in pull request #62:
URL: https://github.com/apache/flink-kubernetes-operator/pull/62#discussion_r827630817



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
##########
@@ -129,6 +129,16 @@ public boolean isJobManagerPortReady(Configuration config) {
         return false;
     }
 
+    public boolean isJobManagerServing(Configuration config) {
+        try (ClusterClient<String> clusterClient = getClusterClient(config)) {
+            clusterClient.listJobs().get(10, TimeUnit.SECONDS);
+            return true;

Review comment:
       Please move the timeout into the config; the previous delay was also configurable. Did you check how the timeout behaves? If the REST API is not available when the call is made and becomes available after 5s, will the list call still fail after waiting for 10s? I believe that's what I saw when testing this.





[GitHub] [flink-kubernetes-operator] bgeng777 commented on a change in pull request #62: [FLINK-26605] Check if JM can serve rest api calls every time before reconcile

Posted by GitBox <gi...@apache.org>.
bgeng777 commented on a change in pull request #62:
URL: https://github.com/apache/flink-kubernetes-operator/pull/62#discussion_r827649260



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
##########
@@ -129,6 +129,16 @@ public boolean isJobManagerPortReady(Configuration config) {
         return false;
     }
 
+    public boolean isJobManagerServing(Configuration config) {
+        try (ClusterClient<String> clusterClient = getClusterClient(config)) {
+            clusterClient.listJobs().get(10, TimeUnit.SECONDS);
+            return true;

Review comment:
       Hi @tweise, I checked the Flink code and found that the underlying implementation of `listJobs` is [retriable](https://github.com/apache/flink/blob/46bb03848ea67fa7b8952f757d57a2cda6ab16aa/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java#L833). The retry delay is controlled by the Flink config `rest.retry.delay` (default value 3s). In your case, if the initial call fails, it should retry every 3s until it reaches the max retry attempts (default 20). So did you run into an unexpected failure of this list call? Are there any steps I can follow to reproduce it?
   
   For the configurable timeout, I in fact reuse `listJobs()` from our `FlinkService`. Do you mean that we should make both of them wait for a configurable timeout? Thanks.
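Some back-of-the-envelope arithmetic, assuming the defaults quoted above (3 s `rest.retry.delay`, 20 attempts) and that each failed attempt returns immediately (e.g. connection refused). A full retry cycle spans about 19 × 3 s = 57 s of delays, so an outer `get(10, TimeUnit.SECONDS)` gives up long before the retries are exhausted. Within 10 s, attempts would start at roughly t = 0, 3, 6 and 9 s, so a REST endpoint that appears at t = 5 s should be reached by the attempt at about t = 6 s. If failing attempts block instead (for example until a connect timeout), fewer attempts fit into the window, which could produce the failure described above. The sketch below models a fixed-delay retry under an outer timeout. It is a simplified illustration, not Flink's `RestClusterClient` code:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class RetrySketch {
    /**
     * Runs op up to maxAttempts times with a fixed delay between failed attempts.
     * Illustrative only; loosely modelled on rest.retry.delay / rest.retry.max-attempts.
     */
    static <T> CompletableFuture<T> retry(
            Callable<T> op, int maxAttempts, long delayMillis, ScheduledExecutorService scheduler) {
        CompletableFuture<T> result = new CompletableFuture<>();
        attempt(op, maxAttempts, delayMillis, scheduler, result);
        return result;
    }

    private static <T> void attempt(
            Callable<T> op, int attemptsLeft, long delayMillis,
            ScheduledExecutorService scheduler, CompletableFuture<T> result) {
        if (result.isDone()) {
            return; // the caller cancelled (e.g. after its own timeout): stop retrying
        }
        try {
            result.complete(op.call());
        } catch (Exception e) {
            if (attemptsLeft <= 1) {
                result.completeExceptionally(e); // retries exhausted
            } else {
                scheduler.schedule(
                        () -> attempt(op, attemptsLeft - 1, delayMillis, scheduler, result),
                        delayMillis, TimeUnit.MILLISECONDS);
            }
        }
    }
}
```

In this sketch, a timeout on `get` does not stop the retries by itself. The caller should `cancel` the future so that pending attempts see `isDone()` and stop.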
   





[GitHub] [flink-kubernetes-operator] gyfora commented on a change in pull request #62: [FLINK-26605] Check if JM can serve rest api calls every time before reconcile

Posted by GitBox <gi...@apache.org>.
gyfora commented on a change in pull request #62:
URL: https://github.com/apache/flink-kubernetes-operator/pull/62#discussion_r828995148



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/BaseObserver.java
##########
@@ -62,8 +62,22 @@ protected void observeJmDeployment(
         JobManagerDeploymentStatus previousJmStatus =
                 deploymentStatus.getJobManagerDeploymentStatus();
 
+        if (JobManagerDeploymentStatus.READY == previousJmStatus) {
+            if (!flinkService.isJobManagerServing(effectiveConfig)) {
+                deploymentStatus.setJobManagerDeploymentStatus(
+                        JobManagerDeploymentStatus.DEPLOYED_NOT_READY);
+                logger.warn("The job manager is currently not ready for serving.");
+            }
+            return;
+        }

Review comment:
       You can probably reproduce the problem by adding a test case to JobObserverTest.





[GitHub] [flink-kubernetes-operator] bgeng777 commented on a change in pull request #62: [FLINK-26605] Check if JM can serve rest api calls every time before reconcile

Posted by GitBox <gi...@apache.org>.
bgeng777 commented on a change in pull request #62:
URL: https://github.com/apache/flink-kubernetes-operator/pull/62#discussion_r829038732



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/BaseObserver.java
##########
@@ -62,8 +62,22 @@ protected void observeJmDeployment(
         JobManagerDeploymentStatus previousJmStatus =
                 deploymentStatus.getJobManagerDeploymentStatus();
 
+        if (JobManagerDeploymentStatus.READY == previousJmStatus) {
+            if (!flinkService.isJobManagerServing(effectiveConfig)) {
+                deploymentStatus.setJobManagerDeploymentStatus(
+                        JobManagerDeploymentStatus.DEPLOYED_NOT_READY);
+                logger.warn("The job manager is currently not ready for serving.");
+            }
+            return;
+        }

Review comment:
       Added tests for `observeApplicationCluster()` to cover the case where the JM is ready, later becomes unavailable, and finally recovers. Let me know if that is enough.





[GitHub] [flink-kubernetes-operator] bgeng777 commented on a change in pull request #62: [FLINK-26605] Check if JM can serve rest api calls every time before reconcile

Posted by GitBox <gi...@apache.org>.
bgeng777 commented on a change in pull request #62:
URL: https://github.com/apache/flink-kubernetes-operator/pull/62#discussion_r826963003



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
##########
@@ -129,6 +129,16 @@ public boolean isJobManagerPortReady(Configuration config) {
         return false;
     }
 
+    public boolean isJobManagerServing(Configuration config) {
+        try (ClusterClient<String> clusterClient = getClusterClient(config)) {
+            clusterClient.listJobs().get(10, TimeUnit.SECONDS);
+            return true;
+        } catch (Exception ignored) {
+        }

Review comment:
       The `observeJmDeployment()` method already logs a WARN message when the REST service is not working.
   Logging it here as well seems redundant?
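One way to avoid both silently swallowing the error and logging it twice is to have the probe return the failure cause and let the observer log it once. A minimal sketch, assuming a hypothetical `ProbeResult` type (not part of the operator) and modelling `clusterClient.listJobs()` as a `Supplier<CompletableFuture<?>>`:

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

// Hypothetical result type: whether the JM answered, plus the failure cause if it did not.
final class ProbeResult {
    private final Throwable failure; // null when the JM answered

    private ProbeResult(Throwable failure) {
        this.failure = failure;
    }

    static ProbeResult serving() {
        return new ProbeResult(null);
    }

    static ProbeResult notServing(Throwable cause) {
        return new ProbeResult(cause);
    }

    boolean isServing() {
        return failure == null;
    }

    Optional<Throwable> failure() {
        return Optional.ofNullable(failure);
    }
}

final class RestProbe {
    // Never logs: the caller (e.g. the observer) logs once, with the cause attached.
    static ProbeResult probe(Supplier<CompletableFuture<?>> listJobs, long timeoutSeconds) {
        try {
            listJobs.get().get(timeoutSeconds, TimeUnit.SECONDS);
            return ProbeResult.serving();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return ProbeResult.notServing(e);
        } catch (ExecutionException e) {
            // Unwrap to the underlying REST or network failure.
            return ProbeResult.notServing(e.getCause() != null ? e.getCause() : e);
        } catch (TimeoutException e) {
            return ProbeResult.notServing(e);
        } catch (RuntimeException e) {
            // e.g. client creation failed synchronously, or the future was cancelled.
            return ProbeResult.notServing(e);
        }
    }
}
```

The observer can then emit a single WARN such as "JM not serving" with `result.failure()` attached, which keeps the diagnostic information without duplicating log lines.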







[GitHub] [flink-kubernetes-operator] tweise commented on a change in pull request #62: [FLINK-26605] Check if JM can serve rest api calls every time before reconcile

Posted by GitBox <gi...@apache.org>.
tweise commented on a change in pull request #62:
URL: https://github.com/apache/flink-kubernetes-operator/pull/62#discussion_r827629472



##########
File path: .github/workflows/ci.yml
##########
@@ -52,7 +52,7 @@ jobs:
           export SHELL=/bin/bash
           export DOCKER_BUILDKIT=1
           eval $(minikube -p minikube docker-env)
-          docker build -f ./Dockerfile -t flink-kubernetes-operator:ci-latest .
+          docker build --progress=plain --no-cache -f ./Dockerfile -t flink-kubernetes-operator:ci-latest .

Review comment:
       Are these changes unrelated?







[GitHub] [flink-kubernetes-operator] tweise commented on a change in pull request #62: [FLINK-26605] Check if JM can serve rest api calls every time before reconcile

Posted by GitBox <gi...@apache.org>.
tweise commented on a change in pull request #62:
URL: https://github.com/apache/flink-kubernetes-operator/pull/62#discussion_r829480545



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
##########
@@ -129,6 +129,16 @@ public boolean isJobManagerPortReady(Configuration config) {
         return false;
     }
 
+    public boolean isJobManagerServing(Configuration config) {
+        try (ClusterClient<String> clusterClient = getClusterClient(config)) {
+            clusterClient.listJobs().get(10, TimeUnit.SECONDS);
+            return true;
+        } catch (Exception ignored) {
+        }

Review comment:
       I don't think this is correct and we need to add test coverage. We need to cover this: https://github.com/apache/flink-kubernetes-operator/pull/56/files#diff-1d1ff01af8cbc1561adcc4e62e4f8f4280c7aa8c7b3f0624eb5f27f09aa9e96fR73







[GitHub] [flink-kubernetes-operator] wangyang0918 commented on pull request #62: [FLINK-26605] Check if JM can serve rest api calls every time before reconcile

Posted by GitBox <gi...@apache.org>.
wangyang0918 commented on pull request #62:
URL: https://github.com/apache/flink-kubernetes-operator/pull/62#issuecomment-1072046745


   @bgeng777 I think we might need to step back and figure out what we are trying to fix in this PR. After #56, the original problems described in FLINK-26605 may no longer apply.





[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #62: [FLINK-26605] Check if JM can serve rest api calls every time before reconcile

Posted by GitBox <gi...@apache.org>.
gyfora commented on pull request #62:
URL: https://github.com/apache/flink-kubernetes-operator/pull/62#issuecomment-1072052494


   I agree, @wangyang0918: the case of failing applications/jobs is now well covered by the recent changes. 
   
   With some modifications, this PR could improve the session case and the general logic of moving from DEPLOYED_NOT_READY to READY.





[GitHub] [flink-kubernetes-operator] bgeng777 commented on a change in pull request #62: [FLINK-26605] Check if JM can serve rest api calls every time before reconcile

Posted by GitBox <gi...@apache.org>.
bgeng777 commented on a change in pull request #62:
URL: https://github.com/apache/flink-kubernetes-operator/pull/62#discussion_r826960040



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
##########
@@ -129,6 +129,16 @@ public boolean isJobManagerPortReady(Configuration config) {
         return false;
     }
 
+    public boolean isJobManagerServing(Configuration config) {
+        try (ClusterClient<String> clusterClient = getClusterClient(config)) {
+            clusterClient.listJobs().get(10, TimeUnit.SECONDS);
+            return true;

Review comment:
       I agree the try-catch solution is not elegant, but this PR aims to verify the JM's REST service.
   For network issues, to the best of my knowledge, try-catch is the most widely used approach. Do you have any suggestions?
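If the concern is the try-catch itself, one alternative is to stay in the asynchronous world and map any failure to `false`. A minimal sketch, assuming Java 9+ (`CompletableFuture.orTimeout`) and again modelling `listJobs()` as a `Supplier<CompletableFuture<?>>`; `AsyncRestProbe` is a hypothetical name, not an operator class.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

final class AsyncRestProbe {
    // Returns a future that completes with true if listJobs() succeeds within the timeout,
    // and with false on any failure (REST error, connection refused, timeout).
    // Note: a synchronous exception thrown by the supplier itself still propagates.
    static CompletableFuture<Boolean> isServing(
            Supplier<CompletableFuture<?>> listJobs, long timeout, TimeUnit unit) {
        return listJobs.get()
                // orTimeout completes the given future itself with a TimeoutException.
                .orTimeout(timeout, unit)
                .handle((ignoredJobs, failure) -> failure == null);
    }
}
```

The caller can `join()` the result where a blocking answer is needed, or chain further work on it; either way no checked exceptions leak into the observer.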







[GitHub] [flink-kubernetes-operator] bgeng777 commented on a change in pull request #62: [FLINK-26605] Check if JM can serve rest api calls every time before reconcile

Posted by GitBox <gi...@apache.org>.
bgeng777 commented on a change in pull request #62:
URL: https://github.com/apache/flink-kubernetes-operator/pull/62#discussion_r828999559



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/BaseObserver.java
##########
@@ -62,8 +62,22 @@ protected void observeJmDeployment(
         JobManagerDeploymentStatus previousJmStatus =
                 deploymentStatus.getJobManagerDeploymentStatus();
 
+        if (JobManagerDeploymentStatus.READY == previousJmStatus) {
+            if (!flinkService.isJobManagerServing(effectiveConfig)) {
+                deploymentStatus.setJobManagerDeploymentStatus(
+                        JobManagerDeploymentStatus.DEPLOYED_NOT_READY);
+                logger.warn("The job manager is currently not ready for serving.");
+            }
+            return;
+        }

Review comment:
       Thanks for pointing this out. I removed the `observeJmDeployment` call in `observeFlinkJobStatus`. Let me know if that is the change we expect.







[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #62: [FLINK-26605] Check if JM can serve rest api calls every time before reconcile

Posted by GitBox <gi...@apache.org>.
gyfora commented on pull request #62:
URL: https://github.com/apache/flink-kubernetes-operator/pull/62#issuecomment-1070698781


   Looks good. I think once you rebase, it is good to go :) 





[GitHub] [flink-kubernetes-operator] gyfora commented on a change in pull request #62: [FLINK-26605] Check if JM can serve rest api calls every time before reconcile

Posted by GitBox <gi...@apache.org>.
gyfora commented on a change in pull request #62:
URL: https://github.com/apache/flink-kubernetes-operator/pull/62#discussion_r829520365



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
##########
@@ -129,6 +129,16 @@ public boolean isJobManagerPortReady(Configuration config) {
         return false;
     }
 
+    public boolean isJobManagerServing(Configuration config) {
+        try (ClusterClient<String> clusterClient = getClusterClient(config)) {
+            clusterClient.listJobs().get(10, TimeUnit.SECONDS);
+            return true;
+        } catch (Exception ignored) {
+        }

Review comment:
       It would be very nice of @bgeng777 to add test coverage to guard the current behaviour, but it's not easy to understand all these corner cases, which should already have been covered by tests. Let's try to test and review this carefully, and then take the time to add the missing test coverage for the older features.







[GitHub] [flink-kubernetes-operator] tweise commented on a change in pull request #62: [FLINK-26605] Check if JM can serve rest api calls every time before reconcile

Posted by GitBox <gi...@apache.org>.
tweise commented on a change in pull request #62:
URL: https://github.com/apache/flink-kubernetes-operator/pull/62#discussion_r829573671



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
##########
@@ -129,6 +129,16 @@ public boolean isJobManagerPortReady(Configuration config) {
         return false;
     }
 
+    public boolean isJobManagerServing(Configuration config) {
+        try (ClusterClient<String> clusterClient = getClusterClient(config)) {
+            clusterClient.listJobs().get(10, TimeUnit.SECONDS);
+            return true;
+        } catch (Exception ignored) {
+        }

Review comment:
       @gyfora yes, I will try to cover the listJobs error scenario in a separate PR soon. However, I'm not convinced we should introduce this method here either.







[GitHub] [flink-kubernetes-operator] gyfora commented on a change in pull request #62: [FLINK-26605] Check if JM can serve rest api calls every time before reconcile

Posted by GitBox <gi...@apache.org>.
gyfora commented on a change in pull request #62:
URL: https://github.com/apache/flink-kubernetes-operator/pull/62#discussion_r829707634



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/BaseObserver.java
##########
@@ -62,9 +62,22 @@ protected void observeJmDeployment(
         JobManagerDeploymentStatus previousJmStatus =
                 deploymentStatus.getJobManagerDeploymentStatus();
 
+        if (JobManagerDeploymentStatus.READY == previousJmStatus) {
+            if (flinkService.isJobManagerServing(effectiveConfig)) {

Review comment:
       Or we could just keep the if check before observing in the job observer like it was before :)







[GitHub] [flink-kubernetes-operator] gyfora merged pull request #62: [FLINK-26605] Check if JM can serve rest api calls every time before reconcile

Posted by GitBox <gi...@apache.org>.
gyfora merged pull request #62:
URL: https://github.com/apache/flink-kubernetes-operator/pull/62


   





[GitHub] [flink-kubernetes-operator] bgeng777 commented on pull request #62: [FLINK-26605] Check if JM can serve rest api calls every time before reconcile

Posted by GitBox <gi...@apache.org>.
bgeng777 commented on pull request #62:
URL: https://github.com/apache/flink-kubernetes-operator/pull/62#issuecomment-1072302363


   Sorry to bother you again, @gyfora @wangyang0918 @tweise.
   The main changes are only in `SessionObserver`: if the cluster state is READY, check the REST service via `listJobs` (the same idea as in `JobObserver`) to make sure it can actually serve requests, not only that its port is ready. If it cannot serve, or the state is not READY, go on to check the JM deployment.





[GitHub] [flink-kubernetes-operator] gyfora commented on a change in pull request #62: [FLINK-26605] Check if JM can serve rest api calls every time before reconcile

Posted by GitBox <gi...@apache.org>.
gyfora commented on a change in pull request #62:
URL: https://github.com/apache/flink-kubernetes-operator/pull/62#discussion_r826622732



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
##########
@@ -129,6 +132,26 @@ public boolean isJobManagerPortReady(Configuration config) {
         return false;
     }
 
+    public boolean isJobManagerServing(Configuration config) {
+        try (ClusterClient<String> clusterClient = getClusterClient(config)) {
+            clusterClient.getWebInterfaceURL();
+            URL url =
+                    new URL(
+                            clusterClient.getWebInterfaceURL()
+                                    + JobsOverviewHeaders.getInstance().getTargetRestEndpointURL());
+            // TODO: add support for https if necessary
+            HttpURLConnection connection = (HttpURLConnection) url.openConnection();
+            connection.setRequestMethod("GET");
+            connection.connect();
+            if (connection.getResponseCode() == HttpURLConnection.HTTP_OK) {
+                return true;
+            }

Review comment:
       Why did you choose to call the REST API this way instead of through the `ClusterClient` directly?







[GitHub] [flink-kubernetes-operator] SteNicholas commented on a change in pull request #62: [FLINK-26605] Check if JM can serve rest api calls every time before reconcile

Posted by GitBox <gi...@apache.org>.
SteNicholas commented on a change in pull request #62:
URL: https://github.com/apache/flink-kubernetes-operator/pull/62#discussion_r826824807



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
##########
@@ -129,6 +129,16 @@ public boolean isJobManagerPortReady(Configuration config) {
         return false;
     }
 
+    public boolean isJobManagerServing(Configuration config) {
+        try (ClusterClient<String> clusterClient = getClusterClient(config)) {
+            clusterClient.listJobs().get(10, TimeUnit.SECONDS);
+            return true;
+        } catch (Exception ignored) {
+        }

Review comment:
       Does this need to log the exception thrown when listing the jobs through the cluster client?







[GitHub] [flink-kubernetes-operator] bgeng777 commented on a change in pull request #62: [FLINK-26605] Check if JM can serve rest api calls every time before reconcile

Posted by GitBox <gi...@apache.org>.
bgeng777 commented on a change in pull request #62:
URL: https://github.com/apache/flink-kubernetes-operator/pull/62#discussion_r826786605



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
##########
@@ -38,7 +38,7 @@ public static FlinkOperatorConfiguration fromConfiguration(Configuration operato
 
         int restApiReadyDelaySeconds =
                 operatorConfig.getInteger(
-                        OperatorConfigOptions.OPERATOR_OBSERVER_REST_READY_DELAY_IN_SEC);
+                        OperatorConfigOptions.OPERATOR_OBSERVER_PROGRESS_CHECK_INTERVAL_IN_SEC);

Review comment:
       I totally agree with your point. I believe the `restApiReadyDelaySeconds` field is no longer necessary. I am currently working on fixing the CI issue and will apply your suggestion as soon as possible.







[GitHub] [flink-kubernetes-operator] gyfora commented on a change in pull request #62: [FLINK-26605] Check if JM can serve rest api calls every time before reconcile

Posted by GitBox <gi...@apache.org>.
gyfora commented on a change in pull request #62:
URL: https://github.com/apache/flink-kubernetes-operator/pull/62#discussion_r828993595



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/BaseObserver.java
##########
@@ -62,8 +62,22 @@ protected void observeJmDeployment(
         JobManagerDeploymentStatus previousJmStatus =
                 deploymentStatus.getJobManagerDeploymentStatus();
 
+        if (JobManagerDeploymentStatus.READY == previousJmStatus) {
+            if (!flinkService.isJobManagerServing(effectiveConfig)) {
+                deploymentStatus.setJobManagerDeploymentStatus(
+                        JobManagerDeploymentStatus.DEPLOYED_NOT_READY);
+                logger.warn("The job manager is currently not ready for serving.");
+            }
+            return;
+        }

Review comment:
       This is not really your fault, but now the `JobObserver` won't call observe if the job is already in a READY state (while the `SessionObserver` always does). I think we need to remove the `if` logic in the `JobObserver` and just keep your change.







[GitHub] [flink-kubernetes-operator] gyfora commented on a change in pull request #62: [FLINK-26605] Check if JM can serve rest api calls every time before reconcile

Posted by GitBox <gi...@apache.org>.
gyfora commented on a change in pull request #62:
URL: https://github.com/apache/flink-kubernetes-operator/pull/62#discussion_r829002334



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/BaseObserver.java
##########
@@ -62,8 +62,22 @@ protected void observeJmDeployment(
         JobManagerDeploymentStatus previousJmStatus =
                 deploymentStatus.getJobManagerDeploymentStatus();
 
+        if (JobManagerDeploymentStatus.READY == previousJmStatus) {
+            if (!flinkService.isJobManagerServing(effectiveConfig)) {
+                deploymentStatus.setJobManagerDeploymentStatus(
+                        JobManagerDeploymentStatus.DEPLOYED_NOT_READY);
+                logger.warn("The job manager is currently not ready for serving.");
+            }
+            return;
+        }

Review comment:
       Don't remove that :) I am talking about:
   https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobObserver.java#L50
   
   and please add a test case similar to the one you added for the session observer, so we can guard against future regressions here.







[GitHub] [flink-kubernetes-operator] gyfora commented on a change in pull request #62: [FLINK-26605] Check if JM can serve rest api calls every time before reconcile

Posted by GitBox <gi...@apache.org>.
gyfora commented on a change in pull request #62:
URL: https://github.com/apache/flink-kubernetes-operator/pull/62#discussion_r829077192



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/BaseObserver.java
##########
@@ -62,8 +62,22 @@ protected void observeJmDeployment(
         JobManagerDeploymentStatus previousJmStatus =
                 deploymentStatus.getJobManagerDeploymentStatus();
 
+        if (JobManagerDeploymentStatus.READY == previousJmStatus) {
+            if (!flinkService.isJobManagerServing(effectiveConfig)) {
+                deploymentStatus.setJobManagerDeploymentStatus(
+                        JobManagerDeploymentStatus.DEPLOYED_NOT_READY);
+                logger.warn("The job manager is currently not ready for serving.");
+            }
+            return;
+        }
+
         if (JobManagerDeploymentStatus.DEPLOYED_NOT_READY == previousJmStatus) {
-            deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
+            // check if the JM is ready for accepting job submission
+            if (flinkService.isJobManagerServing(effectiveConfig)) {
+                deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
+            } else {
+                logger.info("The job manager has not been ready for serving yet...");
+            }

Review comment:
       Thank you @bgeng777 for adding the extra tests. Before merging this, I would like to clean up the check flow a little, because your changes and @tweise's recent changes conflict slightly.
   
   I think we should not `return` from the observe method if `flinkService.isJobManagerServing(effectiveConfig)` is false. In those cases we should always check the deployment too, which happens further down in this method.
   
   This is the whole idea behind @tweise's recent change here: https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobObserver.java#L74
   
   With your change, @bgeng777, we can probably get rid of the observe call there, but we still need to validate the deployment if the REST API does not respond.
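The suggested flow (no early `return` when the REST check fails, always fall through to the deployment check) can be sketched as a small pure function. This is a simplified model, not the operator's `observeJmDeployment()`: `JmStatus` is a hypothetical three-state stand-in for `JobManagerDeploymentStatus`, and the two `BooleanSupplier`s stand in for the REST probe and the Kubernetes deployment inspection.

```java
import java.util.function.BooleanSupplier;

// Simplified, hypothetical stand-in for the operator's JobManagerDeploymentStatus.
enum JmStatus { DEPLOYING, DEPLOYED_NOT_READY, READY }

final class JmObserverSketch {
    private final BooleanSupplier restServing;     // e.g. a bounded listJobs() call
    private final BooleanSupplier deploymentReady; // e.g. "JM deployment has ready replicas"

    JmObserverSketch(BooleanSupplier restServing, BooleanSupplier deploymentReady) {
        this.restServing = restServing;
        this.deploymentReady = deploymentReady;
    }

    JmStatus observe(JmStatus previous) {
        boolean wasUp = previous == JmStatus.READY || previous == JmStatus.DEPLOYED_NOT_READY;
        if (wasUp && restServing.getAsBoolean()) {
            // The REST API answered: the JM is READY and nothing else needs checking.
            return JmStatus.READY;
        }
        // No early return when the REST call fails: always fall through to the
        // deployment check so that crashed or missing pods are detected.
        return deploymentReady.getAsBoolean() ? JmStatus.DEPLOYED_NOT_READY : JmStatus.DEPLOYING;
    }
}
```

With this shape a READY JM whose REST API stops answering is demoted to DEPLOYED_NOT_READY only if its pods still look healthy; otherwise the deployment check takes over, which is the behaviour the comment asks to preserve.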







[GitHub] [flink-kubernetes-operator] bgeng777 commented on pull request #62: [FLINK-26605] Check if JM can serve rest api calls every time before reconcile

Posted by GitBox <gi...@apache.org>.
bgeng777 commented on pull request #62:
URL: https://github.com/apache/flink-kubernetes-operator/pull/62#issuecomment-1072102050


   Hi @wangyang0918 @gyfora, thanks a lot for your replies.
   It's my fault for not paying enough attention to @tweise's recent [PR](https://github.com/apache/flink-kubernetes-operator/pull/56) for [FLINK-26473](https://issues.apache.org/jira/browse/FLINK-26473); the two PRs were developed at the same time. As Yang said, the goal of [FLINK-26605](https://issues.apache.org/jira/browse/FLINK-26605) is to solve two problems:
   1. Check whether the JobManager is really ready to serve requests when its state is READY.
   2. Handle the case where the JobManager is in crash backoff.
   I agree that #56 has already solved both problems.
   
   This PR tries to solve the same two problems in a somewhat different way, which leads to confusion. As Gyula said, this PR should switch its target to improving the session case. In fact, with the logic in the main branch, the session observer keeps switching between READY and DEPLOYED_NOT_READY, which I believe is a bug.
   
   So, after the above discussion, I would like to clarify that the goals of this PR are:
   1. Refine the control flow in `observeJmDeployment` on top of the current main branch, improving the logic of moving from DEPLOYED_NOT_READY to READY, to fix the session observer bug.
   2. Rename `operator.observer.rest-ready.delay.sec` to `operator.observer.flink.client.timeout.sec` to control the waiting time of `listJobs`. IMO, it can also take over the role of the previous config, i.e. waiting for the REST API to become available.
   3. Modify the Dockerfile to output more meaningful messages in the CI.
   
   What I will do to roll back the extra changes:
   1. Remove the `isJobManagerServing` method. Instead, I will keep using `isJobManagerPortReady` to detect whether the JM is ready to work in the session case. The reason is that the timeout solution is expensive, and IMO, as I said in [FLINK-26655](https://issues.apache.org/jira/browse/FLINK-26655), `isJobManagerServing` and `isJobManagerPortReady` should actually do the same thing (i.e. check whether the JM works correctly). If we find a good approach for checking JM readiness later, we can update it then.
   
   I will finish the above changes today. Please let me know if there are unwanted changes or cases I have overlooked.
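Reading the proposed `operator.observer.flink.client.timeout.sec` key could look roughly like the sketch below. In the operator this would presumably be declared as a Flink config option; a plain `Map<String, String>` is used here to keep the sketch dependency-free. `ClientTimeoutConfig` is a hypothetical helper, and the default of 10 seconds is an assumption chosen to match the value previously hard-coded in `listJobs().get(10, TimeUnit.SECONDS)`.

```java
import java.time.Duration;
import java.util.Map;

final class ClientTimeoutConfig {
    // Key name as proposed in the comment above.
    static final String CLIENT_TIMEOUT_KEY = "operator.observer.flink.client.timeout.sec";
    // Assumed default, matching the previously hard-coded 10 seconds.
    static final long DEFAULT_TIMEOUT_SECONDS = 10;

    // Resolves the client timeout, falling back to the default and rejecting non-positive values.
    static Duration clientTimeout(Map<String, String> operatorConfig) {
        String raw = operatorConfig.get(CLIENT_TIMEOUT_KEY);
        long seconds = raw == null ? DEFAULT_TIMEOUT_SECONDS : Long.parseLong(raw.trim());
        if (seconds <= 0) {
            throw new IllegalArgumentException(CLIENT_TIMEOUT_KEY + " must be positive, got " + seconds);
        }
        return Duration.ofSeconds(seconds);
    }
}
```

A caller would then use `listJobs().get(timeout.toMillis(), TimeUnit.MILLISECONDS)` instead of the literal `10`, so the same value can govern every client call the observer makes.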
   
   





[GitHub] [flink-kubernetes-operator] bgeng777 commented on a change in pull request #62: [FLINK-26605] Check if JM can serve rest api calls every time before reconcile

Posted by GitBox <gi...@apache.org>.
bgeng777 commented on a change in pull request #62:
URL: https://github.com/apache/flink-kubernetes-operator/pull/62#discussion_r829105801



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/BaseObserver.java
##########
@@ -62,8 +62,22 @@ protected void observeJmDeployment(
         JobManagerDeploymentStatus previousJmStatus =
                 deploymentStatus.getJobManagerDeploymentStatus();
 
+        if (JobManagerDeploymentStatus.READY == previousJmStatus) {
+            if (!flinkService.isJobManagerServing(effectiveConfig)) {
+                deploymentStatus.setJobManagerDeploymentStatus(
+                        JobManagerDeploymentStatus.DEPLOYED_NOT_READY);
+                logger.warn("The job manager is currently not ready for serving.");
+            }
+            return;
+        }
+
         if (JobManagerDeploymentStatus.DEPLOYED_NOT_READY == previousJmStatus) {
-            deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
+            // check if the JM is ready for accepting job submission
+            if (flinkService.isJobManagerServing(effectiveConfig)) {
+                deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
+            } else {
+                logger.info("The job manager has not been ready for serving yet...");
+            }

Review comment:
       Thanks a lot for the detailed explanation, @gyfora!
   1. After checking [FLINK-26473](https://issues.apache.org/jira/browse/FLINK-26473) again, I agree with removing the `return` so that the method falls through to the deployment check. When the REST service is unavailable, it is reasonable to check the deployment to find out whether anything bad has happened. I will update the logic here.
   
   2. Adding a configurable timeout, as you and @tweise suggest, makes sense to me. Since we have reached consensus on using the same config for both methods, reusing the same logic is definitely better.
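If the port check and the REST check share one configured timeout, the port side can be a plain TCP connect bounded by that same `Duration`. A minimal sketch, assuming the port-readiness check is essentially a socket connect to the JM REST address; `PortProbe` is a hypothetical name, and the real `isJobManagerPortReady` may differ in detail.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.time.Duration;

final class PortProbe {
    // True only if a TCP connection to host:port is accepted within the timeout.
    // Refused connections, unresolvable hosts and timeouts all surface as IOException.
    static boolean isPortReachable(String host, int port, Duration timeout) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), Math.toIntExact(timeout.toMillis()));
            return true;
        } catch (IOException e) {
            return false;
        }
    }
}
```

Passing the same `Duration` here and to the `listJobs()` wait keeps both checks consistent; note that an accepted TCP connection only shows the port is open, not that the REST handlers respond, which is why the REST check remains the stronger signal.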







[GitHub] [flink-kubernetes-operator] tweise commented on a change in pull request #62: [FLINK-26605] Check if JM can serve rest api calls every time before reconcile

Posted by GitBox <gi...@apache.org>.
tweise commented on a change in pull request #62:
URL: https://github.com/apache/flink-kubernetes-operator/pull/62#discussion_r829482619



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
##########
@@ -129,6 +129,16 @@ public boolean isJobManagerPortReady(Configuration config) {
         return false;
     }
 
+    public boolean isJobManagerServing(Configuration config) {
+        try (ClusterClient<String> clusterClient = getClusterClient(config)) {
+            clusterClient.listJobs().get(10, TimeUnit.SECONDS);
+            return true;
+        } catch (Exception ignored) {
+        }

Review comment:
       It might be better to leave this logic in the observer. 







[GitHub] [flink-kubernetes-operator] tweise commented on a change in pull request #62: [FLINK-26605] Check if JM can serve rest api calls every time before reconcile

Posted by GitBox <gi...@apache.org>.
tweise commented on a change in pull request #62:
URL: https://github.com/apache/flink-kubernetes-operator/pull/62#discussion_r829573671



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
##########
@@ -129,6 +129,16 @@ public boolean isJobManagerPortReady(Configuration config) {
         return false;
     }
 
+    public boolean isJobManagerServing(Configuration config) {
+        try (ClusterClient<String> clusterClient = getClusterClient(config)) {
+            clusterClient.listJobs().get(10, TimeUnit.SECONDS);
+            return true;
+        } catch (Exception ignored) {
+        }

Review comment:
       I will try to cover the listJobs error scenario in a separate PR soon.







[GitHub] [flink-kubernetes-operator] bgeng777 commented on a change in pull request #62: [FLINK-26605] Check if JM can serve rest api calls every time before reconcile

Posted by GitBox <gi...@apache.org>.
bgeng777 commented on a change in pull request #62:
URL: https://github.com/apache/flink-kubernetes-operator/pull/62#discussion_r829665883



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
##########
@@ -136,9 +136,20 @@ public boolean isJobManagerPortReady(Configuration config) {
         return false;
     }
 
+    public boolean isJobManagerServing(Configuration config) {
+        try {
+            this.listJobs(config);
+            return true;
+        } catch (Exception ignored) {
+            return false;
+        }
+    }
+
     public Collection<JobStatusMessage> listJobs(Configuration conf) throws Exception {
         try (ClusterClient<String> clusterClient = getClusterClient(conf)) {
-            return clusterClient.listJobs().get(10, TimeUnit.SECONDS);
+            return clusterClient
+                    .listJobs()
+                    .get(operatorConfiguration.getProgressCheckIntervalSeconds(), TimeUnit.SECONDS);

Review comment:
       updated.







[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #62: [FLINK-26605] Check if JM can serve rest api calls every time before reconcile

Posted by GitBox <gi...@apache.org>.
gyfora commented on pull request #62:
URL: https://github.com/apache/flink-kubernetes-operator/pull/62#issuecomment-1072230607


   @bgeng777 I agree with a simplified approach here. One comment: please check https://github.com/apache/flink-kubernetes-operator/pull/80/files and use a similar approach for the new config (no need for `.sec` if we use a duration type :))
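To illustrate why a duration-typed option needs no `.sec` suffix: the unit travels with the value. The self-contained parser below is only a hypothetical stand-in for a real duration option type; the key `operator.observer.flink.client.timeout`, the 10-second default, and the "bare number means milliseconds" rule are all assumptions for illustration.

```java
import java.time.Duration;
import java.util.Locale;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class DurationOptionSketch {

    // Hypothetical key: no ".sec" suffix, because the unit is part of the value.
    static final String CLIENT_TIMEOUT_KEY = "operator.observer.flink.client.timeout";
    static final Duration DEFAULT_CLIENT_TIMEOUT = Duration.ofSeconds(10);

    // Accepts e.g. "500ms", "10 s", "1min", "2h"; a bare number is read as milliseconds (assumption).
    private static final Pattern FORMAT = Pattern.compile("\\s*(\\d+)\\s*(ms|s|min|h)?\\s*");

    static Duration parse(String raw) {
        Matcher m = FORMAT.matcher(raw.toLowerCase(Locale.ROOT));
        if (!m.matches()) {
            throw new IllegalArgumentException("Unsupported duration: " + raw);
        }
        long amount = Long.parseLong(m.group(1));
        String unit = m.group(2) == null ? "ms" : m.group(2);
        switch (unit) {
            case "ms":
                return Duration.ofMillis(amount);
            case "s":
                return Duration.ofSeconds(amount);
            case "min":
                return Duration.ofMinutes(amount);
            default: // "h"
                return Duration.ofHours(amount);
        }
    }

    // Reads the client timeout from a plain key/value config, falling back to the default.
    static Duration clientTimeout(Map<String, String> conf) {
        String raw = conf.get(CLIENT_TIMEOUT_KEY);
        return raw == null ? DEFAULT_CLIENT_TIMEOUT : parse(raw);
    }
}
```

With this shape, `10 s` and `1min` are both valid values for the same key, so the key name does not need to encode a unit.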





[GitHub] [flink-kubernetes-operator] gyfora commented on a change in pull request #62: [FLINK-26605] Check if JM can serve rest api calls every time before reconcile

Posted by GitBox <gi...@apache.org>.
gyfora commented on a change in pull request #62:
URL: https://github.com/apache/flink-kubernetes-operator/pull/62#discussion_r829081003



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/BaseObserver.java
##########
@@ -62,8 +62,22 @@ protected void observeJmDeployment(
         JobManagerDeploymentStatus previousJmStatus =
                 deploymentStatus.getJobManagerDeploymentStatus();
 
+        if (JobManagerDeploymentStatus.READY == previousJmStatus) {
+            if (!flinkService.isJobManagerServing(effectiveConfig)) {
+                deploymentStatus.setJobManagerDeploymentStatus(
+                        JobManagerDeploymentStatus.DEPLOYED_NOT_READY);
+                logger.warn("The job manager is currently not ready for serving.");
+            }
+            return;
+        }
+
         if (JobManagerDeploymentStatus.DEPLOYED_NOT_READY == previousJmStatus) {
-            deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
+            // check if the JM is ready for accepting job submission
+            if (flinkService.isJobManagerServing(effectiveConfig)) {
+                deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
+            } else {
+                logger.info("The job manager has not been ready for serving yet...");
+            }

Review comment:
   Also, we should use the same **configurable** timeout in both `flinkService.listJobs` and `flinkService.isJobManagerServing` (`isJobManagerServing` could probably call `listJobs` internally), because if the hardcoded 10 seconds turn out to be too short for any reason, the whole logic breaks.
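A minimal sketch of the suggestion, assuming a hypothetical `listJobsCall` supplier in place of `ClusterClient#listJobs` (it returns job names rather than `JobStatusMessage`) and a `Duration` injected from operator config: `isJobManagerServing` delegates to `listJobs`, so both always honour the same timeout.

```java
import java.time.Duration;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

final class FlinkServiceSketch {

    // Hypothetical stand-in for ClusterClient#listJobs.
    private final Supplier<CompletableFuture<Collection<String>>> listJobsCall;
    // One configurable timeout shared by every client call (assumption: read from operator config).
    private final Duration clientTimeout;

    FlinkServiceSketch(
            Supplier<CompletableFuture<Collection<String>>> listJobsCall, Duration clientTimeout) {
        this.listJobsCall = listJobsCall;
        this.clientTimeout = clientTimeout;
    }

    Collection<String> listJobs() throws Exception {
        return listJobsCall.get().get(clientTimeout.toMillis(), TimeUnit.MILLISECONDS);
    }

    // Delegates to listJobs(), so both methods use the same configurable timeout.
    boolean isJobManagerServing() {
        try {
            listJobs();
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        } catch (Exception e) {
            return false; // failure or timeout: treat the JM as not serving
        }
    }
}
```

Changing the timeout then means changing one config value instead of two hardcoded literals.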







[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #62: [FLINK-26605] Check if JM can serve rest api calls every time before reconcile

Posted by GitBox <gi...@apache.org>.
gyfora commented on pull request #62:
URL: https://github.com/apache/flink-kubernetes-operator/pull/62#issuecomment-1072052494









[GitHub] [flink-kubernetes-operator] wangyang0918 commented on pull request #62: [FLINK-26605] Check if JM can serve rest api calls every time before reconcile

Posted by GitBox <gi...@apache.org>.
wangyang0918 commented on pull request #62:
URL: https://github.com/apache/flink-kubernetes-operator/pull/62#issuecomment-1072046745


   @bgeng777 I think we might need to step back and figure out what we are trying to fix in this PR. After #56, the original problems described in FLINK-26605 might no longer apply.





[GitHub] [flink-kubernetes-operator] wangyang0918 commented on pull request #62: [FLINK-26605] Check if JM can serve rest api calls every time before reconcile

Posted by GitBox <gi...@apache.org>.
wangyang0918 commented on pull request #62:
URL: https://github.com/apache/flink-kubernetes-operator/pull/62#issuecomment-1067697253


   I think we do not need `OPERATOR_OBSERVER_REST_READY_DELAY_IN_SEC` now. Right?





[GitHub] [flink-kubernetes-operator] SteNicholas commented on a change in pull request #62: [FLINK-26605] Check if JM can serve rest api calls every time before reconcile

Posted by GitBox <gi...@apache.org>.
SteNicholas commented on a change in pull request #62:
URL: https://github.com/apache/flink-kubernetes-operator/pull/62#discussion_r826826536



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
##########
@@ -129,6 +129,16 @@ public boolean isJobManagerPortReady(Configuration config) {
         return false;
     }
 
+    public boolean isJobManagerServing(Configuration config) {
+        try (ClusterClient<String> clusterClient = getClusterClient(config)) {
+            clusterClient.listJobs().get(10, TimeUnit.SECONDS);
+            return true;

Review comment:
   Is there another way to check whether the JobManager is serving? Relying on an exception doesn't look good to me.
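One possible way to keep exceptions out of the caller's control flow, sketched with plain `java.util.concurrent` (the `probe` future is a hypothetical stand-in for the REST call): `CompletableFuture#orTimeout` (Java 9+) bounds the wait, and `handle` maps either outcome to a boolean. Failures are still represented as exceptions inside the future; the sketch only removes the `try`/`catch` at the call site.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

final class ServingProbeSketch {

    /**
     * Returns true if the probe completes normally within the timeout, and false if it
     * fails or times out. Note that orTimeout completes the given future itself with a
     * TimeoutException when the deadline passes.
     */
    static boolean isServing(CompletableFuture<?> probe, Duration timeout) {
        return probe
                .orTimeout(timeout.toMillis(), TimeUnit.MILLISECONDS)
                .handle((result, error) -> error == null)
                .join(); // does not throw here: handle() has already mapped errors to false
    }
}
```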







[GitHub] [flink-kubernetes-operator] bgeng777 commented on a change in pull request #62: [FLINK-26605] Check if JM can serve rest api calls every time before reconcile

Posted by GitBox <gi...@apache.org>.
bgeng777 commented on a change in pull request #62:
URL: https://github.com/apache/flink-kubernetes-operator/pull/62#discussion_r829664876



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/BaseObserver.java
##########
@@ -62,9 +62,22 @@ protected void observeJmDeployment(
         JobManagerDeploymentStatus previousJmStatus =
                 deploymentStatus.getJobManagerDeploymentStatus();
 
+        if (JobManagerDeploymentStatus.READY == previousJmStatus) {
+            if (flinkService.isJobManagerServing(effectiveConfig)) {

Review comment:
   For the JobObserver, I agree that this check is somewhat redundant when the deployment is ready. For jobs, we first call `observeJmDeployment` (1 call of `listJobs()`) and then `observeFlinkJobStatus()` (1 or 2 calls of `listJobs()`). We could save at least 1 extra REST call with some refactoring.
   But I think we may still need this check in the SessionObserver. For sessions, we only call `observeJmDeployment`. Without this check, we would always fall through to the deployment check, and with the current logic in the `main` branch we would then change the JobManagerDeploymentStatus back to `DEPLOYED_NOT_READY`, which is not desired.
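A small simulation of the session-observer behaviour described above, assuming a hypothetical two-state enum and a deliberately simplified model of the `main` branch logic (every loop falls into the deployment check, which resets READY). It shows the status flapping across reconcile loops, and how a READY check keeps it stable while the JM is serving.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical two-state subset of JobManagerDeploymentStatus for a session cluster.
enum SessionJmStatus { DEPLOYED_NOT_READY, READY }

final class SessionFlapSketch {

    // Simplified model without a READY check: the deployment check resets READY
    // to DEPLOYED_NOT_READY, and the next loop promotes it to READY again.
    static SessionJmStatus withoutReadyCheck(SessionJmStatus previous) {
        return previous == SessionJmStatus.DEPLOYED_NOT_READY
                ? SessionJmStatus.READY
                : SessionJmStatus.DEPLOYED_NOT_READY;
    }

    // With a READY check: a JM that is still serving stays READY.
    static SessionJmStatus withReadyCheck(boolean serving) {
        return serving ? SessionJmStatus.READY : SessionJmStatus.DEPLOYED_NOT_READY;
    }

    // Applies one observer step per reconcile loop and records the resulting statuses.
    static List<SessionJmStatus> run(
            UnaryOperator<SessionJmStatus> step, SessionJmStatus start, int loops) {
        List<SessionJmStatus> history = new ArrayList<>();
        SessionJmStatus current = start;
        for (int i = 0; i < loops; i++) {
            current = step.apply(current);
            history.add(current);
        }
        return history;
    }
}
```

Starting from DEPLOYED_NOT_READY, four loops of `withoutReadyCheck` alternate READY, DEPLOYED_NOT_READY, READY, DEPLOYED_NOT_READY, while `withReadyCheck(true)` stays READY.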
   
   







[GitHub] [flink-kubernetes-operator] wangyang0918 commented on a change in pull request #62: [FLINK-26605] Check if JM can serve rest api calls every time before reconcile

Posted by GitBox <gi...@apache.org>.
wangyang0918 commented on a change in pull request #62:
URL: https://github.com/apache/flink-kubernetes-operator/pull/62#discussion_r827623281



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
##########
@@ -129,6 +129,16 @@ public boolean isJobManagerPortReady(Configuration config) {
         return false;
     }
 
+    public boolean isJobManagerServing(Configuration config) {
+        try (ClusterClient<String> clusterClient = getClusterClient(config)) {
+            clusterClient.listJobs().get(10, TimeUnit.SECONDS);
+            return true;

Review comment:
       The `getWebInterfaceURL` might not work in the current situation because we are using `StandaloneClientHAServices` when creating the rest cluster client.







[GitHub] [flink-kubernetes-operator] wangyang0918 commented on a change in pull request #62: [FLINK-26605] Check if JM can serve rest api calls every time before reconcile

Posted by GitBox <gi...@apache.org>.
wangyang0918 commented on a change in pull request #62:
URL: https://github.com/apache/flink-kubernetes-operator/pull/62#discussion_r829012183



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/BaseObserver.java
##########
@@ -62,8 +62,22 @@ protected void observeJmDeployment(
         JobManagerDeploymentStatus previousJmStatus =
                 deploymentStatus.getJobManagerDeploymentStatus();
 
+        if (JobManagerDeploymentStatus.READY == previousJmStatus) {
+            if (!flinkService.isJobManagerServing(effectiveConfig)) {
+                deploymentStatus.setJobManagerDeploymentStatus(
+                        JobManagerDeploymentStatus.DEPLOYED_NOT_READY);
+                logger.warn("The job manager is currently not ready for serving.");
+            }
+            return;
+        }

Review comment:
       +1 for adding a test to cover this.
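A minimal sketch of such a test without JUnit or the operator's real classes, assuming a hypothetical `FakeFlinkService` test double whose REST probe can be switched off. It mirrors the READY branch in the diff above: a READY JobManager whose REST probe fails is downgraded to DEPLOYED_NOT_READY.

```java
// Hypothetical two-state subset of JobManagerDeploymentStatus.
enum ObservedStatus { DEPLOYED_NOT_READY, READY }

// Hypothetical test double: only the REST probe is faked; nothing talks to a real cluster.
final class FakeFlinkService {
    boolean serving = true; // set to false to simulate "port open, REST API unavailable"
    int probeCalls = 0;

    boolean isJobManagerServing() {
        probeCalls++;
        return serving;
    }
}

final class ReadyBranchTestSketch {

    // Mirrors the READY branch of the diff: downgrade when the REST probe fails.
    static ObservedStatus observe(ObservedStatus previous, FakeFlinkService service) {
        if (previous == ObservedStatus.READY && !service.isJobManagerServing()) {
            return ObservedStatus.DEPLOYED_NOT_READY;
        }
        return previous;
    }

    static void readyJobManagerWithoutRestApiIsDowngraded() {
        FakeFlinkService service = new FakeFlinkService();
        check(observe(ObservedStatus.READY, service) == ObservedStatus.READY,
                "a serving JM should stay READY");

        service.serving = false;
        check(observe(ObservedStatus.READY, service) == ObservedStatus.DEPLOYED_NOT_READY,
                "a READY JM whose REST API fails should be downgraded");
        check(service.probeCalls == 2, "the REST probe should run once per observation");
    }

    private static void check(boolean condition, String message) {
        if (!condition) {
            throw new AssertionError(message);
        }
    }
}
```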







[GitHub] [flink-kubernetes-operator] gyfora commented on a change in pull request #62: [FLINK-26605] Check if JM can serve rest api calls every time before reconcile

Posted by GitBox <gi...@apache.org>.
gyfora commented on a change in pull request #62:
URL: https://github.com/apache/flink-kubernetes-operator/pull/62#discussion_r829264434



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
##########
@@ -136,9 +136,20 @@ public boolean isJobManagerPortReady(Configuration config) {
         return false;
     }
 
+    public boolean isJobManagerServing(Configuration config) {
+        try {
+            this.listJobs(config);
+            return true;
+        } catch (Exception ignored) {
+            return false;
+        }
+    }
+
     public Collection<JobStatusMessage> listJobs(Configuration conf) throws Exception {
         try (ClusterClient<String> clusterClient = getClusterClient(conf)) {
-            return clusterClient.listJobs().get(10, TimeUnit.SECONDS);
+            return clusterClient
+                    .listJobs()
+                    .get(operatorConfiguration.getProgressCheckIntervalSeconds(), TimeUnit.SECONDS);

Review comment:
   I think we need a new config in the OperatorConfiguration for this. We could call it `flinkClientTimeoutSeconds` -> `flink.client.timeout.seconds` 







[GitHub] [flink-kubernetes-operator] gyfora commented on a change in pull request #62: [FLINK-26605] Check if JM can serve rest api calls every time before reconcile

Posted by GitBox <gi...@apache.org>.
gyfora commented on a change in pull request #62:
URL: https://github.com/apache/flink-kubernetes-operator/pull/62#discussion_r829264434



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
##########
@@ -136,9 +136,20 @@ public boolean isJobManagerPortReady(Configuration config) {
         return false;
     }
 
+    public boolean isJobManagerServing(Configuration config) {
+        try {
+            this.listJobs(config);
+            return true;
+        } catch (Exception ignored) {
+            return false;
+        }
+    }
+
     public Collection<JobStatusMessage> listJobs(Configuration conf) throws Exception {
         try (ClusterClient<String> clusterClient = getClusterClient(conf)) {
-            return clusterClient.listJobs().get(10, TimeUnit.SECONDS);
+            return clusterClient
+                    .listJobs()
+                    .get(operatorConfiguration.getProgressCheckIntervalSeconds(), TimeUnit.SECONDS);

Review comment:
   I think we need a new config in the OperatorConfiguration for this. We could call it `flinkClientTimeoutSeconds` -> `flink.client.timeout.sec` 







[GitHub] [flink-kubernetes-operator] bgeng777 edited a comment on pull request #62: [FLINK-26605] Check if JM can serve rest api calls every time before reconcile

Posted by GitBox <gi...@apache.org>.
bgeng777 edited a comment on pull request #62:
URL: https://github.com/apache/flink-kubernetes-operator/pull/62#issuecomment-1072102050


   Hi @wangyang0918 @gyfora, thanks a lot for your reply.
   My bad for not paying enough attention to @tweise's recent [PR](https://github.com/apache/flink-kubernetes-operator/pull/56) for [FLINK-26473](https://issues.apache.org/jira/browse/FLINK-26473); these two PRs were developed at the same time. As Yang said, the goal of [FLINK-26605](https://issues.apache.org/jira/browse/FLINK-26605) is to solve two problems:
   1. Check whether the JobManager is really ready for serving when the state is READY.
   2. Handle the case where the JobManager is in crash backoff.
   I agree that #56 has already solved the problems above.
   
   This PR tries to solve the same two problems in a somewhat different way, which leads to confusion. As Gyula said, this PR should switch its target to improving the session case. In fact, due to the logic in the main branch, the session observer keeps switching between READY and DEPLOYED_NOT_READY, which I believe is a bug.
   
   So, after the discussion above, I would like to clarify that the goals of this PR are:
   1. Refine the control flow in `observeJmDeployment`, based on the current main branch, to improve the transition from DEPLOYED_NOT_READY to READY and fix the session observer bug.
   2. Introduce a new config `operator.observer.flink.client.timeout.sec` to control how long `listJobs` waits.
   3. Modify the Dockerfile to output more meaningful messages in CI.
   
   To roll back the extra changes, I would:
   1. Remove the `isJobManagerServing` method and keep using `isJobManagerPortReady` to detect whether the JM is ready in the session case, because a timeout-based REST call is expensive.
   2. Keep `operator.observer.rest-ready.delay.sec` as is, following our current logic of waiting a few seconds before setting the JM status to READY.
   
   I will finish the above changes today. Please let me know if any of these changes are unwanted or if I have missed some cases.
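A minimal sketch of the promotion rule in point 2 of the rollback plan, assuming the delay is measured from the moment the status became DEPLOYED_NOT_READY and that `portReady` stands in for the result of `isJobManagerPortReady`. The timestamp bookkeeping is hypothetical; this does not claim to reflect how the operator actually implements the wait.

```java
import java.time.Duration;
import java.time.Instant;

final class RestReadyDelaySketch {

    /**
     * Decides whether a JobManager in DEPLOYED_NOT_READY may be promoted to READY.
     *
     * @param portReady      stand-in for the result of isJobManagerPortReady (assumption)
     * @param notReadySince  when the status was set to DEPLOYED_NOT_READY (hypothetical bookkeeping)
     * @param now            current time, passed in to keep the helper deterministic
     * @param restReadyDelay value of the rest-ready delay option
     */
    static boolean promoteToReady(
            boolean portReady, Instant notReadySince, Instant now, Duration restReadyDelay) {
        // Both conditions must hold: the port accepts connections and the delay has fully elapsed.
        return portReady && !now.isBefore(notReadySince.plus(restReadyDelay));
    }
}
```

Passing `now` explicitly rather than calling `Instant.now()` inside the helper keeps it easy to unit test.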
   
   

