You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/04/11 07:08:35 UTC

[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #164: [FLINK-26871] Handle session job spec change

gyfora commented on code in PR #164:
URL: https://github.com/apache/flink-kubernetes-operator/pull/164#discussion_r847005748


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserver.java:
##########
@@ -17,13 +17,175 @@
 
 package org.apache.flink.kubernetes.operator.observer.sessionjob;
 
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
 import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
+import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
+import org.apache.flink.kubernetes.operator.crd.status.SavepointInfo;
 import org.apache.flink.kubernetes.operator.observer.Observer;
+import org.apache.flink.kubernetes.operator.observer.SavepointFetchResult;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.reconciler.sessionjob.SessionJobHelper;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
+import org.apache.flink.kubernetes.operator.utils.OperatorUtils;
+import org.apache.flink.kubernetes.operator.utils.SavepointUtils;
+import org.apache.flink.runtime.client.JobStatusMessage;
 
 import io.javaoperatorsdk.operator.api.reconciler.Context;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.kubernetes.operator.observer.deployment.AbstractDeploymentObserver.JOB_STATE_UNKNOWN;
 
 /** The observer of {@link FlinkSessionJob}. */
 public class SessionJobObserver implements Observer<FlinkSessionJob> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SessionJobObserver.class);
+    private final FlinkOperatorConfiguration operatorConfiguration;
+    private final FlinkService flinkService;
+    private final Configuration defaultConfig;
+
+    public SessionJobObserver(
+            FlinkOperatorConfiguration operatorConfiguration,
+            FlinkService flinkService,
+            Configuration defaultConfig) {
+        this.operatorConfiguration = operatorConfiguration;
+        this.flinkService = flinkService;
+        this.defaultConfig = defaultConfig;
+    }
+
     @Override
-    public void observe(FlinkSessionJob flinkSessionJob, Context context) {}
+    public void observe(FlinkSessionJob flinkSessionJob, Context context) {
+        var lastReconciledSpec =
+                flinkSessionJob.getStatus().getReconciliationStatus().getLastReconciledSpec();
+
+        if (lastReconciledSpec == null) {
+            return;
+        }
+
+        var flinkDepOpt =
+                OperatorUtils.getSecondaryResource(flinkSessionJob, context, operatorConfiguration);
+
+        var helper = new SessionJobHelper(flinkSessionJob, LOG);
+
+        if (!helper.sessionClusterReady(flinkDepOpt)) {
+            return;
+        }
+
+        Configuration lastValidatedConfig =
+                FlinkUtils.getEffectiveConfig(flinkDepOpt.get(), this.defaultConfig);
+        var jobFound = observeFlinkJobStatus(flinkSessionJob, lastValidatedConfig);
+        if (jobFound) {
+            observeSavepointStatus(flinkSessionJob, lastValidatedConfig);
+        }
+    }
+
+    /**
+     * Observe the status of the flink job.
+     *
+     * @param sessionJob the target session job.
+     * @param lastValidatedConfig The last validated config.
+     * @return If job found return true, otherwise return false.
+     */
+    private boolean observeFlinkJobStatus(
+            FlinkSessionJob sessionJob, Configuration lastValidatedConfig) {
+        var jobId = sessionJob.getStatus().getJobStatus().getJobId();
+        var previousJobStatus = sessionJob.getStatus().getJobStatus().getState();
+        try {
+            var jobs = flinkService.listJobs(lastValidatedConfig);
+            var clusterJobStatuses =
+                    jobs.stream()
+                            .filter(job -> job.getJobId().toHexString().equals(jobId))
+                            .collect(Collectors.toList());
+            if (!clusterJobStatuses.isEmpty()) {
+                String targetJobStatus =
+                        updateJobStatus(sessionJob.getStatus().getJobStatus(), clusterJobStatuses);
+                if (targetJobStatus.equals(previousJobStatus)) {
+                    LOG.info("Job status ({}) unchanged", previousJobStatus);
+                } else {
+                    LOG.info(
+                            "Job status successfully updated from {} to {}",
+                            previousJobStatus,
+                            targetJobStatus);
+                }
+                return true;
+            } else {
+                LOG.info("No job found for JobID: {}", jobId);
+                sessionJob.getStatus().getJobStatus().setState(JOB_STATE_UNKNOWN);
+            }

Review Comment:
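   Should we return false here? As written, this branch falls through to the final `return true`, which contradicts the Javadoc ("If job found return true, otherwise return false") and means the savepoint status is still observed for a job that was not found.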



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserver.java:
##########
@@ -17,13 +17,175 @@
 
 package org.apache.flink.kubernetes.operator.observer.sessionjob;
 
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
 import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
+import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
+import org.apache.flink.kubernetes.operator.crd.status.SavepointInfo;
 import org.apache.flink.kubernetes.operator.observer.Observer;
+import org.apache.flink.kubernetes.operator.observer.SavepointFetchResult;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.reconciler.sessionjob.SessionJobHelper;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
+import org.apache.flink.kubernetes.operator.utils.OperatorUtils;
+import org.apache.flink.kubernetes.operator.utils.SavepointUtils;
+import org.apache.flink.runtime.client.JobStatusMessage;
 
 import io.javaoperatorsdk.operator.api.reconciler.Context;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.kubernetes.operator.observer.deployment.AbstractDeploymentObserver.JOB_STATE_UNKNOWN;
 
 /** The observer of {@link FlinkSessionJob}. */
 public class SessionJobObserver implements Observer<FlinkSessionJob> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SessionJobObserver.class);
+    private final FlinkOperatorConfiguration operatorConfiguration;
+    private final FlinkService flinkService;
+    private final Configuration defaultConfig;
+
+    public SessionJobObserver(
+            FlinkOperatorConfiguration operatorConfiguration,
+            FlinkService flinkService,
+            Configuration defaultConfig) {
+        this.operatorConfiguration = operatorConfiguration;
+        this.flinkService = flinkService;
+        this.defaultConfig = defaultConfig;
+    }
+
     @Override
-    public void observe(FlinkSessionJob flinkSessionJob, Context context) {}
+    public void observe(FlinkSessionJob flinkSessionJob, Context context) {
+        var lastReconciledSpec =
+                flinkSessionJob.getStatus().getReconciliationStatus().getLastReconciledSpec();
+
+        if (lastReconciledSpec == null) {
+            return;
+        }
+
+        var flinkDepOpt =
+                OperatorUtils.getSecondaryResource(flinkSessionJob, context, operatorConfiguration);
+
+        var helper = new SessionJobHelper(flinkSessionJob, LOG);
+
+        if (!helper.sessionClusterReady(flinkDepOpt)) {
+            return;
+        }
+
+        Configuration lastValidatedConfig =
+                FlinkUtils.getEffectiveConfig(flinkDepOpt.get(), this.defaultConfig);
+        var jobFound = observeFlinkJobStatus(flinkSessionJob, lastValidatedConfig);
+        if (jobFound) {
+            observeSavepointStatus(flinkSessionJob, lastValidatedConfig);
+        }
+    }
+
+    /**
+     * Observe the status of the flink job.
+     *
+     * @param sessionJob the target session job.
+     * @param lastValidatedConfig The last validated config.
+     * @return If job found return true, otherwise return false.
+     */
+    private boolean observeFlinkJobStatus(
+            FlinkSessionJob sessionJob, Configuration lastValidatedConfig) {
+        var jobId = sessionJob.getStatus().getJobStatus().getJobId();
+        var previousJobStatus = sessionJob.getStatus().getJobStatus().getState();
+        try {
+            var jobs = flinkService.listJobs(lastValidatedConfig);
+            var clusterJobStatuses =
+                    jobs.stream()
+                            .filter(job -> job.getJobId().toHexString().equals(jobId))
+                            .collect(Collectors.toList());
+            if (!clusterJobStatuses.isEmpty()) {
+                String targetJobStatus =
+                        updateJobStatus(sessionJob.getStatus().getJobStatus(), clusterJobStatuses);
+                if (targetJobStatus.equals(previousJobStatus)) {
+                    LOG.info("Job status ({}) unchanged", previousJobStatus);
+                } else {
+                    LOG.info(
+                            "Job status successfully updated from {} to {}",
+                            previousJobStatus,
+                            targetJobStatus);
+                }
+                return true;
+            } else {
+                LOG.info("No job found for JobID: {}", jobId);
+                sessionJob.getStatus().getJobStatus().setState(JOB_STATE_UNKNOWN);
+            }
+
+        } catch (Exception e) {
+            LOG.error("Exception while listing jobs", e);
+            sessionJob.getStatus().getJobStatus().setState(JOB_STATE_UNKNOWN);
+            return false;
+        }
+        return true;
+    }
+
+    private void observeSavepointStatus(
+            FlinkSessionJob flinkSessionJob, Configuration lastValidatedConfig) {
+        SavepointInfo savepointInfo = flinkSessionJob.getStatus().getJobStatus().getSavepointInfo();
+        SessionJobHelper helper = new SessionJobHelper(flinkSessionJob, LOG);
+        if (!helper.savepointInProgress()) {
+            LOG.debug("Savepoint not in progress");
+            return;
+        }
+        LOG.info("Observing savepoint status");
+        SavepointFetchResult savepointFetchResult;
+        try {
+            savepointFetchResult =
+                    flinkService.fetchSavepointInfo(
+                            flinkSessionJob
+                                    .getStatus()
+                                    .getJobStatus()
+                                    .getSavepointInfo()
+                                    .getTriggerId(),
+                            flinkSessionJob.getStatus().getJobStatus().getJobId(),
+                            lastValidatedConfig);
+        } catch (Exception e) {
+            LOG.error("Exception while fetching savepoint info", e);
+            return;
+        }
+
+        if (!savepointFetchResult.isTriggered()) {
+            String error = savepointFetchResult.getError();
+            if (error != null
+                    || SavepointUtils.gracePeriodEnded(operatorConfiguration, savepointInfo)) {
+                String errorMsg = error != null ? error : "Savepoint status unknown";
+                LOG.error(errorMsg);
+                savepointInfo.resetTrigger();
+                ReconciliationUtils.updateForReconciliationError(flinkSessionJob, errorMsg);
+                return;
+            }
+            LOG.info("Savepoint operation not running, waiting within grace period...");
+        }
+        if (savepointFetchResult.getSavepoint() == null) {
+            LOG.info("Savepoint is still in progress...");
+            return;
+        }
+        LOG.info("Savepoint status updated with latest completed savepoint info");
+        savepointInfo.updateLastSavepoint(savepointFetchResult.getSavepoint());
+    }
+
+    /**
+     * Update previous job status based on the job list from the cluster and return the target
+     * status.
+     */
+    private String updateJobStatus(JobStatus status, List<JobStatusMessage> clusterJobStatuses) {

Review Comment:
   This also looks largely duplicated from the ApplicationObserver.



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserver.java:
##########
@@ -17,13 +17,175 @@
 
 package org.apache.flink.kubernetes.operator.observer.sessionjob;
 
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
 import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
+import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
+import org.apache.flink.kubernetes.operator.crd.status.SavepointInfo;
 import org.apache.flink.kubernetes.operator.observer.Observer;
+import org.apache.flink.kubernetes.operator.observer.SavepointFetchResult;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.reconciler.sessionjob.SessionJobHelper;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
+import org.apache.flink.kubernetes.operator.utils.OperatorUtils;
+import org.apache.flink.kubernetes.operator.utils.SavepointUtils;
+import org.apache.flink.runtime.client.JobStatusMessage;
 
 import io.javaoperatorsdk.operator.api.reconciler.Context;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.kubernetes.operator.observer.deployment.AbstractDeploymentObserver.JOB_STATE_UNKNOWN;
 
 /** The observer of {@link FlinkSessionJob}. */
 public class SessionJobObserver implements Observer<FlinkSessionJob> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SessionJobObserver.class);
+    private final FlinkOperatorConfiguration operatorConfiguration;
+    private final FlinkService flinkService;
+    private final Configuration defaultConfig;
+
+    public SessionJobObserver(
+            FlinkOperatorConfiguration operatorConfiguration,
+            FlinkService flinkService,
+            Configuration defaultConfig) {
+        this.operatorConfiguration = operatorConfiguration;
+        this.flinkService = flinkService;
+        this.defaultConfig = defaultConfig;
+    }
+
     @Override
-    public void observe(FlinkSessionJob flinkSessionJob, Context context) {}
+    public void observe(FlinkSessionJob flinkSessionJob, Context context) {
+        var lastReconciledSpec =
+                flinkSessionJob.getStatus().getReconciliationStatus().getLastReconciledSpec();
+
+        if (lastReconciledSpec == null) {
+            return;
+        }
+
+        var flinkDepOpt =
+                OperatorUtils.getSecondaryResource(flinkSessionJob, context, operatorConfiguration);
+
+        var helper = new SessionJobHelper(flinkSessionJob, LOG);
+
+        if (!helper.sessionClusterReady(flinkDepOpt)) {
+            return;
+        }
+
+        Configuration lastValidatedConfig =
+                FlinkUtils.getEffectiveConfig(flinkDepOpt.get(), this.defaultConfig);
+        var jobFound = observeFlinkJobStatus(flinkSessionJob, lastValidatedConfig);
+        if (jobFound) {
+            observeSavepointStatus(flinkSessionJob, lastValidatedConfig);
+        }
+    }
+
+    /**
+     * Observe the status of the flink job.
+     *
+     * @param sessionJob the target session job.
+     * @param lastValidatedConfig The last validated config.
+     * @return If job found return true, otherwise return false.
+     */
+    private boolean observeFlinkJobStatus(
+            FlinkSessionJob sessionJob, Configuration lastValidatedConfig) {
+        var jobId = sessionJob.getStatus().getJobStatus().getJobId();
+        var previousJobStatus = sessionJob.getStatus().getJobStatus().getState();
+        try {
+            var jobs = flinkService.listJobs(lastValidatedConfig);
+            var clusterJobStatuses =
+                    jobs.stream()
+                            .filter(job -> job.getJobId().toHexString().equals(jobId))
+                            .collect(Collectors.toList());
+            if (!clusterJobStatuses.isEmpty()) {
+                String targetJobStatus =
+                        updateJobStatus(sessionJob.getStatus().getJobStatus(), clusterJobStatuses);
+                if (targetJobStatus.equals(previousJobStatus)) {
+                    LOG.info("Job status ({}) unchanged", previousJobStatus);
+                } else {
+                    LOG.info(
+                            "Job status successfully updated from {} to {}",
+                            previousJobStatus,
+                            targetJobStatus);
+                }
+                return true;
+            } else {
+                LOG.info("No job found for JobID: {}", jobId);
+                sessionJob.getStatus().getJobStatus().setState(JOB_STATE_UNKNOWN);
+            }
+
+        } catch (Exception e) {
+            LOG.error("Exception while listing jobs", e);
+            sessionJob.getStatus().getJobStatus().setState(JOB_STATE_UNKNOWN);
+            return false;
+        }
+        return true;
+    }
+
+    private void observeSavepointStatus(

Review Comment:
   It seems like there is a lot of code duplication here with the ApplicationObserver. I wonder if we can factor this out somehow.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org