You are viewing a plain text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain-text rendering.
Posted to commits@flink.apache.org by gy...@apache.org on 2022/02/23 09:32:49 UTC
[flink-kubernetes-operator] branch main updated: [FLINK-26236] Track and cap retries in ReconciliationStatus
This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 7198c1d [FLINK-26236] Track and cap retries in ReconciliationStatus
7198c1d is described below
commit 7198c1d36da1e35262057825341fe85639dcd18f
Author: Matyas Orhidi <ma...@apple.com>
AuthorDate: Mon Feb 21 22:22:28 2022 +0100
[FLINK-26236] Track and cap retries in ReconciliationStatus
Closes #13
---
.../controller/FlinkDeploymentController.java | 69 ++++++++--------------
.../kubernetes/operator/crd/FlinkDeployment.java | 8 ++-
.../exception/ReconciliationException.java | 34 +++++++++++
.../operator/observer/JobStatusObserver.java | 35 ++++-------
.../controller/FlinkDeploymentControllerTest.java | 20 ++-----
.../operator/observer/JobStatusObserverTest.java | 4 +-
6 files changed, 84 insertions(+), 86 deletions(-)
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
index dbafd7d..3ab2638 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
@@ -19,8 +19,9 @@ package org.apache.flink.kubernetes.operator.controller;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
-import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus;
+import org.apache.flink.kubernetes.operator.exception.InvalidDeploymentException;
+import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
import org.apache.flink.kubernetes.operator.observer.JobStatusObserver;
import org.apache.flink.kubernetes.operator.reconciler.JobReconciler;
import org.apache.flink.kubernetes.operator.reconciler.SessionReconciler;
@@ -47,15 +48,14 @@ import java.util.Optional;
import java.util.concurrent.TimeUnit;
/** Controller that runs the main reconcile loop for Flink deployments. */
-@ControllerConfiguration
+@ControllerConfiguration(generationAwareEventProcessing = false)
public class FlinkDeploymentController
implements Reconciler<FlinkDeployment>,
ErrorStatusHandler<FlinkDeployment>,
EventSourceInitializer<FlinkDeployment> {
private static final Logger LOG = LoggerFactory.getLogger(FlinkDeploymentController.class);
- public static final int OBSERVE_REFRESH_SECONDS = 10;
- public static final int RECONCILE_ERROR_REFRESH_SECONDS = 5;
+ public static final int REFRESH_SECONDS = 60;
private final KubernetesClient kubernetesClient;
@@ -94,51 +94,23 @@ public class FlinkDeploymentController
@Override
public UpdateControl<FlinkDeployment> reconcile(FlinkDeployment flinkApp, Context context) {
LOG.info("Reconciling {}", flinkApp.getMetadata().getName());
- if (flinkApp.getStatus() == null) {
- flinkApp.setStatus(new FlinkDeploymentStatus());
- }
Configuration effectiveConfig = FlinkUtils.getEffectiveConfig(flinkApp);
-
- boolean successfulObserve = observer.observeFlinkJobStatus(flinkApp, effectiveConfig);
-
- if (!successfulObserve) {
- // Cluster not accessible let's retry
- return UpdateControl.<FlinkDeployment>noUpdate()
- .rescheduleAfter(OBSERVE_REFRESH_SECONDS, TimeUnit.SECONDS);
- }
-
- if (!specChanged(flinkApp)) {
- // Successfully observed the cluster after reconciliation, no need to reschedule
- return UpdateControl.updateStatus(flinkApp);
- }
-
try {
- reconcileFlinkDeployment(operatorNamespace, flinkApp, effectiveConfig);
- } catch (Exception e) {
- String err = "Error while reconciling deployment change: " + e.getMessage();
- String lastErr = flinkApp.getStatus().getReconciliationStatus().getError();
- if (!err.equals(lastErr)) {
- // Log new errors on the first instance
- LOG.error("Error while reconciling deployment change", e);
- updateForReconciliationError(flinkApp, err);
- return UpdateControl.updateStatus(flinkApp)
- .rescheduleAfter(RECONCILE_ERROR_REFRESH_SECONDS, TimeUnit.SECONDS);
- } else {
- return UpdateControl.<FlinkDeployment>noUpdate()
- .rescheduleAfter(RECONCILE_ERROR_REFRESH_SECONDS, TimeUnit.SECONDS);
+ boolean successfulObserve = observer.observeFlinkJobStatus(flinkApp, effectiveConfig);
+ if (successfulObserve) {
+ reconcileFlinkDeployment(operatorNamespace, flinkApp, effectiveConfig);
+ updateForReconciliationSuccess(flinkApp);
}
+ } catch (InvalidDeploymentException ide) {
+ LOG.error("Reconciliation failed", ide);
+ updateForReconciliationError(flinkApp, ide.getMessage());
+ return UpdateControl.updateStatus(flinkApp);
+ } catch (Exception e) {
+ throw new ReconciliationException(e);
}
-
- // Everything went well, update status and reschedule for observation
- updateForReconciliationSuccess(flinkApp);
return UpdateControl.updateStatus(flinkApp)
- .rescheduleAfter(OBSERVE_REFRESH_SECONDS, TimeUnit.SECONDS);
- }
-
- private boolean specChanged(FlinkDeployment flinkApp) {
- return !flinkApp.getSpec()
- .equals(flinkApp.getStatus().getReconciliationStatus().getLastReconciledSpec());
+ .rescheduleAfter(REFRESH_SECONDS, TimeUnit.SECONDS);
}
private void reconcileFlinkDeployment(
@@ -178,7 +150,14 @@ public class FlinkDeploymentController
@Override
public Optional<FlinkDeployment> updateErrorStatus(
FlinkDeployment flinkApp, RetryInfo retryInfo, RuntimeException e) {
- LOG.warn("TODO: handle error status");
- return Optional.empty();
+ LOG.warn(
+ "attempt count: {}, last attempt: {}",
+ retryInfo.getAttemptCount(),
+ retryInfo.isLastAttempt());
+
+ updateForReconciliationError(
+ flinkApp,
+ (e instanceof ReconciliationException) ? e.getCause().toString() : e.toString());
+ return Optional.of(flinkApp);
}
}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/FlinkDeployment.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/FlinkDeployment.java
index 7a01bb8..2eb1bbf 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/FlinkDeployment.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/FlinkDeployment.java
@@ -35,4 +35,10 @@ import io.fabric8.kubernetes.model.annotation.Version;
@Version("v1alpha1")
@ShortNames({"flinkdep"})
public class FlinkDeployment extends CustomResource<FlinkDeploymentSpec, FlinkDeploymentStatus>
- implements Namespaced {}
+ implements Namespaced {
+
+ @Override
+ protected FlinkDeploymentStatus initStatus() {
+ return new FlinkDeploymentStatus();
+ }
+}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/exception/ReconciliationException.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/exception/ReconciliationException.java
new file mode 100644
index 0000000..742494b
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/exception/ReconciliationException.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.exception;
+
+/** Exception for wrapping reconciliation errors. */
+public class ReconciliationException extends RuntimeException {
+
+ public ReconciliationException(Throwable cause) {
+ super(cause);
+ }
+
+ public ReconciliationException(String msg) {
+ super(msg);
+ }
+
+ public ReconciliationException(String msg, Throwable cause) {
+ super(msg, cause);
+ }
+}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java
index 6297365..c0dabcb 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java
@@ -46,7 +46,8 @@ public class JobStatusObserver {
this.flinkService = flinkService;
}
- public boolean observeFlinkJobStatus(FlinkDeployment flinkApp, Configuration effectiveConfig) {
+ public boolean observeFlinkJobStatus(FlinkDeployment flinkApp, Configuration effectiveConfig)
+ throws Exception {
FlinkDeploymentSpec lastReconciledSpec =
flinkApp.getStatus().getReconciliationStatus().getLastReconciledSpec();
@@ -68,31 +69,17 @@ public class JobStatusObserver {
}
LOG.info("Getting job statuses for {}", flinkApp.getMetadata().getName());
FlinkDeploymentStatus flinkAppStatus = flinkApp.getStatus();
- try {
- Collection<JobStatusMessage> clusterJobStatuses =
- flinkService.listJobs(effectiveConfig);
- if (clusterJobStatuses.isEmpty()) {
- LOG.info("No jobs found on {} yet, retrying...", flinkApp.getMetadata().getName());
- return false;
- } else {
- flinkAppStatus.setJobStatus(
- mergeJobStatus(
- flinkAppStatus.getJobStatus(),
- new ArrayList<>(clusterJobStatuses)));
- LOG.info("Job statuses updated for {}", flinkApp.getMetadata().getName());
- return true;
- }
-
- } catch (Exception e) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Failed to list jobs for {}", flinkApp, e);
- } else {
- LOG.warn(
- "Failed to list jobs for {}, retrying...",
- flinkApp.getMetadata().getName());
- }
+ Collection<JobStatusMessage> clusterJobStatuses = flinkService.listJobs(effectiveConfig);
+ if (clusterJobStatuses.isEmpty()) {
+ LOG.info("No jobs found on {} yet", flinkApp.getMetadata().getName());
return false;
+ } else {
+ flinkAppStatus.setJobStatus(
+ mergeJobStatus(
+ flinkAppStatus.getJobStatus(), new ArrayList<>(clusterJobStatuses)));
+ LOG.info("Job statuses updated for {}", flinkApp.getMetadata().getName());
+ return true;
}
}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
index 3b4ae63..730bc80 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
@@ -51,7 +51,7 @@ public class FlinkDeploymentControllerTest {
updateControl = testController.reconcile(appCluster, null);
assertTrue(updateControl.isUpdateStatus());
assertEquals(
- FlinkDeploymentController.OBSERVE_REFRESH_SECONDS * 1000,
+ FlinkDeploymentController.REFRESH_SECONDS * 1000,
(long) updateControl.getScheduleDelay().get());
// Validate reconciliation status
@@ -63,7 +63,9 @@ public class FlinkDeploymentControllerTest {
updateControl = testController.reconcile(appCluster, null);
assertTrue(updateControl.isUpdateStatus());
- assertFalse(updateControl.getScheduleDelay().isPresent());
+ assertEquals(
+ FlinkDeploymentController.REFRESH_SECONDS * 1000,
+ (long) updateControl.getScheduleDelay().get());
// Validate job status
JobStatus jobStatus = appCluster.getStatus().getJobStatus();
@@ -77,23 +79,13 @@ public class FlinkDeploymentControllerTest {
appCluster.getSpec().setJob(null);
updateControl = testController.reconcile(appCluster, null);
assertTrue(updateControl.isUpdateStatus());
- assertEquals(
- FlinkDeploymentController.RECONCILE_ERROR_REFRESH_SECONDS * 1000,
- (long) updateControl.getScheduleDelay().get());
+ assertFalse(updateControl.getScheduleDelay().isPresent());
reconciliationStatus = appCluster.getStatus().getReconciliationStatus();
assertFalse(reconciliationStatus.isSuccess());
- assertEquals(
- "Error while reconciling deployment change: Cannot switch from job to session cluster",
- reconciliationStatus.getError());
+ assertEquals("Cannot switch from job to session cluster", reconciliationStatus.getError());
assertNotNull(reconciliationStatus.getLastReconciledSpec().getJob());
- updateControl = testController.reconcile(appCluster, null);
- assertTrue(updateControl.isNoUpdate());
- assertEquals(
- FlinkDeploymentController.RECONCILE_ERROR_REFRESH_SECONDS * 1000,
- (long) updateControl.getScheduleDelay().get());
-
// Validate job status correct even with error
jobStatus = appCluster.getStatus().getJobStatus();
expectedJobStatus = flinkService.listJobs().get(0).f1;
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserverTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserverTest.java
index 48f7b9e..07a28f6 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserverTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserverTest.java
@@ -36,7 +36,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public class JobStatusObserverTest {
@Test
- public void observeSessionCluster() {
+ public void observeSessionCluster() throws Exception {
FlinkService flinkService = new TestingFlinkService();
JobStatusObserver observer = new JobStatusObserver(flinkService);
FlinkDeployment deployment = TestUtils.buildSessionCluster();
@@ -50,7 +50,7 @@ public class JobStatusObserverTest {
}
@Test
- public void observeApplicationCluster() {
+ public void observeApplicationCluster() throws Exception {
TestingFlinkService flinkService = new TestingFlinkService();
JobStatusObserver observer = new JobStatusObserver(flinkService);
FlinkDeployment deployment = TestUtils.buildApplicationCluster();