You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this rendering.
Posted to commits@flink.apache.org by gy...@apache.org on 2022/12/28 16:10:32 UTC
[flink-kubernetes-operator] branch release-1.3 updated: [FLINK-30406] Detect when jobmanager never started
This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch release-1.3
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/release-1.3 by this push:
new 5ed117cb [FLINK-30406] Detect when jobmanager never started
5ed117cb is described below
commit 5ed117cb8e3e2b00f2abeb0f98d7555d78e0afe3
Author: Gyula Fora <g_...@apple.com>
AuthorDate: Wed Dec 28 17:10:26 2022 +0100
[FLINK-30406] Detect when jobmanager never started
---
.../observer/deployment/SessionObserver.java | 2 +-
.../AbstractFlinkResourceReconciler.java | 44 ++++-
.../deployment/AbstractJobReconciler.java | 17 +-
.../deployment/ApplicationReconciler.java | 46 ++---
.../kubernetes/operator/utils/FlinkUtils.java | 35 ++++
.../flink/kubernetes/operator/TestUtils.java | 17 +-
.../operator/TestingApplicationReconciler.java | 57 ++++++
.../kubernetes/operator/TestingFlinkService.java | 7 +-
.../controller/FailedDeploymentRestartTest.java | 3 +
.../controller/FlinkDeploymentControllerTest.java | 15 +-
.../controller/UnhealthyDeploymentRestartTest.java | 3 -
.../deployment/ApplicationReconcilerTest.java | 4 +-
.../ApplicationReconcilerUpgradeModeTest.java | 218 +++++++++++++++++++--
.../kubernetes/operator/utils/FlinkUtilsTest.java | 43 ++++
14 files changed, 450 insertions(+), 61 deletions(-)
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/SessionObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/SessionObserver.java
index 41279e13..df4b61c8 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/SessionObserver.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/SessionObserver.java
@@ -51,7 +51,7 @@ public class SessionObserver extends AbstractFlinkDeploymentObserver {
rs.markReconciledSpecAsStable();
}
} catch (Exception e) {
- logger.error("REST service in session cluster is bad now", e);
+ logger.error("REST service in session cluster timed out", e);
if (e instanceof TimeoutException) {
// check for problems with the underlying deployment
observeJmDeployment(deployment, context, observerContext.getDeployedConfig());
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
index d1c2c743..29a724d5 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
@@ -27,9 +27,12 @@ import org.apache.flink.kubernetes.operator.api.spec.AbstractFlinkSpec;
import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
import org.apache.flink.kubernetes.operator.api.spec.JobSpec;
import org.apache.flink.kubernetes.operator.api.spec.JobState;
+import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.api.status.CommonStatus;
import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.api.status.ReconciliationState;
+import org.apache.flink.kubernetes.operator.api.status.Savepoint;
+import org.apache.flink.kubernetes.operator.api.status.SavepointTriggerType;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
import org.apache.flink.kubernetes.operator.reconciler.Reconciler;
@@ -93,7 +96,7 @@ public abstract class AbstractFlinkResourceReconciler<
}
@Override
- public final void reconcile(CR cr, Context<?> ctx) throws Exception {
+ public void reconcile(CR cr, Context<?> ctx) throws Exception {
var spec = cr.getSpec();
var deployConfig = getDeployConfig(cr.getMetadata(), spec, ctx);
var status = cr.getStatus();
@@ -109,12 +112,7 @@ public abstract class AbstractFlinkResourceReconciler<
// No further logic is required at this point.
if (reconciliationStatus.isBeforeFirstDeployment()) {
LOG.info("Deploying for the first time");
-
- // Before we try to submit the job we record the current spec in the status so we can
- // handle subsequent deployment and status update errors
- ReconciliationUtils.updateStatusBeforeDeploymentAttempt(cr, deployConfig);
- statusRecorder.patchAndCacheStatus(cr);
-
+ updateStatusBeforeFirstDeployment(cr, spec, deployConfig, status);
deploy(
cr,
spec,
@@ -177,6 +175,36 @@ public abstract class AbstractFlinkResourceReconciler<
}
}
+ /**
+ * Update the status before the first deployment. We have to record the upgrade mode based on
+ * the initial savepoint path provided, and record the to-be-deployed spec in the status.
+ *
+ * @param cr Related flink resource
+ * @param spec Spec to be deployed
+ * @param deployConfig Deploy configuration
+ * @param status Resource status
+ */
+ private void updateStatusBeforeFirstDeployment(
+ CR cr, SPEC spec, Configuration deployConfig, STATUS status) {
+ if (spec.getJob() != null) {
+ var initialUpgradeMode = UpgradeMode.STATELESS;
+ var initialSp = spec.getJob().getInitialSavepointPath();
+
+ if (initialSp != null) {
+ status.getJobStatus()
+ .getSavepointInfo()
+ .setLastSavepoint(Savepoint.of(initialSp, SavepointTriggerType.UNKNOWN));
+ initialUpgradeMode = UpgradeMode.SAVEPOINT;
+ }
+
+ spec.getJob().setUpgradeMode(initialUpgradeMode);
+ }
+ ReconciliationUtils.updateStatusBeforeDeploymentAttempt(cr, deployConfig);
+ // Before we try to submit the job we record the current spec in the status so we can
+ // handle subsequent deployment and status update errors
+ statusRecorder.patchAndCacheStatus(cr);
+ }
+
/**
* Get Flink configuration object for deploying the given spec using {@link #deploy}.
*
@@ -249,7 +277,7 @@ public abstract class AbstractFlinkResourceReconciler<
CR cr, Context<?> context, Configuration observeConfig) throws Exception;
@Override
- public final DeleteControl cleanup(CR resource, Context<?> context) {
+ public DeleteControl cleanup(CR resource, Context<?> context) {
return cleanupInternal(resource, context);
}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
index 370d96dd..3c3de7a7 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
@@ -114,7 +114,7 @@ public abstract class AbstractJobReconciler<
LOG.info("Upgrading/Restarting running job, suspending first...");
}
Optional<UpgradeMode> availableUpgradeMode =
- getAvailableUpgradeMode(resource, deployConfig, observeConfig);
+ getAvailableUpgradeMode(resource, ctx, deployConfig, observeConfig);
if (availableUpgradeMode.isEmpty()) {
return;
}
@@ -134,7 +134,14 @@ public abstract class AbstractJobReconciler<
ReconciliationUtils.updateStatusForDeployedSpec(resource, deployConfig);
}
}
+
if (currentJobState == JobState.SUSPENDED && desiredJobState == JobState.RUNNING) {
+ // We inherit the upgrade mode unless stateless upgrade requested
+ if (currentDeploySpec.getJob().getUpgradeMode() != UpgradeMode.STATELESS) {
+ currentDeploySpec
+ .getJob()
+ .setUpgradeMode(lastReconciledSpec.getJob().getUpgradeMode());
+ }
// We record the target spec into an upgrading state before deploying
ReconciliationUtils.updateStatusBeforeDeploymentAttempt(resource, deployConfig);
statusRecorder.patchAndCacheStatus(resource);
@@ -153,7 +160,7 @@ public abstract class AbstractJobReconciler<
}
protected Optional<UpgradeMode> getAvailableUpgradeMode(
- CR resource, Configuration deployConfig, Configuration observeConfig) {
+ CR resource, Context<?> ctx, Configuration deployConfig, Configuration observeConfig) {
var status = resource.getStatus();
var upgradeMode = resource.getSpec().getJob().getUpgradeMode();
@@ -162,7 +169,9 @@ public abstract class AbstractJobReconciler<
return Optional.of(UpgradeMode.STATELESS);
}
- if (ReconciliationUtils.isJobInTerminalState(status)) {
+ var flinkService = getFlinkService(resource, ctx);
+ if (ReconciliationUtils.isJobInTerminalState(status)
+ && !flinkService.isHaMetadataAvailable(observeConfig)) {
LOG.info(
"Job is in terminal state, ready for upgrade from observed latest checkpoint/savepoint");
return Optional.of(UpgradeMode.SAVEPOINT);
@@ -215,6 +224,7 @@ public abstract class AbstractJobReconciler<
throws Exception {
var reconciliationStatus = resource.getStatus().getReconciliationStatus();
var rollbackSpec = reconciliationStatus.deserializeLastStableSpec();
+ rollbackSpec.getJob().setUpgradeMode(UpgradeMode.LAST_STATE);
UpgradeMode upgradeMode = resource.getSpec().getJob().getUpgradeMode();
@@ -261,6 +271,7 @@ public abstract class AbstractJobReconciler<
throws Exception {
LOG.info("Resubmitting Flink job...");
SPEC specToRecover = ReconciliationUtils.getDeployedSpec(deployment);
+ specToRecover.getJob().setUpgradeMode(UpgradeMode.LAST_STATE);
restoreJob(
deployment,
specToRecover,
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
index ee40da92..2acc61ec 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
@@ -18,6 +18,7 @@
package org.apache.flink.kubernetes.operator.reconciler.deployment;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.PipelineOptionsInternal;
@@ -92,11 +93,14 @@ public class ApplicationReconciler
@Override
protected Optional<UpgradeMode> getAvailableUpgradeMode(
- FlinkDeployment deployment, Configuration deployConfig, Configuration observeConfig) {
+ FlinkDeployment deployment,
+ Context<?> ctx,
+ Configuration deployConfig,
+ Configuration observeConfig) {
var status = deployment.getStatus();
var availableUpgradeMode =
- super.getAvailableUpgradeMode(deployment, deployConfig, observeConfig);
+ super.getAvailableUpgradeMode(deployment, ctx, deployConfig, observeConfig);
if (availableUpgradeMode.isPresent()) {
return availableUpgradeMode;
@@ -110,20 +114,27 @@ public class ApplicationReconciler
&& !flinkVersionChanged(
ReconciliationUtils.getDeployedSpec(deployment), deployment.getSpec())) {
- if (!flinkService.isHaMetadataAvailable(deployConfig)) {
- if (deployment.getStatus().getReconciliationStatus().getLastStableSpec() == null) {
- // initial deployment failure, reset to allow for spec change to proceed
- return resetOnMissingStableSpec(deployment, deployConfig);
- }
- } else {
+ if (flinkService.isHaMetadataAvailable(deployConfig)) {
LOG.info(
"Job is not running but HA metadata is available for last state restore, ready for upgrade");
return Optional.of(UpgradeMode.LAST_STATE);
}
}
- if (status.getJobManagerDeploymentStatus() == JobManagerDeploymentStatus.MISSING
- || status.getJobManagerDeploymentStatus() == JobManagerDeploymentStatus.ERROR) {
+ var jmDeployStatus = status.getJobManagerDeploymentStatus();
+ if (jmDeployStatus != JobManagerDeploymentStatus.MISSING
+ && status.getReconciliationStatus()
+ .deserializeLastReconciledSpec()
+ .getJob()
+ .getUpgradeMode()
+ != UpgradeMode.LAST_STATE
+ && FlinkUtils.jmPodNeverStarted(ctx)) {
+ deleteJmThatNeverStarted(deployment, deployConfig);
+ return getAvailableUpgradeMode(deployment, ctx, deployConfig, observeConfig);
+ }
+
+ if (jmDeployStatus == JobManagerDeploymentStatus.MISSING
+ || jmDeployStatus == JobManagerDeploymentStatus.ERROR) {
throw new RecoveryFailureException(
"JobManager deployment is missing and HA data is not available to make stateful upgrades. "
+ "It is possible that the job has finished or terminally failed, or the configmaps have been deleted. "
@@ -136,21 +147,12 @@ public class ApplicationReconciler
return Optional.empty();
}
- private Optional<UpgradeMode> resetOnMissingStableSpec(
- FlinkDeployment deployment, Configuration deployConfig) {
- // initial deployment failure, reset to allow for spec change to proceed
+ private void deleteJmThatNeverStarted(FlinkDeployment deployment, Configuration deployConfig) {
+ deployment.getStatus().getJobStatus().setState(JobStatus.FAILED.name());
flinkService.deleteClusterDeployment(
deployment.getMetadata(), deployment.getStatus(), false);
flinkService.waitForClusterShutdown(deployConfig);
- if (!flinkService.isHaMetadataAvailable(deployConfig)) {
- LOG.info("Job never entered stable state. Resetting status for initial deploy");
- ReconciliationUtils.clearLastReconciledSpecIfFirstDeploy(deployment);
- return Optional.empty();
- } else {
- // proceed with upgrade if deployment succeeded between check and delete
- LOG.info("Found HA state after deployment deletion, falling back to stateful upgrade");
- return Optional.of(UpgradeMode.LAST_STATE);
- }
+ LOG.info("Deleted jobmanager deployment that never started.");
}
@Override
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
index 221d0558..572845c1 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
@@ -38,7 +38,10 @@ import io.fabric8.kubernetes.api.model.ConfigMap;
import io.fabric8.kubernetes.api.model.ConfigMapList;
import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.apps.Deployment;
+import io.fabric8.kubernetes.api.model.apps.DeploymentCondition;
import io.fabric8.kubernetes.client.KubernetesClient;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,6 +49,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
+import java.util.Optional;
/** Flink Utility methods used by the operator. */
public class FlinkUtils {
@@ -221,4 +225,35 @@ public class FlinkUtils {
return new JobID(
Preconditions.checkNotNull(uid).hashCode(), Preconditions.checkNotNull(generation));
}
+
+ /**
+ * Check if the jobmanager pod has never successfully started. This is an important check to
+ * determine whether it is possible that the job has started and taken any checkpoints that we
+ * are unaware of.
+ *
+ * <p>The way we check this is by using the availability condition transition timestamp. If the
+ * deployment never transitioned out of the unavailable state, we can assume that the JM never
+ * started.
+ *
+ * @param context Resource context
+ * @return True only if we are sure that the jobmanager pod never started
+ */
+ public static boolean jmPodNeverStarted(Context<?> context) {
+ Optional<Deployment> depOpt = context.getSecondaryResource(Deployment.class);
+ if (depOpt.isPresent()) {
+ Deployment deployment = depOpt.get();
+ for (DeploymentCondition condition : deployment.getStatus().getConditions()) {
+ if (condition.getType().equals("Available")) {
+ var createTs = deployment.getMetadata().getCreationTimestamp();
+ if ("False".equals(condition.getStatus())
+ && createTs.equals(condition.getLastTransitionTime())) {
+ return true;
+ }
+ }
+ }
+ }
+
+ // If unsure, return false to be on the safe side
+ return false;
+ }
}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
index aac61d84..1d99f256 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
@@ -57,6 +57,7 @@ import java.net.HttpURLConnection;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
+import java.time.Instant;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -94,15 +95,27 @@ public class TestUtils extends BaseTestUtils {
}
public static Deployment createDeployment(boolean ready) {
- DeploymentStatus status = new DeploymentStatus();
+ String nowTs = Instant.now().toString();
+ var status = new DeploymentStatus();
status.setAvailableReplicas(ready ? 1 : 0);
status.setReplicas(1);
+ var availableCondition = new DeploymentCondition();
+ availableCondition.setType("Available");
+ availableCondition.setStatus(ready ? "True" : "False");
+ availableCondition.setLastTransitionTime(nowTs);
+ status.setConditions(List.of(availableCondition));
+
DeploymentSpec spec = new DeploymentSpec();
spec.setReplicas(1);
+
+ var meta = new ObjectMeta();
+ meta.setCreationTimestamp(nowTs);
+
Deployment deployment = new Deployment();
- deployment.setMetadata(new ObjectMeta());
+ deployment.setMetadata(meta);
deployment.setSpec(spec);
deployment.setStatus(status);
+
return deployment;
}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingApplicationReconciler.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingApplicationReconciler.java
new file mode 100644
index 00000000..3bcc6283
--- /dev/null
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingApplicationReconciler.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator;
+
+import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
+import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.reconciler.deployment.ApplicationReconciler;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.kubernetes.operator.utils.EventRecorder;
+import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
+
+/** Testing wrapper for {@link ApplicationReconciler}. */
+public class TestingApplicationReconciler extends ApplicationReconciler {
+ public TestingApplicationReconciler(
+ KubernetesClient kubernetesClient,
+ FlinkService flinkService,
+ FlinkConfigManager configManager,
+ EventRecorder eventRecorder,
+ StatusRecorder<FlinkDeployment, FlinkDeploymentStatus> statusRecorder) {
+ super(kubernetesClient, flinkService, configManager, eventRecorder, statusRecorder);
+ }
+
+ @Override
+ public void reconcile(FlinkDeployment flinkDeployment, Context<?> context) throws Exception {
+ var cr = ReconciliationUtils.clone(flinkDeployment);
+ cr.setStatus(flinkDeployment.getStatus());
+ super.reconcile(cr, context);
+ }
+
+ @Override
+ public DeleteControl cleanup(FlinkDeployment flinkDeployment, Context<?> context) {
+ var cr = ReconciliationUtils.clone(flinkDeployment);
+ cr.setStatus(flinkDeployment.getStatus());
+ return super.cleanup(cr, context);
+ }
+}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
index a62bd732..c37fd54f 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
@@ -95,6 +95,7 @@ public class TestingFlinkService extends AbstractFlinkService {
private final Set<String> sessions = new HashSet<>();
private boolean isPortReady = true;
private boolean haDataAvailable = true;
+ private boolean jobManagerReady = true;
private boolean deployFailure = false;
private Runnable sessionJobSubmittedCallback;
private PodList podList = new PodList();
@@ -122,7 +123,7 @@ public class TestingFlinkService extends AbstractFlinkService {
if (jobs.isEmpty() && sessions.isEmpty()) {
return Optional.empty();
}
- return (Optional<T>) Optional.of(TestUtils.createDeployment(true));
+ return (Optional<T>) Optional.of(TestUtils.createDeployment(jobManagerReady));
}
};
}
@@ -193,6 +194,10 @@ public class TestingFlinkService extends AbstractFlinkService {
this.haDataAvailable = haDataAvailable;
}
+ public void setJobManagerReady(boolean jmReady) {
+ this.jobManagerReady = jmReady;
+ }
+
public void setDeployFailure(boolean deployFailure) {
this.deployFailure = deployFailure;
}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FailedDeploymentRestartTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FailedDeploymentRestartTest.java
index 99a6ca37..f34d8040 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FailedDeploymentRestartTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FailedDeploymentRestartTest.java
@@ -96,6 +96,9 @@ public class FailedDeploymentRestartTest {
JobManagerDeploymentStatus.READY,
appCluster.getStatus().getJobManagerDeploymentStatus());
assertEquals("RUNNING", appCluster.getStatus().getJobStatus().getState());
+
+ // We started without savepoint
+ appCluster.getSpec().getJob().setUpgradeMode(UpgradeMode.STATELESS);
assertEquals(
appCluster.getSpec(),
appCluster.getStatus().getReconciliationStatus().deserializeLastReconciledSpec());
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
index 4ba5bb44..ebc1241a 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
@@ -845,8 +845,12 @@ public class FlinkDeploymentControllerTest {
private void testUpgradeNotReadyCluster(FlinkDeployment appCluster) throws Exception {
flinkService.clear();
testController.reconcile(appCluster, context);
+ var specClone = ReconciliationUtils.clone(appCluster.getSpec());
+ if (specClone.getJob() != null) {
+ specClone.getJob().setUpgradeMode(UpgradeMode.STATELESS);
+ }
assertEquals(
- appCluster.getSpec(),
+ specClone,
appCluster.getStatus().getReconciliationStatus().deserializeLastReconciledSpec());
flinkService.setPortReady(false);
@@ -864,8 +868,15 @@ public class FlinkDeploymentControllerTest {
assertEquals(
JobManagerDeploymentStatus.DEPLOYING,
appCluster.getStatus().getJobManagerDeploymentStatus());
+
+ var expectedSpec = ReconciliationUtils.clone(appCluster.getSpec());
+ if (expectedSpec.getJob() != null
+ && expectedSpec.getJob().getUpgradeMode() != UpgradeMode.STATELESS) {
+ expectedSpec.getJob().setUpgradeMode(UpgradeMode.LAST_STATE);
+ }
+
assertEquals(
- appCluster.getSpec(),
+ expectedSpec,
appCluster.getStatus().getReconciliationStatus().deserializeLastReconciledSpec());
flinkService.setPortReady(true);
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/UnhealthyDeploymentRestartTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/UnhealthyDeploymentRestartTest.java
index 07dae7de..a4f67544 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/UnhealthyDeploymentRestartTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/UnhealthyDeploymentRestartTest.java
@@ -100,9 +100,6 @@ public class UnhealthyDeploymentRestartTest {
JobManagerDeploymentStatus.READY,
appCluster.getStatus().getJobManagerDeploymentStatus());
assertEquals("RUNNING", appCluster.getStatus().getJobStatus().getState());
- assertEquals(
- appCluster.getSpec(),
- appCluster.getStatus().getReconciliationStatus().deserializeLastReconciledSpec());
}
private static Stream<Arguments> applicationTestParams() {
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
index 6c2f6ee2..5b7ac44c 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.configuration.PipelineOptionsInternal;
import org.apache.flink.configuration.SchedulerExecutionMode;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.operator.TestUtils;
+import org.apache.flink.kubernetes.operator.TestingApplicationReconciler;
import org.apache.flink.kubernetes.operator.TestingFlinkService;
import org.apache.flink.kubernetes.operator.TestingStatusRecorder;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
@@ -94,6 +95,7 @@ public class ApplicationReconcilerTest {
private final FlinkConfigManager configManager = new FlinkConfigManager(new Configuration());
private TestingFlinkService flinkService;
private ApplicationReconciler reconciler;
+
private Context<FlinkDeployment> context;
private StatusRecorder<FlinkDeployment, FlinkDeploymentStatus> statusRecorder;
@@ -107,7 +109,7 @@ public class ApplicationReconcilerTest {
flinkService = new TestingFlinkService(kubernetesClient);
context = flinkService.getContext();
reconciler =
- new ApplicationReconciler(
+ new TestingApplicationReconciler(
kubernetesClient,
flinkService,
configManager,
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerUpgradeModeTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerUpgradeModeTest.java
index 2445741f..6cdd47fc 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerUpgradeModeTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerUpgradeModeTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.kubernetes.operator.TestUtils;
+import org.apache.flink.kubernetes.operator.TestingApplicationReconciler;
import org.apache.flink.kubernetes.operator.TestingFlinkService;
import org.apache.flink.kubernetes.operator.TestingStatusRecorder;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
@@ -47,10 +48,14 @@ import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.EnumSource;
+import org.junit.jupiter.params.provider.MethodSource;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.stream.Stream;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
@@ -75,7 +80,7 @@ public class ApplicationReconcilerUpgradeModeTest {
flinkService = new TestingFlinkService(kubernetesClient);
context = flinkService.getContext();
reconciler =
- new ApplicationReconciler(
+ new TestingApplicationReconciler(
kubernetesClient,
flinkService,
configManager,
@@ -259,36 +264,213 @@ public class ApplicationReconcilerUpgradeModeTest {
}
+ /**
+ * Verifies upgrade handling when the jobmanager of a previously running job never becomes
+ * ready after a redeploy, for every (fromMode, toMode) pair supplied by
+ * testUpgradeJmDeployCannotStartParams. After a suspend/restore cycle the JM is marked as not
+ * ready, HA data is removed and a spec change is submitted. A job last deployed with LAST_STATE
+ * and upgraded to a non-STATELESS mode is expected to stay DEPLOYING. Every other combination
+ * is expected to be suspended and then resubmitted from "savepoint_0", or without a savepoint
+ * path when either mode is STATELESS.
+ */
@ParameterizedTest
- @EnumSource(UpgradeMode.class)
- public void testUpgradeBeforeReachingStableSpec(UpgradeMode upgradeMode) throws Exception {
+ @MethodSource("testUpgradeJmDeployCannotStartParams")
+ public void testUpgradeJmDeployCannotStart(UpgradeMode fromMode, UpgradeMode toMode)
+ throws Exception {
+
+ flinkService.setHaDataAvailable(true);
+ flinkService.setJobManagerReady(true);
+
+ // Prepare running deployment
+ var deployment = TestUtils.buildApplicationCluster();
+ var jobSpec = deployment.getSpec().getJob();
+ jobSpec.setUpgradeMode(fromMode);
+
+ reconciler.reconcile(deployment, context);
+ var runningJobs = flinkService.listJobs();
+ verifyAndSetRunningJobsToStatus(deployment, runningJobs);
+
+ // Suspend running deployment and assert that correct upgradeMode is set
+ jobSpec.setState(JobState.SUSPENDED);
+ reconciler.reconcile(deployment, context);
+
+ var lastReconciledSpec =
+ deployment.getStatus().getReconciliationStatus().deserializeLastReconciledSpec();
+ assertEquals(JobState.SUSPENDED, lastReconciledSpec.getJob().getState());
+ assertEquals(fromMode, lastReconciledSpec.getJob().getUpgradeMode());
+
+ // Restore deployment and assert that correct upgradeMode is set
+ jobSpec.setState(JobState.RUNNING);
+ jobSpec.setUpgradeMode(toMode);
+ reconciler.reconcile(deployment, context);
+
+ lastReconciledSpec =
+ deployment.getStatus().getReconciliationStatus().deserializeLastReconciledSpec();
+ assertEquals(JobState.RUNNING, lastReconciledSpec.getJob().getState());
+ assertEquals(
+ toMode == UpgradeMode.STATELESS ? UpgradeMode.STATELESS : fromMode,
+ lastReconciledSpec.getJob().getUpgradeMode());
+
+ // Simulate a JM failure after deployment; this is needed to test the actual upgrade
+ // behaviour with a jobmanager that never started
+ flinkService.setJobManagerReady(false);
flinkService.setHaDataAvailable(false);
- final FlinkDeployment deployment = TestUtils.buildApplicationCluster();
+ deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING);
+
+ // Send in a new upgrade while the jobmanager has still not started
+ jobSpec.setState(JobState.RUNNING);
+ jobSpec.setEntryClass("newClass");
+ reconciler.reconcile(deployment, context);
+ lastReconciledSpec =
+ deployment.getStatus().getReconciliationStatus().deserializeLastReconciledSpec();
+
+ // Make sure the upgrade was executed as long as we have the savepoint information
+ if (fromMode == UpgradeMode.LAST_STATE && toMode != UpgradeMode.STATELESS) {
+ // No progress is possible: the last deployment used LAST_STATE but no HA metadata is
+ // available, meaning the job started and terminated without the operator seeing it.
+ assertEquals(
+ JobManagerDeploymentStatus.DEPLOYING,
+ deployment.getStatus().getJobManagerDeploymentStatus());
+ assertEquals(JobState.RUNNING, lastReconciledSpec.getJob().getState());
+ } else {
+ assertEquals(
+ JobManagerDeploymentStatus.MISSING,
+ deployment.getStatus().getJobManagerDeploymentStatus());
+ assertEquals(JobState.SUSPENDED, lastReconciledSpec.getJob().getState());
+ assertEquals(
+ toMode == UpgradeMode.STATELESS ? UpgradeMode.STATELESS : UpgradeMode.SAVEPOINT,
+ lastReconciledSpec.getJob().getUpgradeMode());
+
+ // Complete upgrade and recover successfully with the latest savepoint
+ reconciler.reconcile(deployment, context);
+ lastReconciledSpec =
+ deployment
+ .getStatus()
+ .getReconciliationStatus()
+ .deserializeLastReconciledSpec();
+
+ assertEquals(JobState.RUNNING, lastReconciledSpec.getJob().getState());
+ assertEquals(1, flinkService.listJobs().size());
+ if (fromMode == UpgradeMode.STATELESS || toMode == UpgradeMode.STATELESS) {
+ assertNull(flinkService.listJobs().get(0).f0);
+ } else {
+ assertEquals("savepoint_0", flinkService.listJobs().get(0).f0);
+ }
+ }
+ }
+
+ /**
+ * Verifies that a spec change can always be applied when the jobmanager never became ready
+ * after the very first deployment, optionally started from an initial savepoint ("init-sp").
+ * The upgrade is expected to pass through UPGRADING and redeploy with the new image. It should
+ * use STATELESS when requested and SAVEPOINT otherwise, resubmitting from "init-sp" only when
+ * an initial savepoint was set and the requested mode is not STATELESS.
+ */
+ @ParameterizedTest
+ @MethodSource("testInitialJmDeployCannotStartParams")
+ public void testInitialJmDeployCannotStart(UpgradeMode upgradeMode, boolean initSavepoint)
+ throws Exception {
+
+ // We simulate JM failure to test the initial submission/upgrade behavior when the JM can
+ // never start initially
+ flinkService.setHaDataAvailable(false);
+ flinkService.setJobManagerReady(false);
+
+ var deployment = TestUtils.buildApplicationCluster();
+ if (initSavepoint) {
+ deployment.getSpec().getJob().setInitialSavepointPath("init-sp");
+ }
reconciler.reconcile(deployment, context);
assertEquals(
JobManagerDeploymentStatus.DEPLOYING,
deployment.getStatus().getJobManagerDeploymentStatus());
- // Ready for spec changes, the reconciliation should be performed
+ var lastReconciledSpec =
+ deployment.getStatus().getReconciliationStatus().deserializeLastReconciledSpec();
+
+ // Make sure savepoint path is recorded in status and upgradeMode set correctly for initial
+ // startup. Either stateless or savepoint depending only on the initialSavepointPath
+ // setting.
+ if (initSavepoint) {
+ assertEquals("init-sp", flinkService.listJobs().get(0).f0);
+ assertEquals(
+ "init-sp",
+ deployment
+ .getStatus()
+ .getJobStatus()
+ .getSavepointInfo()
+ .getLastSavepoint()
+ .getLocation());
+ assertEquals(UpgradeMode.SAVEPOINT, lastReconciledSpec.getJob().getUpgradeMode());
+ } else {
+ assertNull(flinkService.listJobs().get(0).f0);
+ assertNull(deployment.getStatus().getJobStatus().getSavepointInfo().getLastSavepoint());
+ assertEquals(UpgradeMode.STATELESS, lastReconciledSpec.getJob().getUpgradeMode());
+ }
+
+ // The JM has failed, but we submit an upgrade; this should always be possible after an
+ // initial deployment failure
final String newImage = "new-image-1";
deployment.getSpec().getJob().setUpgradeMode(upgradeMode);
deployment.getSpec().setImage(newImage);
reconciler.reconcile(deployment, context);
- if (!UpgradeMode.STATELESS.equals(upgradeMode)) {
- assertNull(deployment.getStatus().getReconciliationStatus().getLastReconciledSpec());
- assertEquals(
- ReconciliationState.UPGRADING,
- deployment.getStatus().getReconciliationStatus().getState());
- reconciler.reconcile(deployment, context);
- }
assertEquals(
- newImage,
- deployment
- .getStatus()
- .getReconciliationStatus()
- .deserializeLastReconciledSpec()
- .getImage());
+ ReconciliationState.UPGRADING,
+ deployment.getStatus().getReconciliationStatus().getState());
+ lastReconciledSpec =
+ deployment.getStatus().getReconciliationStatus().deserializeLastReconciledSpec();
+
+ // Make sure a stateless upgrade request is respected (state is dropped)
+ assertEquals(
+ upgradeMode == UpgradeMode.STATELESS
+ ? UpgradeMode.STATELESS
+ : UpgradeMode.SAVEPOINT,
+ lastReconciledSpec.getJob().getUpgradeMode());
+
+ // A second reconcile completes the upgrade and redeploys the job with the new image
+ reconciler.reconcile(deployment, context);
+ lastReconciledSpec =
+ deployment.getStatus().getReconciliationStatus().deserializeLastReconciledSpec();
+ assertEquals(newImage, lastReconciledSpec.getImage());
+ assertEquals(
+ upgradeMode == UpgradeMode.STATELESS
+ ? UpgradeMode.STATELESS
+ : UpgradeMode.SAVEPOINT,
+ lastReconciledSpec.getJob().getUpgradeMode());
+ assertEquals(1, flinkService.listJobs().size());
+ assertEquals(
+ initSavepoint && upgradeMode != UpgradeMode.STATELESS ? "init-sp" : null,
+ flinkService.listJobs().get(0).f0);
+ }
+
+ /** LAST_STATE, SAVEPOINT and STATELESS, each with and without an initial savepoint path. */
+ private static Stream<Arguments> testInitialJmDeployCannotStartParams() {
+ return Stream.of(
+ Arguments.of(UpgradeMode.LAST_STATE, true),
+ Arguments.of(UpgradeMode.LAST_STATE, false),
+ Arguments.of(UpgradeMode.SAVEPOINT, true),
+ Arguments.of(UpgradeMode.SAVEPOINT, false),
+ Arguments.of(UpgradeMode.STATELESS, true),
+ Arguments.of(UpgradeMode.STATELESS, false));
+ }
+
+ /** Every ordered (from, to) pair of UpgradeMode values, including same-mode pairs. */
+ private static Stream<Arguments> testUpgradeJmDeployCannotStartParams() {
+ var args = new ArrayList<Arguments>();
+ for (UpgradeMode from : UpgradeMode.values()) {
+ for (UpgradeMode to : UpgradeMode.values()) {
+ args.add(Arguments.of(from, to));
+ }
+ }
+ return args.stream();
+ }
+
+ /**
+ * Verifies that when the cluster of a running SAVEPOINT-mode job has been deleted while HA
+ * metadata is still available, a subsequent upgrade is recorded with the LAST_STATE upgrade
+ * mode and a SUSPENDED last reconciled state.
+ */
+ @Test
+ public void testLastStateOnDeletedDeployment() throws Exception {
+ // Bootstrap running deployment
+ var deployment = TestUtils.buildApplicationCluster();
+ deployment.getSpec().getJob().setUpgradeMode(UpgradeMode.SAVEPOINT);
+
+ reconciler.reconcile(deployment, context);
+ verifyAndSetRunningJobsToStatus(deployment, flinkService.listJobs());
+
+ // Delete cluster and keep HA metadata
+ flinkService.deleteClusterDeployment(
+ deployment.getMetadata(), deployment.getStatus(), false);
+ flinkService.setHaDataAvailable(true);
+
+ // Submit an upgrade by changing the restart nonce
+ deployment.getSpec().setRestartNonce(123L);
+ reconciler.reconcile(deployment, context);
+
+ var lastReconciledSpec =
+ deployment.getStatus().getReconciliationStatus().deserializeLastReconciledSpec();
+
+ // Make sure the upgrade mode is recorded as LAST_STATE
+ assertEquals(UpgradeMode.LAST_STATE, lastReconciledSpec.getJob().getUpgradeMode());
+ assertEquals(JobState.SUSPENDED, lastReconciledSpec.getJob().getState());
}
@Test
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java
index ca7d0c39..136e9311 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java
@@ -30,16 +30,22 @@ import org.apache.flink.kubernetes.utils.KubernetesUtils;
import io.fabric8.kubernetes.api.model.ConfigMap;
import io.fabric8.kubernetes.api.model.ConfigMapBuilder;
import io.fabric8.kubernetes.api.model.Container;
+import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.apps.Deployment;
+import io.fabric8.kubernetes.api.model.apps.DeploymentCondition;
+import io.fabric8.kubernetes.api.model.apps.DeploymentStatus;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer;
import org.junit.jupiter.api.Test;
import java.net.HttpURLConnection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -149,6 +155,43 @@ public class FlinkUtilsTest {
kubernetesClient));
}
+ /**
+ * Verifies FlinkUtils.jmPodNeverStarted. Only an "Available" condition with status "False"
+ * whose timestamp equals the JM deployment's creation timestamp is reported as never started.
+ * None of the following are: no conditions, an unavailable condition with a later timestamp,
+ * an available condition, or a context without the JM deployment.
+ */
+ @Test
+ public void testJmNeverStartedDetection() {
+ var jmDeployment = new Deployment();
+ jmDeployment.setMetadata(new ObjectMeta());
+ jmDeployment.getMetadata().setCreationTimestamp("create-ts");
+ jmDeployment.setStatus(new DeploymentStatus());
+ var deployStatus = jmDeployment.getStatus();
+ // NOTE(review): the first DeploymentCondition argument is presumably lastTransitionTime
+ // (fabric8 argument order) - confirm. It is compared against the creation timestamp above.
+ var jmNeverStartedCondition =
+ new DeploymentCondition("create-ts", null, null, null, "False", "Available");
+ var jmStartedButStopped =
+ new DeploymentCondition("other-ts", null, null, null, "False", "Available");
+ var jmAvailable =
+ new DeploymentCondition("other-ts", null, null, null, "True", "Available");
+
+ var context =
+ new TestUtils.TestingContext<Deployment>() {
+ @Override
+ public <R> Optional<R> getSecondaryResource(Class<R> aClass, String name) {
+ // Unchecked cast: the JM deployment is returned for every requested type and name
+ return (Optional<R>) Optional.of(jmDeployment);
+ }
+ };
+
+ deployStatus.setConditions(Collections.emptyList());
+ assertFalse(FlinkUtils.jmPodNeverStarted(context));
+
+ deployStatus.setConditions(List.of(jmNeverStartedCondition));
+ assertTrue(FlinkUtils.jmPodNeverStarted(context));
+
+ deployStatus.setConditions(List.of(jmStartedButStopped));
+ assertFalse(FlinkUtils.jmPodNeverStarted(context));
+
+ deployStatus.setConditions(List.of(jmAvailable));
+ assertFalse(FlinkUtils.jmPodNeverStarted(context));
+
+ // Presumably createEmptyContext exposes no JM deployment; that must not count as never started
+ assertFalse(FlinkUtils.jmPodNeverStarted(TestUtils.createEmptyContext()));
+ }
+
@Test
public void testDeleteJobGraphInKubernetesHAShouldNotUpdateWithEmptyConfigMap() {
final String name = "empty-ha-configmap";