Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/06/07 10:11:15 UTC

[GitHub] [flink-kubernetes-operator] Aitozi opened a new pull request, #260: [FLINK-27497] Track terminal job states in the observer

Aitozi opened a new pull request, #260:
URL: https://github.com/apache/flink-kubernetes-operator/pull/260

   This PR improves the logic in `JobStatusObserver` and generates an event after each job state change.
   
   A manual test case for a batch job:
   
   ![image](https://user-images.githubusercontent.com/9486140/172355164-2311a471-61ec-4387-b98e-3a7efe6e81b2.png)
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] Aitozi commented on a diff in pull request #260: [FLINK-27497] Track terminal job states in the observer

Posted by GitBox <gi...@apache.org>.
Aitozi commented on code in PR #260:
URL: https://github.com/apache/flink-kubernetes-operator/pull/260#discussion_r893237588


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java:
##########
@@ -31,23 +36,29 @@
 import java.util.concurrent.TimeoutException;
 
 /** An observer to observe the job status. */
-public abstract class JobStatusObserver<CTX> {
+public abstract class JobStatusObserver<
+        SPEC extends AbstractFlinkSpec, STATUS extends CommonStatus<SPEC>, CTX> {
 
     private static final Logger LOG = LoggerFactory.getLogger(JobStatusObserver.class);
+    private static final int MAX_ERROR_STRING_LENGTH = 512;
     private final FlinkService flinkService;
+    private final KubernetesClient kubernetesClient;
 
-    public JobStatusObserver(FlinkService flinkService) {
+    public JobStatusObserver(FlinkService flinkService, KubernetesClient client) {
         this.flinkService = flinkService;
+        this.kubernetesClient = client;
     }
 
     /**
      * Observe the status of the flink job.
      *
-     * @param jobStatus The job status to be observed.
+     * @param resource The custom resource to be observed.
      * @param deployedConfig Deployed job config.
      * @return If job found return true, otherwise return false.
      */
-    public boolean observe(JobStatus jobStatus, Configuration deployedConfig, CTX ctx) {
+    public boolean observe(
+            AbstractFlinkResource<SPEC, STATUS> resource, Configuration deployedConfig, CTX ctx) {

Review Comment:
   Thanks, that looks simpler.





[GitHub] [flink-kubernetes-operator] Aitozi commented on a diff in pull request #260: [FLINK-27497] Track terminal job states in the observer

Posted by GitBox <gi...@apache.org>.
Aitozi commented on code in PR #260:
URL: https://github.com/apache/flink-kubernetes-operator/pull/260#discussion_r893219001


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java:
##########
@@ -31,23 +36,29 @@
 import java.util.concurrent.TimeoutException;
 
 /** An observer to observe the job status. */
-public abstract class JobStatusObserver<CTX> {
+public abstract class JobStatusObserver<
+        SPEC extends AbstractFlinkSpec, STATUS extends CommonStatus<SPEC>, CTX> {
 
     private static final Logger LOG = LoggerFactory.getLogger(JobStatusObserver.class);
+    private static final int MAX_ERROR_STRING_LENGTH = 512;
     private final FlinkService flinkService;
+    private final KubernetesClient kubernetesClient;
 
-    public JobStatusObserver(FlinkService flinkService) {
+    public JobStatusObserver(FlinkService flinkService, KubernetesClient client) {
         this.flinkService = flinkService;
+        this.kubernetesClient = client;

Review Comment:
   Hah, thanks for the reminder. I didn't notice that just now.





[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #260: [FLINK-27497] Track terminal job states in the observer

Posted by GitBox <gi...@apache.org>.
gyfora commented on code in PR #260:
URL: https://github.com/apache/flink-kubernetes-operator/pull/260#discussion_r893233823


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java:
##########
@@ -31,23 +36,29 @@
 import java.util.concurrent.TimeoutException;
 
 /** An observer to observe the job status. */
-public abstract class JobStatusObserver<CTX> {
+public abstract class JobStatusObserver<
+        SPEC extends AbstractFlinkSpec, STATUS extends CommonStatus<SPEC>, CTX> {
 
     private static final Logger LOG = LoggerFactory.getLogger(JobStatusObserver.class);
+    private static final int MAX_ERROR_STRING_LENGTH = 512;
     private final FlinkService flinkService;
+    private final KubernetesClient kubernetesClient;
 
-    public JobStatusObserver(FlinkService flinkService) {
+    public JobStatusObserver(FlinkService flinkService, KubernetesClient client) {
         this.flinkService = flinkService;
+        this.kubernetesClient = client;
     }
 
     /**
      * Observe the status of the flink job.
      *
-     * @param jobStatus The job status to be observed.
+     * @param resource The custom resource to be observed.
      * @param deployedConfig Deployed job config.
      * @return If job found return true, otherwise return false.
      */
-    public boolean observe(JobStatus jobStatus, Configuration deployedConfig, CTX ctx) {
+    public boolean observe(
+            AbstractFlinkResource<SPEC, STATUS> resource, Configuration deployedConfig, CTX ctx) {

Review Comment:
   Sorry, I messed up my comment. What I was trying to say is that we can simply use wildcard parameters:
   ```
   AbstractFlinkResource<?, ?>
   ```
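
A minimal sketch of why the wildcard form is sufficient, assuming simplified stand-in classes. The nested types below are hypothetical reductions of the operator's `AbstractFlinkResource`, `CommonStatus`, and `JobStatus`; `DemoSpec`, `observeState`, and `demoResource` are invented for illustration and are not part of the PR.

```java
final class WildcardObserverSketch {

    // Hypothetical stand-ins for the operator's types, reduced to what the
    // example needs. The real classes carry many more fields.
    static class JobStatus {
        private String state;

        String getState() {
            return state;
        }

        void setState(String state) {
            this.state = state;
        }
    }

    abstract static class AbstractFlinkSpec {}

    static class DemoSpec extends AbstractFlinkSpec {}

    static class CommonStatus<SPEC extends AbstractFlinkSpec> {
        private final JobStatus jobStatus = new JobStatus();

        JobStatus getJobStatus() {
            return jobStatus;
        }
    }

    static class AbstractFlinkResource<
            SPEC extends AbstractFlinkSpec, STATUS extends CommonStatus<SPEC>> {
        private final STATUS status;

        AbstractFlinkResource(STATUS status) {
            this.status = status;
        }

        STATUS getStatus() {
            return status;
        }
    }

    // Wildcards are enough here: the method only reads through the
    // CommonStatus upper bound, so no class-level SPEC/STATUS are needed.
    static String observeState(AbstractFlinkResource<?, ?> resource) {
        return resource.getStatus().getJobStatus().getState();
    }

    // Builds a demo resource whose job status holds the given state.
    static AbstractFlinkResource<DemoSpec, CommonStatus<DemoSpec>> demoResource(String state) {
        var status = new CommonStatus<DemoSpec>();
        status.getJobStatus().setState(state);
        return new AbstractFlinkResource<DemoSpec, CommonStatus<DemoSpec>>(status);
    }

    public static void main(String[] args) {
        System.out.println(observeState(demoResource("RUNNING")));
    }
}
```

Because the declared bound on `STATUS` still applies to the wildcard, `getStatus()` can be used as a `CommonStatus` without naming the type parameters on the observer class.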





[GitHub] [flink-kubernetes-operator] Aitozi commented on a diff in pull request #260: [FLINK-27497] Track terminal job states in the observer

Posted by GitBox <gi...@apache.org>.
Aitozi commented on code in PR #260:
URL: https://github.com/apache/flink-kubernetes-operator/pull/260#discussion_r893121276


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java:
##########
@@ -94,14 +91,64 @@ private void ifRunningMoveToReconciling(JobStatus jobStatus, String previousJobS
     protected abstract void onTimeout(CTX ctx);
 
     /**
-     * Find and update previous job status based on the job list from the cluster and return the
-     * target status.
+     * Filter the target job status message by the job list from the cluster.
      *
-     * @param status the target job status to be updated.
+     * @param status the target job status.
      * @param clusterJobStatuses the candidate cluster jobs.
-     * @return The target status of the job. If no matched job found, {@code Optional.empty()} will
+     * @return The target job status message. If no matched job found, {@code Optional.empty()} will
      *     be returned.
      */
-    protected abstract Optional<String> updateJobStatus(
+    protected abstract Optional<JobStatusMessage> filterTargetJob(
             JobStatus status, List<JobStatusMessage> clusterJobStatuses);
+
+    /**
+     * Update the status in CR according to the cluster job status.
+     *
+     * @param status the target job status
+     * @param clusterJobStatus the status fetch from the cluster.
+     * @param deployedConfig Deployed job config.
+     */
+    private void updateJobStatus(
+            CommonStatus<SPEC> status,
+            JobStatusMessage clusterJobStatus,
+            Configuration deployedConfig) {
+        var jobStatus = status.getJobStatus();
+        var previousJobStatus = jobStatus.getState();
+
+        jobStatus.setState(clusterJobStatus.getJobState().name());
+        jobStatus.setJobName(clusterJobStatus.getJobName());
+        jobStatus.setJobId(clusterJobStatus.getJobId().toHexString());
+        jobStatus.setStartTime(String.valueOf(clusterJobStatus.getStartTime()));
+
+        if (jobStatus.getState().equals(previousJobStatus)) {
+            LOG.info("Job status ({}) unchanged", previousJobStatus);
+        } else {
+            jobStatus.setUpdateTime(String.valueOf(System.currentTimeMillis()));
+            LOG.info(
+                    "Job status successfully updated from {} to {}",
+                    previousJobStatus,
+                    jobStatus.getState());
+        }
+
+        if (clusterJobStatus.getJobState() == org.apache.flink.api.common.JobStatus.FAILED) {
+            try {
+                var result =
+                        flinkService.requestJobResult(deployedConfig, clusterJobStatus.getJobId());
+                result.getSerializedThrowable()
+                        .ifPresent(
+                                t -> {
+                                    var error = t.getFullStringifiedStackTrace();
+                                    if (error != null && !error.equals(status.getError())) {

Review Comment:
   This is meant to avoid duplicated log entries in the operator.





[GitHub] [flink-kubernetes-operator] Aitozi commented on pull request #260: [FLINK-27497] Track terminal job states in the observer

Posted by GitBox <gi...@apache.org>.
Aitozi commented on PR #260:
URL: https://github.com/apache/flink-kubernetes-operator/pull/260#issuecomment-1150886408

   I have optimized the method; please take another look, @gyfora.




[GitHub] [flink-kubernetes-operator] gyfora merged pull request #260: [FLINK-27497] Track terminal job states in the observer

Posted by GitBox <gi...@apache.org>.
gyfora merged PR #260:
URL: https://github.com/apache/flink-kubernetes-operator/pull/260




[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #260: [FLINK-27497] Track terminal job states in the observer

Posted by GitBox <gi...@apache.org>.
gyfora commented on code in PR #260:
URL: https://github.com/apache/flink-kubernetes-operator/pull/260#discussion_r893116996


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java:
##########
@@ -94,14 +91,64 @@ private void ifRunningMoveToReconciling(JobStatus jobStatus, String previousJobS
     protected abstract void onTimeout(CTX ctx);
 
     /**
-     * Find and update previous job status based on the job list from the cluster and return the
-     * target status.
+     * Filter the target job status message by the job list from the cluster.
      *
-     * @param status the target job status to be updated.
+     * @param status the target job status.
      * @param clusterJobStatuses the candidate cluster jobs.
-     * @return The target status of the job. If no matched job found, {@code Optional.empty()} will
+     * @return The target job status message. If no matched job found, {@code Optional.empty()} will
      *     be returned.
      */
-    protected abstract Optional<String> updateJobStatus(
+    protected abstract Optional<JobStatusMessage> filterTargetJob(
             JobStatus status, List<JobStatusMessage> clusterJobStatuses);
+
+    /**
+     * Update the status in CR according to the cluster job status.
+     *
+     * @param status the target job status
+     * @param clusterJobStatus the status fetch from the cluster.
+     * @param deployedConfig Deployed job config.
+     */
+    private void updateJobStatus(
+            CommonStatus<SPEC> status,
+            JobStatusMessage clusterJobStatus,
+            Configuration deployedConfig) {
+        var jobStatus = status.getJobStatus();
+        var previousJobStatus = jobStatus.getState();
+
+        jobStatus.setState(clusterJobStatus.getJobState().name());
+        jobStatus.setJobName(clusterJobStatus.getJobName());
+        jobStatus.setJobId(clusterJobStatus.getJobId().toHexString());
+        jobStatus.setStartTime(String.valueOf(clusterJobStatus.getStartTime()));
+
+        if (jobStatus.getState().equals(previousJobStatus)) {
+            LOG.info("Job status ({}) unchanged", previousJobStatus);
+        } else {
+            jobStatus.setUpdateTime(String.valueOf(System.currentTimeMillis()));
+            LOG.info(
+                    "Job status successfully updated from {} to {}",
+                    previousJobStatus,
+                    jobStatus.getState());
+        }
+
+        if (clusterJobStatus.getJobState() == org.apache.flink.api.common.JobStatus.FAILED) {
+            try {
+                var result =
+                        flinkService.requestJobResult(deployedConfig, clusterJobStatus.getJobId());
+                result.getSerializedThrowable()
+                        .ifPresent(
+                                t -> {
+                                    var error = t.getFullStringifiedStackTrace();
+                                    if (error != null && !error.equals(status.getError())) {

Review Comment:
   The `!error.equals(status.getError())` check is not necessary.



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserver.java:
##########
@@ -82,11 +79,14 @@ protected boolean observeFlinkCluster(
 
         var jobStatus = flinkApp.getStatus().getJobStatus();
 
+        var previousJobState = jobStatus.getState();
         boolean jobFound =
                 jobStatusObserver.observe(
-                        jobStatus,
+                        flinkApp.getStatus(),
                         deployedConfig,
                         new ApplicationObserverContext(flinkApp, context, deployedConfig));
+        EventUtils.generateEventsOnJobStatusChanged(
+                kubernetesClient, previousJobState, jobStatus, flinkApp);

Review Comment:
   I think this method should be called in the JobStatusObserver directly to avoid having it in multiple places. There you already have previousJobState etc.
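
A rough sketch of the idea in the comment above: the observer records the previous state, applies the update, and emits the event itself, so callers do not repeat that logic. The `Consumer<String>` sink is a hypothetical placeholder for the `EventUtils.generateEventsOnJobStatusChanged` call shown in the diff, and the reduced `JobStatus` class and the event text are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class ObserverEventSketch {

    // Hypothetical, reduced stand-in for the operator's JobStatus.
    static final class JobStatus {
        String state;
    }

    // Hypothetical event sink; the PR passes a KubernetesClient to EventUtils instead.
    private final Consumer<String> eventSink;

    ObserverEventSketch(Consumer<String> eventSink) {
        this.eventSink = eventSink;
    }

    // Updates the state and emits one event, but only when the state changed.
    void observe(JobStatus jobStatus, String clusterState) {
        String previousState = jobStatus.state;
        jobStatus.state = clusterState;
        if (!clusterState.equals(previousState)) {
            eventSink.accept(
                    "Job status changed from " + previousState + " to " + clusterState);
        }
    }

    public static void main(String[] args) {
        List<String> events = new ArrayList<>();
        ObserverEventSketch observer = new ObserverEventSketch(events::add);
        JobStatus status = new JobStatus();
        observer.observe(status, "RUNNING");
        observer.observe(status, "RUNNING"); // unchanged, so no event
        observer.observe(status, "FINISHED");
        events.forEach(System.out::println);
    }
}
```

With the event emitted in one place, every caller of `observe` gets the same behavior without tracking the previous state on its own.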





[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #260: [FLINK-27497] Track terminal job states in the observer

Posted by GitBox <gi...@apache.org>.
gyfora commented on code in PR #260:
URL: https://github.com/apache/flink-kubernetes-operator/pull/260#discussion_r893235051


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java:
##########
@@ -94,14 +95,84 @@ private void ifRunningMoveToReconciling(JobStatus jobStatus, String previousJobS
     protected abstract void onTimeout(CTX ctx);
 
     /**
-     * Find and update previous job status based on the job list from the cluster and return the
-     * target status.
+     * Filter the target job status message by the job list from the cluster.
      *
-     * @param status the target job status to be updated.
+     * @param status the target job status.
      * @param clusterJobStatuses the candidate cluster jobs.
-     * @return The target status of the job. If no matched job found, {@code Optional.empty()} will
+     * @return The target job status message. If no matched job found, {@code Optional.empty()} will
      *     be returned.
      */
-    protected abstract Optional<String> updateJobStatus(
+    protected abstract Optional<JobStatusMessage> filterTargetJob(
             JobStatus status, List<JobStatusMessage> clusterJobStatuses);
+
+    /**
+     * Update the status in CR according to the cluster job status.
+     *
+     * @param resource the target custom resource.
+     * @param clusterJobStatus the status fetch from the cluster.
+     * @param deployedConfig Deployed job config.
+     */
+    private <SPEC extends AbstractFlinkSpec, STATUS extends CommonStatus<SPEC>>
+            void updateJobStatus(
+                    AbstractFlinkResource<SPEC, STATUS> resource,
+                    JobStatusMessage clusterJobStatus,
+                    Configuration deployedConfig) {
+        var jobStatus = resource.getStatus().getJobStatus();
+        var previousJobStatus = jobStatus.getState();
+
+        jobStatus.setState(clusterJobStatus.getJobState().name());
+        jobStatus.setJobName(clusterJobStatus.getJobName());
+        jobStatus.setJobId(clusterJobStatus.getJobId().toHexString());
+        jobStatus.setStartTime(String.valueOf(clusterJobStatus.getStartTime()));
+
+        if (jobStatus.getState().equals(previousJobStatus)) {
+            LOG.info("Job status ({}) unchanged", previousJobStatus);

Review Comment:
   Should we return after this line? 
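
One way the early return could look, as a sketch under assumptions. The thread does not show whether the merged code adopted it. `EarlyReturnSketch`, its reduced `JobStatus`, and the `updateState` helper are hypothetical names; the real method also sets the job name, ID, and start time.

```java
final class EarlyReturnSketch {

    // Hypothetical, reduced stand-in for the operator's JobStatus.
    static final class JobStatus {
        String state;
        String updateTime;
    }

    // Applies the cluster state and reports whether it differs from the previous one.
    static boolean updateState(JobStatus jobStatus, String clusterState, long nowMillis) {
        String previousState = jobStatus.state;
        jobStatus.state = clusterState;
        if (clusterState.equals(previousState)) {
            // Unchanged: skip the update timestamp and any follow-up work.
            return false;
        }
        jobStatus.updateTime = String.valueOf(nowMillis);
        return true;
    }

    public static void main(String[] args) {
        JobStatus status = new JobStatus();
        System.out.println(updateState(status, "RUNNING", 1L));
        System.out.println(updateState(status, "RUNNING", 2L));
        System.out.println(status.updateTime);
    }
}
```

A consequence worth noting: anything placed after the unchanged check, such as the lookup of the failure cause for a `FAILED` job in the earlier revision of the diff, would then run only on the observation where the state actually changes.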





[GitHub] [flink-kubernetes-operator] Aitozi commented on a diff in pull request #260: [FLINK-27497] Track terminal job states in the observer

Posted by GitBox <gi...@apache.org>.
Aitozi commented on code in PR #260:
URL: https://github.com/apache/flink-kubernetes-operator/pull/260#discussion_r893133032


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserver.java:
##########
@@ -82,11 +79,14 @@ protected boolean observeFlinkCluster(
 
         var jobStatus = flinkApp.getStatus().getJobStatus();
 
+        var previousJobState = jobStatus.getState();
         boolean jobFound =
                 jobStatusObserver.observe(
-                        jobStatus,
+                        flinkApp.getStatus(),
                         deployedConfig,
                         new ApplicationObserverContext(flinkApp, context, deployedConfig));
+        EventUtils.generateEventsOnJobStatusChanged(
+                kubernetesClient, previousJobState, jobStatus, flinkApp);

Review Comment:
   I tried that approach, but it relies on the resource object, so more classes would have to be touched to pass it through.





[GitHub] [flink-kubernetes-operator] Aitozi commented on a diff in pull request #260: [FLINK-27497] Track terminal job states in the observer

Posted by GitBox <gi...@apache.org>.
Aitozi commented on code in PR #260:
URL: https://github.com/apache/flink-kubernetes-operator/pull/260#discussion_r893209434


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserver.java:
##########
@@ -82,11 +79,14 @@ protected boolean observeFlinkCluster(
 
         var jobStatus = flinkApp.getStatus().getJobStatus();
 
+        var previousJobState = jobStatus.getState();
         boolean jobFound =
                 jobStatusObserver.observe(
-                        jobStatus,
+                        flinkApp.getStatus(),
                         deployedConfig,
                         new ApplicationObserverContext(flinkApp, context, deployedConfig));
+        EventUtils.generateEventsOnJobStatusChanged(
+                kubernetesClient, previousJobState, jobStatus, flinkApp);

Review Comment:
   Good suggestion. I have pushed the commit; PTAL.





[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #260: [FLINK-27497] Track terminal job states in the observer

Posted by GitBox <gi...@apache.org>.
gyfora commented on code in PR #260:
URL: https://github.com/apache/flink-kubernetes-operator/pull/260#discussion_r893213166


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java:
##########
@@ -31,23 +36,29 @@
 import java.util.concurrent.TimeoutException;
 
 /** An observer to observe the job status. */
-public abstract class JobStatusObserver<CTX> {
+public abstract class JobStatusObserver<
+        SPEC extends AbstractFlinkSpec, STATUS extends CommonStatus<SPEC>, CTX> {
 
     private static final Logger LOG = LoggerFactory.getLogger(JobStatusObserver.class);
+    private static final int MAX_ERROR_STRING_LENGTH = 512;
     private final FlinkService flinkService;
+    private final KubernetesClient kubernetesClient;
 
-    public JobStatusObserver(FlinkService flinkService) {
+    public JobStatusObserver(FlinkService flinkService, KubernetesClient client) {
         this.flinkService = flinkService;
+        this.kubernetesClient = client;

Review Comment:
   Sorry to annoy you but the `FlinkService` currently has a `getKubernetesClient` method that we could use to avoid too many changes  :) 





[GitHub] [flink-kubernetes-operator] Aitozi commented on pull request #260: [FLINK-27497] Track terminal job states in the observer

Posted by GitBox <gi...@apache.org>.
Aitozi commented on PR #260:
URL: https://github.com/apache/flink-kubernetes-operator/pull/260#issuecomment-1150505605

   I am not sure which error info is more suitable: the full stack trace or the simplified message shown below.
   
   ```
   error: 'org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy'
   ```
   
   I prefer to show the full stack trace here, which is more explicit. The drawback is that it will pollute the resource object.




[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #260: [FLINK-27497] Track terminal job states in the observer

Posted by GitBox <gi...@apache.org>.
gyfora commented on code in PR #260:
URL: https://github.com/apache/flink-kubernetes-operator/pull/260#discussion_r893215127


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java:
##########
@@ -31,23 +36,29 @@
 import java.util.concurrent.TimeoutException;
 
 /** An observer to observe the job status. */
-public abstract class JobStatusObserver<CTX> {
+public abstract class JobStatusObserver<
+        SPEC extends AbstractFlinkSpec, STATUS extends CommonStatus<SPEC>, CTX> {
 
     private static final Logger LOG = LoggerFactory.getLogger(JobStatusObserver.class);
+    private static final int MAX_ERROR_STRING_LENGTH = 512;
     private final FlinkService flinkService;
+    private final KubernetesClient kubernetesClient;
 
-    public JobStatusObserver(FlinkService flinkService) {
+    public JobStatusObserver(FlinkService flinkService, KubernetesClient client) {
         this.flinkService = flinkService;
+        this.kubernetesClient = client;
     }
 
     /**
      * Observe the status of the flink job.
      *
-     * @param jobStatus The job status to be observed.
+     * @param resource The custom resource to be observed.
      * @param deployedConfig Deployed job config.
      * @return If job found return true, otherwise return false.
      */
-    public boolean observe(JobStatus jobStatus, Configuration deployedConfig, CTX ctx) {
+    public boolean observe(
+            AbstractFlinkResource<SPEC, STATUS> resource, Configuration deployedConfig, CTX ctx) {

Review Comment:
   Do we need the new generic parameters? I think `AbstractFlinkResource<?, ?>` would be enough, and that way we also don't need to change the class signature.





[GitHub] [flink-kubernetes-operator] Aitozi commented on pull request #260: [FLINK-27497] Track terminal job states in the observer

Posted by GitBox <gi...@apache.org>.
Aitozi commented on PR #260:
URL: https://github.com/apache/flink-kubernetes-operator/pull/260#issuecomment-1150818326

   > Added 1-2 minor comments but looks good :)
   > 
   > Regarding the error, I would be in favor of shorter error messages in the status. If the job is FAILED the user can always check the logs and need to investigate anyways as it's a terminal state.
   > 
   > Even if we decide for the full stack trace we need to trim it to a max size to be safe.
   
   I kept the stack trace, limited to a size of 0.5k, to give as much error info as possible in the status.
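
A minimal sketch of capping the error string, assuming a plain prefix cut. `MAX_ERROR_STRING_LENGTH = 512` comes from the diff; the `trim` helper and the class name are hypothetical, and the PR may truncate differently.

```java
final class ErrorTrimSketch {

    // Same limit as the constant added in the diff.
    static final int MAX_ERROR_STRING_LENGTH = 512;

    // Returns the error unchanged if it fits, otherwise its first 512 characters.
    static String trim(String error) {
        if (error == null || error.length() <= MAX_ERROR_STRING_LENGTH) {
            return error;
        }
        return error.substring(0, MAX_ERROR_STRING_LENGTH);
    }

    public static void main(String[] args) {
        String longTrace = "x".repeat(1000);
        System.out.println(trim(longTrace).length());
        System.out.println(trim("short error"));
    }
}
```

A fixed cap keeps the status field bounded regardless of how deep the stack trace is, at the cost of possibly cutting off the root cause at the end of the trace.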




[GitHub] [flink-kubernetes-operator] Aitozi commented on a diff in pull request #260: [FLINK-27497] Track terminal job states in the observer

Posted by GitBox <gi...@apache.org>.
Aitozi commented on code in PR #260:
URL: https://github.com/apache/flink-kubernetes-operator/pull/260#discussion_r893230346


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java:
##########
@@ -31,23 +36,29 @@
 import java.util.concurrent.TimeoutException;
 
 /** An observer to observe the job status. */
-public abstract class JobStatusObserver<CTX> {
+public abstract class JobStatusObserver<
+        SPEC extends AbstractFlinkSpec, STATUS extends CommonStatus<SPEC>, CTX> {
 
     private static final Logger LOG = LoggerFactory.getLogger(JobStatusObserver.class);
+    private static final int MAX_ERROR_STRING_LENGTH = 512;
     private final FlinkService flinkService;
+    private final KubernetesClient kubernetesClient;
 
-    public JobStatusObserver(FlinkService flinkService) {
+    public JobStatusObserver(FlinkService flinkService, KubernetesClient client) {
         this.flinkService = flinkService;
+        this.kubernetesClient = client;
     }
 
     /**
      * Observe the status of the flink job.
      *
-     * @param jobStatus The job status to be observed.
+     * @param resource The custom resource to be observed.
      * @param deployedConfig Deployed job config.
      * @return If job found return true, otherwise return false.
      */
-    public boolean observe(JobStatus jobStatus, Configuration deployedConfig, CTX ctx) {
+    public boolean observe(
+            AbstractFlinkResource<SPEC, STATUS> resource, Configuration deployedConfig, CTX ctx) {

Review Comment:
   Replaced with generic parameters



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java:
##########
@@ -31,23 +36,29 @@
 import java.util.concurrent.TimeoutException;
 
 /** An observer to observe the job status. */
-public abstract class JobStatusObserver<CTX> {
+public abstract class JobStatusObserver<
+        SPEC extends AbstractFlinkSpec, STATUS extends CommonStatus<SPEC>, CTX> {
 
     private static final Logger LOG = LoggerFactory.getLogger(JobStatusObserver.class);
+    private static final int MAX_ERROR_STRING_LENGTH = 512;
     private final FlinkService flinkService;
+    private final KubernetesClient kubernetesClient;
 
-    public JobStatusObserver(FlinkService flinkService) {
+    public JobStatusObserver(FlinkService flinkService, KubernetesClient client) {
         this.flinkService = flinkService;
+        this.kubernetesClient = client;

Review Comment:
   Removed





[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #260: [FLINK-27497] Track terminal job states in the observer

Posted by GitBox <gi...@apache.org>.
gyfora commented on code in PR #260:
URL: https://github.com/apache/flink-kubernetes-operator/pull/260#discussion_r893132426


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java:
##########
@@ -94,14 +91,64 @@ private void ifRunningMoveToReconciling(JobStatus jobStatus, String previousJobS
     protected abstract void onTimeout(CTX ctx);
 
     /**
-     * Find and update previous job status based on the job list from the cluster and return the
-     * target status.
+     * Filter the target job status message by the job list from the cluster.
      *
-     * @param status the target job status to be updated.
+     * @param status the target job status.
      * @param clusterJobStatuses the candidate cluster jobs.
-     * @return The target status of the job. If no matched job found, {@code Optional.empty()} will
+     * @return The target job status message. If no matched job found, {@code Optional.empty()} will
      *     be returned.
      */
-    protected abstract Optional<String> updateJobStatus(
+    protected abstract Optional<JobStatusMessage> filterTargetJob(
             JobStatus status, List<JobStatusMessage> clusterJobStatuses);
+
+    /**
+     * Update the status in CR according to the cluster job status.
+     *
+     * @param status the target job status
+     * @param clusterJobStatus the status fetch from the cluster.
+     * @param deployedConfig Deployed job config.
+     */
+    private void updateJobStatus(
+            CommonStatus<SPEC> status,
+            JobStatusMessage clusterJobStatus,
+            Configuration deployedConfig) {
+        var jobStatus = status.getJobStatus();
+        var previousJobStatus = jobStatus.getState();
+
+        jobStatus.setState(clusterJobStatus.getJobState().name());
+        jobStatus.setJobName(clusterJobStatus.getJobName());
+        jobStatus.setJobId(clusterJobStatus.getJobId().toHexString());
+        jobStatus.setStartTime(String.valueOf(clusterJobStatus.getStartTime()));
+
+        if (jobStatus.getState().equals(previousJobStatus)) {
+            LOG.info("Job status ({}) unchanged", previousJobStatus);
+        } else {
+            jobStatus.setUpdateTime(String.valueOf(System.currentTimeMillis()));
+            LOG.info(
+                    "Job status successfully updated from {} to {}",
+                    previousJobStatus,
+                    jobStatus.getState());
+        }
+
+        if (clusterJobStatus.getJobState() == org.apache.flink.api.common.JobStatus.FAILED) {
+            try {
+                var result =
+                        flinkService.requestJobResult(deployedConfig, clusterJobStatus.getJobId());
+                result.getSerializedThrowable()
+                        .ifPresent(
+                                t -> {
+                                    var error = t.getFullStringifiedStackTrace();
+                                    if (error != null && !error.equals(status.getError())) {

Review Comment:
   Oh, I see. Sorry.





[GitHub] [flink-kubernetes-operator] Aitozi commented on a diff in pull request #260: [FLINK-27497] Track terminal job states in the observer

Posted by GitBox <gi...@apache.org>.
Aitozi commented on code in PR #260:
URL: https://github.com/apache/flink-kubernetes-operator/pull/260#discussion_r893279190


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java:
##########
@@ -94,14 +95,84 @@ private void ifRunningMoveToReconciling(JobStatus jobStatus, String previousJobS
     protected abstract void onTimeout(CTX ctx);
 
     /**
-     * Find and update previous job status based on the job list from the cluster and return the
-     * target status.
+     * Filter the target job status message by the job list from the cluster.
      *
-     * @param status the target job status to be updated.
+     * @param status the target job status.
      * @param clusterJobStatuses the candidate cluster jobs.
-     * @return The target status of the job. If no matched job found, {@code Optional.empty()} will
+     * @return The target job status message. If no matched job found, {@code Optional.empty()} will
      *     be returned.
      */
-    protected abstract Optional<String> updateJobStatus(
+    protected abstract Optional<JobStatusMessage> filterTargetJob(
             JobStatus status, List<JobStatusMessage> clusterJobStatuses);
+
+    /**
+     * Update the status in CR according to the cluster job status.
+     *
+     * @param resource the target custom resource.
+     * @param clusterJobStatus the status fetch from the cluster.
+     * @param deployedConfig Deployed job config.
+     */
+    private <SPEC extends AbstractFlinkSpec, STATUS extends CommonStatus<SPEC>>
+            void updateJobStatus(
+                    AbstractFlinkResource<SPEC, STATUS> resource,
+                    JobStatusMessage clusterJobStatus,
+                    Configuration deployedConfig) {
+        var jobStatus = resource.getStatus().getJobStatus();
+        var previousJobStatus = jobStatus.getState();
+
+        jobStatus.setState(clusterJobStatus.getJobState().name());
+        jobStatus.setJobName(clusterJobStatus.getJobName());
+        jobStatus.setJobId(clusterJobStatus.getJobId().toHexString());
+        jobStatus.setStartTime(String.valueOf(clusterJobStatus.getStartTime()));
+
+        if (jobStatus.getState().equals(previousJobStatus)) {
+            LOG.info("Job status ({}) unchanged", previousJobStatus);

Review Comment:
   Fixed
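The change-detection step in the hunk above can be read in isolation. The following is a minimal sketch, not the operator's code: `SketchJobStatus` and `StateChangeSketch` are hypothetical stand-ins for the real status classes, and the clock value is passed in only to keep the example deterministic. It shows the state being overwritten on every observation while the update time is touched only when the state actually changes.

```java
import java.util.Objects;

/** Hypothetical stand-in for the operator's job status holder (not the real class). */
class SketchJobStatus {
    String state;
    String updateTime;
}

class StateChangeSketch {
    /**
     * Records the state reported by the cluster and returns true only when it
     * differs from the previously recorded state. The update time is refreshed
     * only in that case, mirroring the else-branch of the quoted diff.
     */
    static boolean applyState(SketchJobStatus jobStatus, String clusterState, long nowMillis) {
        String previous = jobStatus.state;
        jobStatus.state = clusterState;
        if (Objects.equals(previous, clusterState)) {
            return false; // unchanged: leave updateTime as it was
        }
        jobStatus.updateTime = String.valueOf(nowMillis);
        return true;
    }
}
```

Returning a boolean is a choice made for this sketch; it lets a caller decide what to do on a transition (log, emit an event) without re-reading the previous state.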





[GitHub] [flink-kubernetes-operator] Aitozi commented on pull request #260: [FLINK-27497] Track terminal job states in the observer

Posted by GitBox <gi...@apache.org>.
Aitozi commented on PR #260:
URL: https://github.com/apache/flink-kubernetes-operator/pull/260#issuecomment-1149999461

   cc @gyfora @wangyang0918 




[GitHub] [flink-kubernetes-operator] Aitozi commented on pull request #260: [FLINK-27497] Track terminal job states in the observer

Posted by GitBox <gi...@apache.org>.
Aitozi commented on PR #260:
URL: https://github.com/apache/flink-kubernetes-operator/pull/260#issuecomment-1149982267

   Example of a failed flinkapp
   
   ```
   Status:
     Cluster Info:
       Flink - Revision:  3a4c113 @ 2022-04-20T19:50:32+02:00
       Flink - Version:   1.15.0
     Error:               org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
                          at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
                          at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
                          at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:301)
                          at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:291)
                          at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:282)
                          at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:739)
                          at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:78)
                          at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:443)
                          at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
                          at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
                          at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
                          at java.base/java.lang.reflect.Method.invoke(Unknown Source)
                          at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:304)
                          at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
                          at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:302)
                          at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217)
                          at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78)
                          at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)
                          at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
                          at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
                          at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
                          at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
                          at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
                          at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
                          at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
                          at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
                          at akka.actor.Actor.aroundReceive(Actor.scala:537)
                          at akka.actor.Actor.aroundReceive$(Actor.scala:535)
                          at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
                          at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
                          at akka.actor.ActorCell.invoke(ActorCell.scala:548)
                          at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
                          at akka.dispatch.Mailbox.run(Mailbox.scala:231)
                          at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
                          at java.base/java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
                          at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown Source)
                          at java.base/java.util.concurrent.ForkJoinPool.scan(Unknown Source)
                          at java.base/java.util.concurrent.ForkJoinPool.runWorker(Unknown Source)
                          at java.base/java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source)
   Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to decode the word
     at org.apache.flink.streaming.examples.wordcount.WordCount$Tokenizer.flatMap(WordCount.java:187)
     at org.apache.flink.streaming.examples.wordcount.WordCount$Tokenizer.flatMap(WordCount.java:176)
     at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:47)
     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
     at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
     at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
     at java.base/java.lang.Thread.run(Unknown Source)
   
     Job Manager Deployment Status:  READY
     Job Status:
       Job Id:    00000000000000000000000000000000
       Job Name:  WordCount
       Savepoint Info:
         Last Periodic Savepoint Timestamp:  0
         Savepoint History:
         Trigger Id:
         Trigger Timestamp:  0
         Trigger Type:       UNKNOWN
       Start Time:           1654697747801
       State:                FAILED
       Update Time:          1654697833534
     Reconciliation Status:
       Last Reconciled Spec:      {"job":{"jarURI":"local:///opt/flink/usrlib/myjob.jar","parallelism":2,"entryClass":null,"args":[],"state":"running","savepointTriggerNonce":0,"initialSavepointPath":null,"upgradeMode":"savepoint","allowNonRestoredState":null},"restartNonce":null,"flinkConfiguration":{"high-availability":"org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory","high-availability.storageDir":"file:///flink-data/ha","state.checkpoints.dir":"file:///flink-data/checkpoints","state.savepoints.dir":"file:///flink-data/savepoints","taskmanager.numberOfTaskSlots":"2"},"image":"flink:1.15","imagePullPolicy":null,"serviceAccount":"flink","flinkVersion":"v1_15","ingress":null,"podTemplate":{"apiVersion":"v1","kind":"Pod","spec":{"containers":[{"name":"flink-main-container","volumeMounts":[{"mountPath":"/flink-data","name":"flink-volume"},{"mountPath":"/opt/flink/usrlib","name":"flink-artifact"}]}],"initContainers":[{"command":["wget","http://blink-alipay.oss-cn-hangzhou-zmf.aliyuncs.com/yuli/flink-examples-streaming_2.12-1.16-SNAPSHOT-WordCount.jar","-O","/flink-artifact/myjob.jar"],"image":"busybox:latest","imagePullPolicy":"IfNotPresent","name":"artifacts-fetcher","volumeMounts":[{"mountPath":"/flink-artifact","name":"flink-artifact"}]}],"volumes":[{"hostPath":{"path":"/tmp/flink","type":"Directory"},"name":"flink-volume"},{"emptyDir":{},"name":"flink-artifact"}]}},"jobManager":{"resource":{"cpu":1.0,"memory":"2048m"},"replicas":1,"podTemplate":null},"taskManager":{"resource":{"cpu":1.0,"memory":"2048m"},"podTemplate":null},"logConfiguration":null,"apiVersion":"v1beta1"}
       Last Stable Spec:          {"job":{"jarURI":"local:///opt/flink/usrlib/myjob.jar","parallelism":2,"entryClass":null,"args":[],"state":"running","savepointTriggerNonce":0,"initialSavepointPath":null,"upgradeMode":"savepoint","allowNonRestoredState":null},"restartNonce":null,"flinkConfiguration":{"high-availability":"org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory","high-availability.storageDir":"file:///flink-data/ha","state.checkpoints.dir":"file:///flink-data/checkpoints","state.savepoints.dir":"file:///flink-data/savepoints","taskmanager.numberOfTaskSlots":"2"},"image":"flink:1.15","imagePullPolicy":null,"serviceAccount":"flink","flinkVersion":"v1_15","ingress":null,"podTemplate":{"apiVersion":"v1","kind":"Pod","spec":{"containers":[{"name":"flink-main-container","volumeMounts":[{"mountPath":"/flink-data","name":"flink-volume"},{"mountPath":"/opt/flink/usrlib","name":"flink-artifact"}]}],"initContainers":[{"command":["wget","http://blink-alipay.oss-cn-hangzhou-zmf.aliyuncs.com/yuli/flink-examples-streaming_2.12-1.16-SNAPSHOT-WordCount.jar","-O","/flink-artifact/myjob.jar"],"image":"busybox:latest","imagePullPolicy":"IfNotPresent","name":"artifacts-fetcher","volumeMounts":[{"mountPath":"/flink-artifact","name":"flink-artifact"}]}],"volumes":[{"hostPath":{"path":"/tmp/flink","type":"Directory"},"name":"flink-volume"},{"emptyDir":{},"name":"flink-artifact"}]}},"jobManager":{"resource":{"cpu":1.0,"memory":"2048m"},"replicas":1,"podTemplate":null},"taskManager":{"resource":{"cpu":1.0,"memory":"2048m"},"podTemplate":null},"logConfiguration":null,"apiVersion":"v1beta1"}
       Reconciliation Timestamp:  1654697738529
       State:                     DEPLOYED
   Events:
     Type    Reason          Age    From                  Message
     ----    ------          ----   ----                  -------
     Normal  Status Updated  2m13s  JobManagerDeployment  Job status updated from RECONCILING to RUNNING
     Normal  Status Updated  57s    JobManagerDeployment  Job status updated from RUNNING to FAILED
   ```
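The `Error` field in the status above holds a complete stack trace. The `JobStatusObserver` diff earlier in this thread declares `MAX_ERROR_STRING_LENGTH = 512`, but the code that applies it is not quoted here, so the following is only a sketch of one way such a cap could work. The class `ErrorTruncationSketch` and its `truncate` helper are hypothetical names introduced for illustration.

```java
/** Hypothetical helper; only the constant's name and value come from the quoted diff. */
class ErrorTruncationSketch {
    static final int MAX_ERROR_STRING_LENGTH = 512;

    /**
     * Returns the error unchanged when it is null or already short enough,
     * otherwise keeps only the first MAX_ERROR_STRING_LENGTH characters.
     * Note: a plain substring may cut a surrogate pair in half; that is
     * accepted here for simplicity.
     */
    static String truncate(String error) {
        if (error == null || error.length() <= MAX_ERROR_STRING_LENGTH) {
            return error;
        }
        return error.substring(0, MAX_ERROR_STRING_LENGTH);
    }
}
```

Keeping the head of the trace rather than the tail preserves the top-level exception message, at the cost of dropping any trailing `Caused by:` section such as the one shown in the example.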




[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #260: [FLINK-27497] Track terminal job states in the observer

Posted by GitBox <gi...@apache.org>.
gyfora commented on code in PR #260:
URL: https://github.com/apache/flink-kubernetes-operator/pull/260#discussion_r893167124


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserver.java:
##########
@@ -82,11 +79,14 @@ protected boolean observeFlinkCluster(
 
         var jobStatus = flinkApp.getStatus().getJobStatus();
 
+        var previousJobState = jobStatus.getState();
         boolean jobFound =
                 jobStatusObserver.observe(
-                        jobStatus,
+                        flinkApp.getStatus(),
                         deployedConfig,
                         new ApplicationObserverContext(flinkApp, context, deployedConfig));
+        EventUtils.generateEventsOnJobStatusChanged(
+                kubernetesClient, previousJobState, jobStatus, flinkApp);

Review Comment:
   Wouldn't it be enough to simply change the `JobStatusObserver` to take the full resource object instead of only the status?
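A rough sketch of what this suggestion could look like, under stated assumptions: `SketchResource`, `SketchEventSink` and `SketchObserver` are hypothetical stand-ins, not the operator's classes, and the real event generation goes through the Kubernetes client rather than a callback. The point is only that once the observer receives the whole resource, it can both update the status and emit the event itself, so the caller no longer has to capture the previous state.

```java
import java.util.Objects;

/** Hypothetical stand-in for a Flink custom resource carrying its job state. */
class SketchResource {
    final String name;
    String jobState;

    SketchResource(String name, String jobState) {
        this.name = name;
        this.jobState = jobState;
    }
}

/** Hypothetical event sink; the real operator would create a Kubernetes event. */
interface SketchEventSink {
    void emit(SketchResource resource, String message);
}

class SketchObserver {
    private final SketchEventSink events;

    SketchObserver(SketchEventSink events) {
        this.events = events;
    }

    /** Updates the resource's job state and emits an event when it changes. */
    void observe(SketchResource resource, String clusterState) {
        String previous = resource.jobState;
        resource.jobState = clusterState;
        if (!Objects.equals(previous, clusterState)) {
            // Message format follows the events shown in the example output in this thread.
            events.emit(
                    resource,
                    "Job status updated from " + previous + " to " + clusterState);
        }
    }
}
```

With this shape, a caller such as `ApplicationObserver` would pass the resource once and would not need a separate call after `observe` to generate events.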


