You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink is not preserved in this plain-text version).
Posted to commits@flink.apache.org by gy...@apache.org on 2022/06/09 10:51:49 UTC
[flink-kubernetes-operator] branch main updated: [FLINK-27497] Track terminal job states in the observer
This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 18fdd7e [FLINK-27497] Track terminal job states in the observer
18fdd7e is described below
commit 18fdd7e8cd870bac7763a79d0af99c64d4e181a7
Author: Aitozi <yu...@alibaba-inc.com>
AuthorDate: Tue Jun 7 16:36:36 2022 +0800
[FLINK-27497] Track terminal job states in the observer
---
.../operator/observer/JobStatusObserver.java | 114 +++++++++++++++---
.../observer/context/VoidObserverContext.java | 2 +-
.../deployment/AbstractDeploymentObserver.java | 1 +
.../observer/deployment/ApplicationObserver.java | 24 ++--
.../observer/sessionjob/SessionJobObserver.java | 23 ++--
.../kubernetes/operator/service/FlinkService.java | 14 +++
.../kubernetes/operator/utils/EventUtils.java | 3 +-
.../kubernetes/operator/TestingClusterClient.java | 12 +-
.../kubernetes/operator/TestingFlinkService.java | 29 +++++
.../controller/DeploymentRecoveryTest.java | 2 +-
.../controller/FlinkDeploymentControllerTest.java | 2 +-
.../operator/controller/RollbackTest.java | 2 +-
.../deployment/ApplicationObserverTest.java | 134 +++++++++++++++------
.../sessionjob/SessionJobObserverTest.java | 4 +-
14 files changed, 273 insertions(+), 93 deletions(-)
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java
index 30df728..afc9582 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java
@@ -18,8 +18,10 @@
package org.apache.flink.kubernetes.operator.observer;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.kubernetes.operator.utils.EventUtils;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.slf4j.Logger;
@@ -34,6 +36,7 @@ import java.util.concurrent.TimeoutException;
public abstract class JobStatusObserver<CTX> {
private static final Logger LOG = LoggerFactory.getLogger(JobStatusObserver.class);
+ private static final int MAX_ERROR_STRING_LENGTH = 512; // max length of the job error copied into the CR status
private final FlinkService flinkService;
public JobStatusObserver(FlinkService flinkService) {
@@ -43,11 +46,13 @@ public abstract class JobStatusObserver<CTX> {
/**
* Observe the status of the flink job.
*
- * @param jobStatus The job status to be observed.
+ * @param resource The custom resource whose job status is observed and updated in place.
* @param deployedConfig Deployed job config.
* @return If job found return true, otherwise return false.
*/
- public boolean observe(JobStatus jobStatus, Configuration deployedConfig, CTX ctx) {
+ public boolean observe(
+ AbstractFlinkResource<?, ?> resource, Configuration deployedConfig, CTX ctx) {
+ var jobStatus = resource.getStatus().getJobStatus(); // mutated in place below
LOG.info("Observing job status");
var previousJobStatus = jobStatus.getState();
List<JobStatusMessage> clusterJobStatuses;
@@ -63,19 +68,13 @@ public abstract class JobStatusObserver<CTX> {
}
if (!clusterJobStatuses.isEmpty()) {
- Optional<String> targetJobStatus = updateJobStatus(jobStatus, clusterJobStatuses);
- if (targetJobStatus.isEmpty()) {
+ Optional<JobStatusMessage> targetJobStatusMessage =
+ filterTargetJob(jobStatus, clusterJobStatuses); // pick the job this resource tracks
+ if (targetJobStatusMessage.isEmpty()) { // cluster reports no matching job
jobStatus.setState(org.apache.flink.api.common.JobStatus.RECONCILING.name());
return false;
} else {
- if (targetJobStatus.get().equals(previousJobStatus)) {
- LOG.info("Job status ({}) unchanged", previousJobStatus);
- } else {
- LOG.info(
- "Job status successfully updated from {} to {}",
- previousJobStatus,
- targetJobStatus.get());
- }
+ updateJobStatus(resource, targetJobStatusMessage.get(), deployedConfig); // copies cluster state into the CR, emits an event on change
}
return true;
} else {
@@ -94,14 +93,95 @@ public abstract class JobStatusObserver<CTX> {
protected abstract void onTimeout(CTX ctx);
/**
- * Find and update previous job status based on the job list from the cluster and return the
- * target status.
+ * Select the job tracked by the resource from the job list reported by the cluster.
*
- * @param status the target job status to be updated.
+ * @param status the target job status.
* @param clusterJobStatuses the candidate cluster jobs.
- * @return The target status of the job. If no matched job found, {@code Optional.empty()} will
+ * @return The target job status message. If no matched job found, {@code Optional.empty()} will
* be returned.
*/
- protected abstract Optional<String> updateJobStatus(
+ protected abstract Optional<JobStatusMessage> filterTargetJob(
JobStatus status, List<JobStatusMessage> clusterJobStatuses);
+
+ /**
+ * Update the CR status from the cluster job status and emit an event if the state changed.
+ *
+ * @param resource the target custom resource.
+ * @param clusterJobStatus the status fetched from the cluster.
+ * @param deployedConfig Deployed job config.
+ */
+ private void updateJobStatus(
+ AbstractFlinkResource<?, ?> resource,
+ JobStatusMessage clusterJobStatus,
+ Configuration deployedConfig) {
+ var jobStatus = resource.getStatus().getJobStatus();
+ var previousJobStatus = jobStatus.getState();
+
+ jobStatus.setState(clusterJobStatus.getJobState().name());
+ jobStatus.setJobName(clusterJobStatus.getJobName());
+ jobStatus.setJobId(clusterJobStatus.getJobId().toHexString());
+ jobStatus.setStartTime(String.valueOf(clusterJobStatus.getStartTime()));
+
+ if (jobStatus.getState().equals(previousJobStatus)) {
+ LOG.info("Job status ({}) unchanged", previousJobStatus);
+ } else {
+ jobStatus.setUpdateTime(String.valueOf(System.currentTimeMillis()));
+ var message =
+ previousJobStatus == null
+ ? String.format("Job status changed to %s", jobStatus.getState())
+ : String.format(
+ "Job status changed from %s to %s",
+ previousJobStatus, jobStatus.getState());
+ LOG.info(message);
+
+ setErrorIfPresent(resource, clusterJobStatus, deployedConfig);
+ EventUtils.createOrUpdateEvent(
+ flinkService.getKubernetesClient(),
+ resource,
+ EventUtils.Type.Normal,
+ "Status Changed",
+ message,
+ EventUtils.Component.Job);
+ }
+ }
+
+ /**
+ * Copy the (truncated) failure cause of a FAILED job into the status error of the resource.
+ * Errors while requesting the job result are logged and otherwise ignored.
+ */
+ private void setErrorIfPresent(
+ AbstractFlinkResource<?, ?> resource,
+ JobStatusMessage clusterJobStatus,
+ Configuration deployedConfig) {
+ if (clusterJobStatus.getJobState() != org.apache.flink.api.common.JobStatus.FAILED) {
+ return;
+ }
+ try {
+ var result =
+ flinkService.requestJobResult(deployedConfig, clusterJobStatus.getJobId());
+ var fullError =
+ result.getSerializedThrowable().map(t -> t.getFullStringifiedStackTrace());
+ var trimmedError = fullError.flatMap(this::getErrorWithMaxLength);
+ if (trimmedError.isPresent()
+ && !trimmedError.get().equals(resource.getStatus().getError())) {
+ resource.getStatus().setError(trimmedError.get());
+ LOG.error(
+ "Job {} failed with error: {}",
+ clusterJobStatus.getJobId(),
+ fullError.get());
+ }
+ } catch (Exception e) {
+ if (e instanceof InterruptedException) {
+ // Restore the interrupt flag swallowed by the broad catch; the failure is only logged.
+ Thread.currentThread().interrupt();
+ }
+ LOG.warn("Failed to request the job result", e);
+ }
+ }
+
+ /** Return the error truncated to {@link #MAX_ERROR_STRING_LENGTH} chars; empty if null. */
+ private Optional<String> getErrorWithMaxLength(String error) {
+ return Optional.ofNullable(error)
+ .map(e -> e.substring(0, Math.min(e.length(), MAX_ERROR_STRING_LENGTH)));
+ }
}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/context/VoidObserverContext.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/context/VoidObserverContext.java
index 085638e..3ed3940 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/context/VoidObserverContext.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/context/VoidObserverContext.java
@@ -17,7 +17,7 @@
package org.apache.flink.kubernetes.operator.observer.context;
-/** A empty observer context. */
+/** An empty observer context; use the shared {@link #INSTANCE}. */
public class VoidObserverContext {
public static final VoidObserverContext INSTANCE = new VoidObserverContext();
}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractDeploymentObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractDeploymentObserver.java
index c17647f..7711c4c 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractDeploymentObserver.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractDeploymentObserver.java
@@ -217,6 +217,8 @@ public abstract class AbstractDeploymentObserver implements Observer<FlinkDeploy
FlinkDeploymentStatus status = dep.getStatus();
ReconciliationStatus reconciliationStatus = status.getReconciliationStatus();
if (status.getJobManagerDeploymentStatus() != JobManagerDeploymentStatus.ERROR
+ // Keep the error reported for a FAILED job instead of clearing it.
+ && !JobStatus.FAILED.name().equals(status.getJobStatus().getState())
&& reconciliationStatus.isLastReconciledSpecStable()) {
status.setError(null);
}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserver.java
index f74263b..dac96a5 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserver.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserver.java
@@ -42,7 +42,7 @@ import static org.apache.flink.api.common.JobStatus.RUNNING;
/** The observer of {@link org.apache.flink.kubernetes.operator.config.Mode#APPLICATION} cluster. */
public class ApplicationObserver extends AbstractDeploymentObserver {
- private final SavepointObserver savepointObserver;
+ private final SavepointObserver<FlinkDeploymentStatus> savepointObserver;
private final JobStatusObserver<ApplicationObserverContext> jobStatusObserver;
public ApplicationObserver(
@@ -51,7 +51,7 @@ public class ApplicationObserver extends AbstractDeploymentObserver {
FlinkConfigManager configManager,
StatusHelper<FlinkDeploymentStatus> statusHelper) {
super(kubernetesClient, flinkService, configManager);
- this.savepointObserver = new SavepointObserver(flinkService, configManager, statusHelper);
+ this.savepointObserver = new SavepointObserver<>(flinkService, configManager, statusHelper);
this.jobStatusObserver =
new JobStatusObserver<>(flinkService) {
@Override
@@ -60,18 +60,15 @@ public class ApplicationObserver extends AbstractDeploymentObserver {
}
@Override
- protected Optional<String> updateJobStatus(
+ protected Optional<JobStatusMessage> filterTargetJob(
JobStatus status, List<JobStatusMessage> clusterJobStatuses) {
- clusterJobStatuses.sort(
- (j1, j2) -> Long.compare(j2.getStartTime(), j1.getStartTime()));
- JobStatusMessage newJob = clusterJobStatuses.get(0);
-
- status.setState(newJob.getJobState().name());
- status.setJobName(newJob.getJobName());
- status.setJobId(newJob.getJobId().toHexString());
- status.setStartTime(String.valueOf(newJob.getStartTime()));
- status.setUpdateTime(String.valueOf(System.currentTimeMillis()));
- return Optional.of(status.getState());
+ if (!clusterJobStatuses.isEmpty()) {
+ // The cluster may list several jobs; track the most recently started one.
+ clusterJobStatuses.sort(
+ (j1, j2) -> Long.compare(j2.getStartTime(), j1.getStartTime()));
+ return Optional.of(clusterJobStatuses.get(0));
+ }
+ return Optional.empty();
}
};
}
@@ -80,11 +77,9 @@ public class ApplicationObserver extends AbstractDeploymentObserver {
protected boolean observeFlinkCluster(
FlinkDeployment flinkApp, Context context, Configuration deployedConfig) {
- var jobStatus = flinkApp.getStatus().getJobStatus();
-
boolean jobFound =
jobStatusObserver.observe(
- jobStatus,
+ flinkApp,
deployedConfig,
new ApplicationObserverContext(flinkApp, context, deployedConfig));
if (jobFound) {
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserver.java
index a0f6c73..7d16a6a 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserver.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserver.java
@@ -47,7 +47,7 @@ public class SessionJobObserver implements Observer<FlinkSessionJob> {
private static final Logger LOG = LoggerFactory.getLogger(SessionJobObserver.class);
private final FlinkService flinkService;
private final FlinkConfigManager configManager;
- private final SavepointObserver savepointObserver;
+ private final SavepointObserver<FlinkSessionJobStatus> savepointObserver;
private final JobStatusObserver<VoidObserverContext> jobStatusObserver;
public SessionJobObserver(
@@ -56,14 +56,14 @@ public class SessionJobObserver implements Observer<FlinkSessionJob> {
StatusHelper<FlinkSessionJobStatus> statusHelper) {
this.flinkService = flinkService;
this.configManager = configManager;
- this.savepointObserver = new SavepointObserver(flinkService, configManager, statusHelper);
+ this.savepointObserver = new SavepointObserver<>(flinkService, configManager, statusHelper);
this.jobStatusObserver =
new JobStatusObserver<>(flinkService) {
@Override
protected void onTimeout(VoidObserverContext voidObserverContext) {}
@Override
- protected Optional<String> updateJobStatus(
+ protected Optional<JobStatusMessage> filterTargetJob(
JobStatus status, List<JobStatusMessage> clusterJobStatuses) {
var jobId =
Preconditions.checkNotNull(
@@ -82,15 +82,9 @@ public class SessionJobObserver implements Observer<FlinkSessionJob> {
if (matchedList.size() == 0) {
LOG.info("No job found for JobID: {}", jobId);
return Optional.empty();
+ } else {
+ return Optional.of(matchedList.get(0));
}
-
- JobStatusMessage newJob = matchedList.get(0);
-
- status.setState(newJob.getJobState().name());
- status.setJobName(newJob.getJobName());
- status.setStartTime(String.valueOf(newJob.getStartTime()));
- status.setUpdateTime(String.valueOf(System.currentTimeMillis()));
- return Optional.of(status.getState());
}
};
}
@@ -115,9 +109,8 @@ public class SessionJobObserver implements Observer<FlinkSessionJob> {
var deployedConfig = configManager.getSessionJobConfig(flinkDepOpt.get(), flinkSessionJob);
var jobFound =
jobStatusObserver.observe(
- flinkSessionJob.getStatus().getJobStatus(),
- deployedConfig,
- VoidObserverContext.INSTANCE);
+ flinkSessionJob, deployedConfig, VoidObserverContext.INSTANCE);
+
if (jobFound) {
savepointObserver.observeSavepointStatus(flinkSessionJob, deployedConfig);
}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
index c6731c0..2ba73e0 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
@@ -53,6 +53,7 @@ import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices;
import org.apache.flink.runtime.jobgraph.RestoreMode;
+import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.rest.FileUpload;
import org.apache.flink.runtime.rest.RestClient;
import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationResult;
@@ -340,6 +341,29 @@ public class FlinkService {
}
}
+ /**
+ * Request the result of the given job from the cluster.
+ *
+ * <p>Waits at most the operator's configured Flink client timeout (truncated to whole seconds).
+ *
+ * @param conf the configuration used to create the cluster client
+ * @param jobID the id of the job whose result is requested
+ * @return the job result reported by the cluster
+ * @throws Exception if the client cannot be created, or the request fails or times out
+ */
+ public JobResult requestJobResult(Configuration conf, JobID jobID) throws Exception {
+ try (ClusterClient<String> clusterClient = getClusterClient(conf)) {
+ return clusterClient
+ .requestJobResult(jobID)
+ .get(
+ configManager
+ .getOperatorConfiguration()
+ .getFlinkClientTimeout()
+ .getSeconds(),
+ TimeUnit.SECONDS);
+ }
+ }
+
@VisibleForTesting
protected ClusterClient<String> getClusterClient(Configuration config) throws Exception {
final String clusterId = config.get(KubernetesConfigOptions.CLUSTER_ID);
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventUtils.java
index aad4375..f710378 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventUtils.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventUtils.java
@@ -39,7 +39,8 @@ public class EventUtils {
/** The component of events. */
public enum Component {
Operator,
- JobManagerDeployment
+ JobManagerDeployment,
+ Job // job status change events
}
public static String generateEventName(
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingClusterClient.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingClusterClient.java
index 730d99b..1175461 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingClusterClient.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingClusterClient.java
@@ -71,6 +71,11 @@ public class TestingClusterClient<T> extends RestClusterClient<T> {
throw new UnsupportedOperationException();
};
+ private Function<JobID, CompletableFuture<JobResult>> requestResultFunction =
+ jobID ->
+ CompletableFuture.completedFuture(
+ new JobResult.Builder().jobId(jobID).netRuntime(1).build()); // default: no serialized throwable
+
private final Configuration configuration;
private final T clusterId;
@@ -109,6 +114,11 @@ public class TestingClusterClient<T> extends RestClusterClient<T> {
this.listJobsFunction = listJobsFunction;
}
+ public void setRequestResultFunction( // lets tests control requestJobResult responses
+ Function<JobID, CompletableFuture<JobResult>> requestResultFunction) {
+ this.requestResultFunction = requestResultFunction;
+ }
+
@Override
public T getClusterId() {
return clusterId;
@@ -151,7 +161,7 @@ public class TestingClusterClient<T> extends RestClusterClient<T> {
@Override
public CompletableFuture<JobResult> requestJobResult(@Nonnull JobID jobId) {
- throw new UnsupportedOperationException();
+ return requestResultFunction.apply(jobId);
}
@Override
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
index ac21037..c1b4e8e 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
@@ -42,8 +42,10 @@ import org.apache.flink.kubernetes.operator.service.FlinkService;
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
+import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.rest.messages.DashboardConfiguration;
+import org.apache.flink.util.SerializedThrowable;
import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.fabric8.kubernetes.api.model.PodList;
@@ -82,6 +84,7 @@ public class TestingFlinkService extends FlinkService {
private int triggerCounter = 0;
private final List<Tuple2<String, JobStatusMessage>> jobs = new ArrayList<>();
+ private final Map<JobID, String> jobErrors = new HashMap<>(); // failure causes reported by requestJobResult
private final Map<JobID, SubmittedJobInfo> sessionJobs = new HashMap<>();
private final Set<String> sessions = new HashSet<>();
private boolean isPortReady = true;
@@ -299,6 +302,17 @@ public class TestingFlinkService extends FlinkService {
}
return CompletableFuture.completedFuture(Acknowledge.get());
});
+
+ clusterClient.setRequestResultFunction(
+ jobID -> {
+ var builder = new JobResult.Builder().jobId(jobID).netRuntime(1);
+ var error = jobErrors.get(jobID);
+ if (error != null) {
+ builder.serializedThrowable(
+ new SerializedThrowable(new RuntimeException(error)));
+ }
+ return CompletableFuture.completedFuture(builder.build());
+ });
return clusterClient;
}
@@ -389,6 +403,30 @@ public class TestingFlinkService extends FlinkService {
this.podList = podList;
}
+ /**
+ * Mark a job tracked by this service as FAILED and register the error that the testing
+ * cluster client reports as its failure cause from {@code requestJobResult}.
+ *
+ * @param jobID the id of the job to fail
+ * @param error the message of the failure cause to report
+ * @throws Exception if no job with the given id is tracked
+ */
+ public void markApplicationJobFailedWithError(JobID jobID, String error) throws Exception {
+ var job =
+ jobs.stream()
+ .filter(tuple -> tuple.f1.getJobId().equals(jobID))
+ .findFirst()
+ .orElseThrow(() -> new Exception("The target job missed"));
+ var oldStatus = job.f1;
+ job.f1 =
+ new JobStatusMessage(
+ oldStatus.getJobId(),
+ oldStatus.getJobName(),
+ JobStatus.FAILED,
+ oldStatus.getStartTime());
+ jobErrors.put(jobID, error);
+ }
+
/** The information collector of a submitted job. */
public static class SubmittedJobInfo {
public final String savepointPath;
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/DeploymentRecoveryTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/DeploymentRecoveryTest.java
index 372aa75..a9d59fe 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/DeploymentRecoveryTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/DeploymentRecoveryTest.java
@@ -57,7 +57,7 @@ public class DeploymentRecoveryTest {
@BeforeEach
public void setup() {
- flinkService = new TestingFlinkService();
+ flinkService = new TestingFlinkService(kubernetesClient); // presumably exposed via getKubernetesClient() for job status events
context = flinkService.getContext();
testController =
TestUtils.createTestController(configManager, kubernetesClient, flinkService);
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
index 8a76aa1..7b0c0c7 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
@@ -78,7 +78,7 @@ public class FlinkDeploymentControllerTest {
@BeforeEach
public void setup() {
- flinkService = new TestingFlinkService();
+ flinkService = new TestingFlinkService(kubernetesClient); // presumably exposed via getKubernetesClient() for job status events
context = flinkService.getContext();
testController =
TestUtils.createTestController(configManager, kubernetesClient, flinkService);
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/RollbackTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/RollbackTest.java
index 4774e3b..fa216e4 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/RollbackTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/RollbackTest.java
@@ -64,7 +64,7 @@ public class RollbackTest {
@BeforeEach
public void setup() {
- flinkService = new TestingFlinkService();
+ flinkService = new TestingFlinkService(kubernetesClient); // presumably exposed via getKubernetesClient() for job status events
context = flinkService.getContext();
testController =
TestUtils.createTestController(
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java
index 104801c..53353db 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java
@@ -17,6 +17,7 @@
package org.apache.flink.kubernetes.operator.observer.deployment;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.TestUtils;
@@ -37,10 +38,12 @@ import org.apache.flink.runtime.client.JobStatusMessage;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
import io.javaoperatorsdk.operator.api.reconciler.Context;
+import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.time.Duration;
import java.time.Instant;
+import java.util.stream.Collectors;
import static org.junit.Assert.assertFalse;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -57,10 +60,10 @@ public class ApplicationObserverTest {
@Test
public void observeApplicationCluster() throws Exception {
- TestingFlinkService flinkService = new TestingFlinkService();
+ TestingFlinkService flinkService = new TestingFlinkService(kubernetesClient); // observer now emits job status events
ApplicationObserver observer =
new ApplicationObserver(
- null, flinkService, configManager, new TestingStatusHelper<>());
+ kubernetesClient, flinkService, configManager, new TestingStatusHelper<>());
FlinkDeployment deployment = TestUtils.buildApplicationCluster();
deployment.setStatus(new FlinkDeploymentStatus());
@@ -162,12 +165,80 @@ public class ApplicationObserverTest {
assertNull(deployment.getStatus().getReconciliationStatus().getLastStableSpec());
}
+ @Test
+ public void testEventGeneratedWhenStatusChanged() throws Exception {
+ TestingFlinkService flinkService = new TestingFlinkService(kubernetesClient);
+ FlinkDeployment deployment = TestUtils.buildApplicationCluster();
+ Configuration conf =
+ configManager.getDeployConfig(deployment.getMetadata(), deployment.getSpec());
+ ApplicationObserver observer =
+ new ApplicationObserver(
+ kubernetesClient, flinkService, configManager, new TestingStatusHelper<>());
+ flinkService.submitApplicationCluster(deployment.getSpec().getJob(), conf, false);
+
+ deployment.setStatus(new FlinkDeploymentStatus());
+ deployment
+ .getStatus()
+ .getReconciliationStatus()
+ .serializeAndSetLastReconciledSpec(deployment.getSpec());
+ deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
+
+ observer.observe(deployment, readyContext);
+ var eventList =
+ kubernetesClient
+ .v1()
+ .events()
+ .inNamespace(deployment.getMetadata().getNamespace())
+ .list()
+ .getItems();
+ Assertions.assertEquals(1, eventList.size()); // only the job status change event so far
+ Assertions.assertEquals("Job status changed to RUNNING", eventList.get(0).getMessage());
+ observer.observe(deployment, readyContext); // status unchanged: no new event expected
+ Assertions.assertEquals(
+ 1,
+ kubernetesClient
+ .v1()
+ .events()
+ .inNamespace(deployment.getMetadata().getNamespace())
+ .list()
+ .getItems()
+ .size());
+ }
+
+ @Test
+ public void testErrorForwardToStatusWhenJobFailed() throws Exception {
+ TestingFlinkService flinkService = new TestingFlinkService(kubernetesClient);
+ FlinkDeployment deployment = TestUtils.buildApplicationCluster();
+ Configuration conf =
+ configManager.getDeployConfig(deployment.getMetadata(), deployment.getSpec());
+ ApplicationObserver observer =
+ new ApplicationObserver(
+ kubernetesClient, flinkService, configManager, new TestingStatusHelper<>());
+ flinkService.submitApplicationCluster(deployment.getSpec().getJob(), conf, false);
+
+ deployment.setStatus(new FlinkDeploymentStatus());
+ deployment
+ .getStatus()
+ .getReconciliationStatus()
+ .serializeAndSetLastReconciledSpec(deployment.getSpec());
+ deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
+
+ observer.observe(deployment, readyContext);
+ Assertions.assertEquals(1, flinkService.getRunningCount());
+ flinkService.markApplicationJobFailedWithError( // job becomes FAILED with this error
+ JobID.fromHexString(deployment.getStatus().getJobStatus().getJobId()),
+ "Job failed");
+ observer.observe(deployment, readyContext);
+ Assertions.assertEquals(0, flinkService.getRunningCount());
+ Assertions.assertTrue(deployment.getStatus().getError().contains("Job failed")); // copied from the job result
+ }
+
@Test
public void observeSavepoint() throws Exception {
TestingFlinkService flinkService = new TestingFlinkService(kubernetesClient);
ApplicationObserver observer =
new ApplicationObserver(
- null, flinkService, configManager, new TestingStatusHelper<>());
+ kubernetesClient, flinkService, configManager, new TestingStatusHelper<>());
FlinkDeployment deployment = TestUtils.buildApplicationCluster();
Configuration conf =
configManager.getDeployConfig(deployment.getMetadata(), deployment.getSpec());
@@ -191,13 +262,12 @@ public class ApplicationObserverTest {
// savepoint error within grace period
assertEquals(
0,
- kubernetesClient
- .v1()
- .events()
- .inNamespace(deployment.getMetadata().getNamespace())
- .list()
- .getItems()
- .size());
+ (int)
+ kubernetesClient.v1().events()
+ .inNamespace(deployment.getMetadata().getNamespace()).list()
+ .getItems().stream()
+ .filter(e -> e.getReason().contains("SavepointError")) // ignore the job status change events the observer now emits
+ .count());
observer.observe(deployment, readyContext);
assertFalse(SavepointUtils.savepointInProgress(deployment.getStatus().getJobStatus()));
assertEquals(
@@ -226,21 +296,16 @@ public class ApplicationObserverTest {
assertFalse(SavepointUtils.savepointInProgress(deployment.getStatus().getJobStatus()));
assertEquals(
1,
- kubernetesClient
- .v1()
- .events()
- .inNamespace(deployment.getMetadata().getNamespace())
- .list()
- .getItems()
- .size());
+ kubernetesClient.v1().events().inNamespace(deployment.getMetadata().getNamespace())
+ .list().getItems().stream()
+ .filter(e -> e.getReason().contains("SavepointError"))
+ .count());
assertEquals(
1,
- kubernetesClient
- .v1()
- .events()
- .inNamespace(deployment.getMetadata().getNamespace())
- .list()
- .getItems()
+ kubernetesClient.v1().events().inNamespace(deployment.getMetadata().getNamespace())
+ .list().getItems().stream()
+ .filter(e -> e.getReason().contains("SavepointError"))
+ .collect(Collectors.toList())
.get(0)
.getCount());
@@ -314,21 +379,16 @@ public class ApplicationObserverTest {
assertEquals(
1,
- kubernetesClient
- .v1()
- .events()
- .inNamespace(deployment.getMetadata().getNamespace())
- .list()
- .getItems()
- .size());
+ kubernetesClient.v1().events().inNamespace(deployment.getMetadata().getNamespace())
+ .list().getItems().stream()
+ .filter(e -> e.getReason().contains("SavepointError")) // savepoint error events only
+ .count());
assertEquals(
2,
- kubernetesClient
- .v1()
- .events()
- .inNamespace(deployment.getMetadata().getNamespace())
- .list()
- .getItems()
+ kubernetesClient.v1().events().inNamespace(deployment.getMetadata().getNamespace())
+ .list().getItems().stream()
+ .filter(e -> e.getReason().contains("SavepointError"))
+ .collect(Collectors.toList())
.get(0)
.getCount());
@@ -379,7 +439,7 @@ public class ApplicationObserverTest {
- TestingFlinkService flinkService = new TestingFlinkService();
+ TestingFlinkService flinkService = new TestingFlinkService(kubernetesClient);
ApplicationObserver observer =
new ApplicationObserver(
- null, flinkService, configManager, new TestingStatusHelper<>());
+ kubernetesClient, flinkService, configManager, new TestingStatusHelper<>());
FlinkDeployment deployment = TestUtils.buildApplicationCluster();
bringToReadyStatus(deployment);
observer.observe(deployment, readyContext);
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserverTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserverTest.java
index 68a0c36..766c35c 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserverTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserverTest.java
@@ -45,7 +45,7 @@ public class SessionJobObserverTest {
@Test
public void testBasicObserve() throws Exception {
final var sessionJob = TestUtils.buildSessionJob();
- final var flinkService = new TestingFlinkService();
+ final var flinkService = new TestingFlinkService(kubernetesClient); // presumably exposed via getKubernetesClient() for job status events
final var reconciler = new FlinkSessionJobReconciler(null, flinkService, configManager);
final var observer =
new SessionJobObserver(flinkService, configManager, new TestingStatusHelper<>());
@@ -106,7 +106,7 @@ public class SessionJobObserverTest {
@Test
public void testObserveWithEffectiveConfig() throws Exception {
final var sessionJob = TestUtils.buildSessionJob();
- final var flinkService = new TestingFlinkService();
+ final var flinkService = new TestingFlinkService(kubernetesClient); // presumably exposed via getKubernetesClient() for job status events
final var reconciler = new FlinkSessionJobReconciler(null, flinkService, configManager);
final var observer =
new SessionJobObserver(flinkService, configManager, new TestingStatusHelper<>());