You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/04/11 07:11:20 UTC

[GitHub] [flink-kubernetes-operator] Aitozi commented on a diff in pull request #164: [FLINK-26871] Handle session job spec change

Aitozi commented on code in PR #164:
URL: https://github.com/apache/flink-kubernetes-operator/pull/164#discussion_r847009678


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserver.java:
##########
@@ -17,13 +17,175 @@
 
 package org.apache.flink.kubernetes.operator.observer.sessionjob;
 
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
 import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
+import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
+import org.apache.flink.kubernetes.operator.crd.status.SavepointInfo;
 import org.apache.flink.kubernetes.operator.observer.Observer;
+import org.apache.flink.kubernetes.operator.observer.SavepointFetchResult;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.reconciler.sessionjob.SessionJobHelper;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
+import org.apache.flink.kubernetes.operator.utils.OperatorUtils;
+import org.apache.flink.kubernetes.operator.utils.SavepointUtils;
+import org.apache.flink.runtime.client.JobStatusMessage;
 
 import io.javaoperatorsdk.operator.api.reconciler.Context;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.kubernetes.operator.observer.deployment.AbstractDeploymentObserver.JOB_STATE_UNKNOWN;
 
 /** The observer of {@link FlinkSessionJob}. */
 public class SessionJobObserver implements Observer<FlinkSessionJob> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SessionJobObserver.class);
+    /** Operator-level settings; used when resolving the session deployment of a job. */
+    private final FlinkOperatorConfiguration operatorConfiguration;
+    /** Client used to query the jobs running in the session cluster. */
+    private final FlinkService flinkService;
+    /** Default Flink configuration used when computing a deployment's effective config. */
+    private final Configuration defaultConfig;
+
+    /**
+     * Creates an observer for {@link FlinkSessionJob} resources.
+     *
+     * @param operatorConfiguration operator settings passed to the secondary-resource lookup
+     * @param flinkService service used to list jobs of the session cluster
+     * @param defaultConfig default configuration combined with the session deployment's config
+     */
+    public SessionJobObserver(
+            FlinkOperatorConfiguration operatorConfiguration,
+            FlinkService flinkService,
+            Configuration defaultConfig) {
+        this.defaultConfig = defaultConfig;
+        this.flinkService = flinkService;
+        this.operatorConfiguration = operatorConfiguration;
+    }
+
     @Override
-    public void observe(FlinkSessionJob flinkSessionJob, Context context) {}
+    public void observe(FlinkSessionJob flinkSessionJob, Context context) {
+        // A session job whose spec has never been reconciled has nothing to observe yet.
+        if (flinkSessionJob.getStatus().getReconciliationStatus().getLastReconciledSpec()
+                == null) {
+            return;
+        }
+
+        var sessionDeployment =
+                OperatorUtils.getSecondaryResource(flinkSessionJob, context, operatorConfiguration);
+        var helper = new SessionJobHelper(flinkSessionJob, LOG);
+        // Stop here unless the helper reports the backing session cluster as ready.
+        if (!helper.sessionClusterReady(sessionDeployment)) {
+            return;
+        }
+
+        // Config derived from the session deployment and the operator's default config.
+        Configuration effectiveConfig =
+                FlinkUtils.getEffectiveConfig(sessionDeployment.get(), this.defaultConfig);
+        // Savepoint status is observed only when observeFlinkJobStatus returns true.
+        if (observeFlinkJobStatus(flinkSessionJob, effectiveConfig)) {
+            observeSavepointStatus(flinkSessionJob, effectiveConfig);
+        }
+    }
+
+    /**
+     * Observes the status of the Flink job backing the given session job.
+     *
+     * <p>Lists the jobs of the session cluster and filters them by the job id recorded in the
+     * session job's {@link JobStatus}. If matches exist, they are passed to {@code
+     * updateJobStatus}. If no job matches, or the jobs cannot be listed, the job state is set to
+     * {@code JOB_STATE_UNKNOWN}.
+     *
+     * @param sessionJob the target session job.
+     * @param lastValidatedConfig the configuration passed to {@link FlinkService#listJobs}.
+     * @return true if the job was found in the session cluster; false if it was not found or the
+     *     jobs could not be listed.
+     */
+    private boolean observeFlinkJobStatus(
+            FlinkSessionJob sessionJob, Configuration lastValidatedConfig) {
+        var jobStatus = sessionJob.getStatus().getJobStatus();
+        var jobId = jobStatus.getJobId();
+        var previousJobStatus = jobStatus.getState();
+        try {
+            var clusterJobStatuses =
+                    flinkService.listJobs(lastValidatedConfig).stream()
+                            .filter(job -> job.getJobId().toHexString().equals(jobId))
+                            .collect(Collectors.toList());
+            if (clusterJobStatuses.isEmpty()) {
+                // Report "not found" so the caller does not go on to observe savepoints for a
+                // job that is absent from the session cluster.
+                LOG.info("No job found for JobID: {}", jobId);
+                jobStatus.setState(JOB_STATE_UNKNOWN);
+                return false;
+            }
+            String targetJobStatus = updateJobStatus(jobStatus, clusterJobStatuses);
+            if (targetJobStatus.equals(previousJobStatus)) {
+                LOG.info("Job status ({}) unchanged", previousJobStatus);
+            } else {
+                LOG.info(
+                        "Job status successfully updated from {} to {}",
+                        previousJobStatus,
+                        targetJobStatus);
+            }
+            return true;
+        } catch (Exception e) {
+            LOG.error("Exception while listing jobs", e);
+            jobStatus.setState(JOB_STATE_UNKNOWN);
+            return false;
+        }
+    }
+
+    private void observeSavepointStatus(

Review Comment:
   Yes, I have the same feeling; I will try to improve this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org