You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@flink.apache.org by gy...@apache.org on 2022/07/03 10:31:17 UTC
[flink-kubernetes-operator] branch main updated: [FLINK-28331] Persist status after every observe loop
This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new a2db6c3 [FLINK-28331] Persist status after every observe loop
a2db6c3 is described below
commit a2db6c31e21d79715324ed45d7138cb8f6d6149e
Author: Matyas Orhidi <ma...@apple.com>
AuthorDate: Fri Jul 1 14:24:33 2022 +0200
[FLINK-28331] Persist status after every observe loop
---
.../controller/FlinkDeploymentController.java | 3 +
.../controller/FlinkSessionJobController.java | 3 +
.../operator/observer/SavepointObserver.java | 30 --------
.../flink/kubernetes/operator/TestUtils.java | 6 +-
.../controller/DeploymentRecoveryTest.java | 7 +-
.../controller/FlinkDeploymentControllerTest.java | 79 +++++++++++++++++++++-
.../operator/controller/RollbackTest.java | 4 +-
7 files changed, 94 insertions(+), 38 deletions(-)
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
index d36f96b..d75d29c 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
@@ -117,6 +117,9 @@ public class FlinkDeploymentController
previousDeployment,
false);
}
+ // Persist the freshly observed status (e.g. a new last savepoint) before reconciling,
+ // so it is not lost if the operator fails after the reconciler shut down the cluster.
+ statusRecorder.patchAndCacheStatus(flinkApp);
reconcilerFactory.getOrCreate(flinkApp).reconcile(flinkApp, context);
} catch (DeploymentFailedException dfe) {
handleDeploymentFailed(flinkApp, dfe);
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java
index 30ff1ea..7f2f6bd 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java
@@ -96,6 +96,9 @@ public class FlinkSessionJobController
}
try {
+ // Persist the freshly observed status (e.g. a new last savepoint) before reconciling,
+ // so it is not lost if the operator fails while the reconciler is acting on it.
+ statusRecorder.patchAndCacheStatus(flinkSessionJob);
reconciler.reconcile(flinkSessionJob, context);
} catch (Exception e) {
throw new ReconciliationException(e);
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SavepointObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SavepointObserver.java
index 74f3d35..aa3590a 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SavepointObserver.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SavepointObserver.java
@@ -40,7 +40,6 @@ import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.Iterator;
import java.util.List;
-import java.util.Optional;
/** An observer of savepoint progress. */
public class SavepointObserver<STATUS extends CommonStatus<?>> {
@@ -69,10 +68,6 @@ public class SavepointObserver<STATUS extends CommonStatus<?>> {
var jobStatus = resource.getStatus().getJobStatus();
var savepointInfo = jobStatus.getSavepointInfo();
var jobId = jobStatus.getJobId();
- var previousLastSpPath =
- Optional.ofNullable(savepointInfo.getLastSavepoint())
- .map(Savepoint::getLocation)
- .orElse(null);
// If any manual or periodic savepoint is in progress, observe it
if (SavepointUtils.savepointInProgress(jobStatus)) {
@@ -83,8 +78,6 @@ public class SavepointObserver<STATUS extends CommonStatus<?>> {
if (ReconciliationUtils.isJobInTerminalState(resource.getStatus())) {
observeLatestSavepoint(savepointInfo, jobId, deployedConfig);
}
-
- patchStatusOnSavepointChange(resource, savepointInfo, previousLastSpPath);
}
/**
@@ -201,27 +194,4 @@ public class SavepointObserver<STATUS extends CommonStatus<?>> {
throw new ReconciliationException(e);
}
}
-
- /**
- * Patch the Kubernetes Flink resource status if we observed a new last savepoint. This is
- * crucial to not lose this information once the reconciler shuts down the cluster.
- */
- private void patchStatusOnSavepointChange(
- AbstractFlinkResource<?, STATUS> resource,
- SavepointInfo savepointInfo,
- String previousLastSpPath) {
- var currentLastSpPath =
- Optional.ofNullable(savepointInfo.getLastSavepoint())
- .map(Savepoint::getLocation)
- .orElse(null);
-
- // If the last savepoint information changes we need to patch the status
- // to avoid losing this in case of an operator failure after the cluster was shut down
- if (currentLastSpPath != null && !currentLastSpPath.equals(previousLastSpPath)) {
- LOG.info(
- "Updating resource status after observing new last savepoint {}",
- currentLastSpPath);
- statusRecorder.patchAndCacheStatus(resource);
- }
- }
}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
index 034904d..cdbaa8e 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
@@ -434,10 +434,8 @@ public class TestUtils {
public static FlinkDeploymentController createTestController(
FlinkConfigManager configManager,
KubernetesClient kubernetesClient,
- TestingFlinkService flinkService) {
-
- var statusRecorder =
- new StatusRecorder<FlinkDeploymentStatus>(kubernetesClient, (r, s) -> {});
+ TestingFlinkService flinkService,
+ StatusRecorder<FlinkDeploymentStatus> statusRecorder) {
var eventRecorder = new EventRecorder(kubernetesClient, (r, e) -> {});
return new FlinkDeploymentController(
configManager,
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/DeploymentRecoveryTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/DeploymentRecoveryTest.java
index a9d59fe..1425dcd 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/DeploymentRecoveryTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/DeploymentRecoveryTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.spec.FlinkVersion;
import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
+import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
@@ -60,7 +61,11 @@ public class DeploymentRecoveryTest {
flinkService = new TestingFlinkService(kubernetesClient);
context = flinkService.getContext();
testController =
- TestUtils.createTestController(configManager, kubernetesClient, flinkService);
+ TestUtils.createTestController(
+ configManager,
+ kubernetesClient,
+ flinkService,
+ new StatusRecorder<>(kubernetesClient, (resource, status) -> {}));
kubernetesClient.resource(TestUtils.buildApplicationCluster()).createOrReplace();
}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
index b7c1f1c..c809418 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
@@ -24,12 +24,14 @@ import org.apache.flink.configuration.RestOptions;
import org.apache.flink.kubernetes.operator.TestUtils;
import org.apache.flink.kubernetes.operator.TestingFlinkService;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
import org.apache.flink.kubernetes.operator.crd.spec.FlinkVersion;
import org.apache.flink.kubernetes.operator.crd.spec.IngressSpec;
import org.apache.flink.kubernetes.operator.crd.spec.JobState;
import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
+import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus;
@@ -38,6 +40,7 @@ import org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler;
import org.apache.flink.kubernetes.operator.utils.IngressUtils;
+import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
import org.apache.flink.runtime.client.JobStatusMessage;
import io.fabric8.kubernetes.api.model.EventBuilder;
@@ -60,6 +63,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.function.BiConsumer;
import java.util.stream.Stream;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -82,24 +86,41 @@ public class FlinkDeploymentControllerTest {
private KubernetesMockServer mockServer;
private KubernetesClient kubernetesClient;
+ private final StatusUpdateCounter statusUpdateCounter = new StatusUpdateCounter();
@BeforeEach
public void setup() {
flinkService = new TestingFlinkService(kubernetesClient);
context = flinkService.getContext();
testController =
- TestUtils.createTestController(configManager, kubernetesClient, flinkService);
+ TestUtils.createTestController(
+ configManager,
+ kubernetesClient,
+ flinkService,
+ new StatusRecorder<>(kubernetesClient, statusUpdateCounter));
kubernetesClient.resource(TestUtils.buildApplicationCluster()).createOrReplace();
}
@ParameterizedTest
@EnumSource(FlinkVersion.class)
public void verifyBasicReconcileLoop(FlinkVersion flinkVersion) throws Exception {
- FlinkDeployment appCluster = TestUtils.buildApplicationCluster(flinkVersion);
UpdateControl<FlinkDeployment> updateControl;
+ FlinkDeployment appCluster = TestUtils.buildApplicationCluster(flinkVersion);
+ assertEquals(
+ JobManagerDeploymentStatus.MISSING,
+ appCluster.getStatus().getJobManagerDeploymentStatus());
+ assertNull(appCluster.getStatus().getJobStatus().getState());
+
updateControl = testController.reconcile(appCluster, context);
+ assertEquals(
+ JobManagerDeploymentStatus.DEPLOYING,
+ appCluster.getStatus().getJobManagerDeploymentStatus());
+ assertEquals(
+ org.apache.flink.api.common.JobStatus.RECONCILING.name(),
+ appCluster.getStatus().getJobStatus().getState());
+ assertEquals(2, statusUpdateCounter.getCount());
assertFalse(updateControl.isUpdateStatus());
assertEquals(
Optional.of(
@@ -117,6 +138,13 @@ public class FlinkDeploymentControllerTest {
assertNull(appCluster.getStatus().getReconciliationStatus().getLastStableSpec());
updateControl = testController.reconcile(appCluster, context);
+ assertEquals(
+ JobManagerDeploymentStatus.DEPLOYED_NOT_READY,
+ appCluster.getStatus().getJobManagerDeploymentStatus());
+ assertEquals(
+ org.apache.flink.api.common.JobStatus.RECONCILING.name(),
+ appCluster.getStatus().getJobStatus().getState());
+ assertEquals(3, statusUpdateCounter.getCount());
assertFalse(updateControl.isUpdateStatus());
assertEquals(
Optional.of(
@@ -124,6 +152,28 @@ public class FlinkDeploymentControllerTest {
updateControl.getScheduleDelay());
updateControl = testController.reconcile(appCluster, context);
+ assertEquals(
+ JobManagerDeploymentStatus.READY,
+ appCluster.getStatus().getJobManagerDeploymentStatus());
+ assertEquals(
+ org.apache.flink.api.common.JobStatus.RUNNING.name(),
+ appCluster.getStatus().getJobStatus().getState());
+ assertEquals(4, statusUpdateCounter.getCount());
+ assertFalse(updateControl.isUpdateStatus());
+ assertEquals(
+ Optional.of(
+ configManager.getOperatorConfiguration().getReconcileInterval().toMillis()),
+ updateControl.getScheduleDelay());
+
+ // Stable loop
+ updateControl = testController.reconcile(appCluster, context);
+ assertEquals(
+ JobManagerDeploymentStatus.READY,
+ appCluster.getStatus().getJobManagerDeploymentStatus());
+ assertEquals(
+ org.apache.flink.api.common.JobStatus.RUNNING.name(),
+ appCluster.getStatus().getJobStatus().getState());
+ assertEquals(4, statusUpdateCounter.getCount());
assertFalse(updateControl.isUpdateStatus());
assertEquals(
Optional.of(
@@ -143,6 +193,13 @@ public class FlinkDeploymentControllerTest {
// Send in invalid update
appCluster.getSpec().setJob(null);
updateControl = testController.reconcile(appCluster, context);
+ assertEquals(
+ JobManagerDeploymentStatus.READY,
+ appCluster.getStatus().getJobManagerDeploymentStatus());
+ assertEquals(
+ org.apache.flink.api.common.JobStatus.RUNNING.name(),
+ appCluster.getStatus().getJobStatus().getState());
+ assertEquals(5, statusUpdateCounter.getCount());
assertFalse(updateControl.isUpdateStatus());
reconciliationStatus = appCluster.getStatus().getReconciliationStatus();
@@ -808,4 +865,22 @@ public class FlinkDeploymentControllerTest {
}
return args.stream();
}
+
+ /** Test callback for {@link StatusRecorder} that counts how many times it is invoked. */
+ private static final class StatusUpdateCounter
+ implements BiConsumer<
+ AbstractFlinkResource<?, FlinkDeploymentStatus>, FlinkDeploymentStatus> {
+ private int counter;
+
+ @Override
+ public void accept(
+ AbstractFlinkResource<?, FlinkDeploymentStatus> resource,
+ FlinkDeploymentStatus status) {
+ counter++;
+ }
+
+ public int getCount() {
+ return counter;
+ }
+ }
}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/RollbackTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/RollbackTest.java
index 1f53a50..b591c7e 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/RollbackTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/RollbackTest.java
@@ -31,6 +31,7 @@ import org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatu
import org.apache.flink.kubernetes.operator.crd.status.ReconciliationState;
import org.apache.flink.kubernetes.operator.crd.status.Savepoint;
import org.apache.flink.kubernetes.operator.crd.status.SavepointTriggerType;
+import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
import org.apache.flink.util.function.ThrowingRunnable;
import io.fabric8.kubernetes.client.KubernetesClient;
@@ -70,7 +71,8 @@ public class RollbackTest {
TestUtils.createTestController(
new FlinkConfigManager(new Configuration()),
kubernetesClient,
- flinkService);
+ flinkService,
+ new StatusRecorder<>(kubernetesClient, (resource, status) -> {}));
kubernetesClient.resource(TestUtils.buildApplicationCluster()).createOrReplace();
}