You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2022/08/09 07:24:06 UTC
[flink-kubernetes-operator] branch main updated: [FLINK-28846] Trigger event on validation error
This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new f5b1d4a6 [FLINK-28846] Trigger event on validation error
f5b1d4a6 is described below
commit f5b1d4a62155b11d4d4b5483df41e3215be8106a
Author: Gyula Fora <g_...@apple.com>
AuthorDate: Mon Aug 8 13:18:38 2022 +0200
[FLINK-28846] Trigger event on validation error
---
.../flink/kubernetes/operator/FlinkOperator.java | 7 ++++-
.../controller/FlinkDeploymentController.java | 6 ++++
.../controller/FlinkSessionJobController.java | 12 +++++++-
.../operator/crd/status/CommonStatus.java | 5 +++-
.../kubernetes/operator/utils/EventRecorder.java | 3 +-
.../controller/FlinkDeploymentControllerTest.java | 33 +++++++++++++++++++---
6 files changed, 58 insertions(+), 8 deletions(-)
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
index 07e8e98e..202ccdeb 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
@@ -152,7 +152,12 @@ public class FlinkOperator {
var observer = new SessionJobObserver(flinkServiceFactory, configManager, eventRecorder);
var controller =
new FlinkSessionJobController(
- configManager, validators, reconciler, observer, statusRecorder);
+ configManager,
+ validators,
+ reconciler,
+ observer,
+ statusRecorder,
+ eventRecorder);
registeredControllers.add(operator.register(controller, this::overrideControllerConfigs));
}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
index dae1f64f..a43d9ccd 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
@@ -157,6 +157,12 @@ public class FlinkDeploymentController
for (FlinkResourceValidator validator : validators) {
Optional<String> validationError = validator.validateDeployment(deployment);
if (validationError.isPresent()) {
+ eventRecorder.triggerEvent(
+ deployment,
+ EventRecorder.Type.Warning,
+ EventRecorder.Reason.ValidationError,
+ EventRecorder.Component.Operator,
+ validationError.get());
return ReconciliationUtils.applyValidationErrorAndResetSpec(
deployment, validationError.get());
}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java
index 6d23f798..d037571e 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java
@@ -25,6 +25,7 @@ import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
import org.apache.flink.kubernetes.operator.observer.Observer;
import org.apache.flink.kubernetes.operator.reconciler.Reconciler;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.EventSourceUtils;
import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
import org.apache.flink.kubernetes.operator.validation.FlinkResourceValidator;
@@ -61,18 +62,21 @@ public class FlinkSessionJobController
private final Reconciler<FlinkSessionJob> reconciler;
private final Observer<FlinkSessionJob> observer;
private final StatusRecorder<FlinkSessionJob, FlinkSessionJobStatus> statusRecorder;
+ private final EventRecorder eventRecorder;
public FlinkSessionJobController(
FlinkConfigManager configManager,
Set<FlinkResourceValidator> validators,
Reconciler<FlinkSessionJob> reconciler,
Observer<FlinkSessionJob> observer,
- StatusRecorder<FlinkSessionJob, FlinkSessionJobStatus> statusRecorder) {
+ StatusRecorder<FlinkSessionJob, FlinkSessionJobStatus> statusRecorder,
+ EventRecorder eventRecorder) {
this.configManager = configManager;
this.validators = validators;
this.reconciler = reconciler;
this.observer = observer;
this.statusRecorder = statusRecorder;
+ this.eventRecorder = eventRecorder;
}
@Override
@@ -128,6 +132,12 @@ public class FlinkSessionJobController
validator.validateSessionJob(
sessionJob, context.getSecondaryResource(FlinkDeployment.class));
if (validationError.isPresent()) {
+ eventRecorder.triggerEvent(
+ sessionJob,
+ EventRecorder.Type.Warning,
+ EventRecorder.Reason.ValidationError,
+ EventRecorder.Component.Operator,
+ validationError.get());
return ReconciliationUtils.applyValidationErrorAndResetSpec(
sessionJob, validationError.get());
}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/CommonStatus.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/CommonStatus.java
index 76e0e024..44c9ebdc 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/CommonStatus.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/CommonStatus.java
@@ -27,6 +27,7 @@ import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
+import org.apache.commons.lang3.StringUtils;
/** Last observed common status of the Flink deployment/Flink SessionJob. */
@Experimental
@@ -54,7 +55,9 @@ public abstract class CommonStatus<SPEC extends AbstractFlinkSpec> {
var reconciliationStatus = getReconciliationStatus();
if (reconciliationStatus.isFirstDeployment()) {
- return ResourceLifecycleState.CREATED;
+ return StringUtils.isEmpty(error)
+ ? ResourceLifecycleState.CREATED
+ : ResourceLifecycleState.FAILED;
}
switch (reconciliationStatus.getState()) {
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java
index 691bca94..9bc55762 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java
@@ -122,6 +122,7 @@ public class EventRecorder {
StatusChanged,
SavepointError,
Cleanup,
- Missing
+ Missing,
+ ValidationError
}
}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
index c57f2e2f..d80f8cb7 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
@@ -35,6 +35,7 @@ import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus;
import org.apache.flink.kubernetes.operator.crd.status.TaskManagerInfo;
import org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
+import org.apache.flink.kubernetes.operator.metrics.lifecycle.ResourceLifecycleState;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
@@ -61,9 +62,11 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_JOB_UPGRADE_LAST_STATE_FALLBACK_ENABLED;
+import static org.apache.flink.kubernetes.operator.utils.EventRecorder.Reason.ValidationError;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
@@ -576,16 +579,21 @@ public class FlinkDeploymentControllerTest {
.getState());
appCluster.getSpec().setLogConfiguration(Map.of("invalid", "conf"));
testController.reconcile(appCluster, TestUtils.createEmptyContext());
- assertEquals(1, testController.events().size());
+ assertEquals(2, testController.events().size());
+ testController.events().remove();
assertEquals(
EventRecorder.Reason.Submit,
- EventRecorder.Reason.valueOf(testController.events().poll().getReason()));
+ EventRecorder.Reason.valueOf(testController.events().remove().getReason()));
testController.reconcile(appCluster, context);
testController.reconcile(appCluster, context);
- assertEquals(1, testController.events().size());
+ var statusEvents =
+ testController.events().stream()
+ .filter(e -> !e.getReason().equals(ValidationError.name()))
+ .collect(Collectors.toList());
+ assertEquals(1, statusEvents.size());
assertEquals(
EventRecorder.Reason.StatusChanged,
- EventRecorder.Reason.valueOf(testController.events().poll().getReason()));
+ EventRecorder.Reason.valueOf(statusEvents.get(0).getReason()));
assertEquals(
JobManagerDeploymentStatus.READY,
@@ -865,6 +873,23 @@ public class FlinkDeploymentControllerTest {
appCluster.getStatus().getReconciliationStatus().getLastStableSpec());
}
+ @Test
+ public void testValidationError() throws Exception {
+ assertTrue(testController.events().isEmpty());
+ var flinkDeployment = TestUtils.buildApplicationCluster();
+ flinkDeployment.getSpec().getJob().setParallelism(-1);
+ testController.reconcile(flinkDeployment, context);
+
+ assertEquals(1, testController.events().size());
+ assertEquals(
+ ResourceLifecycleState.FAILED, flinkDeployment.getStatus().getLifecycleState());
+
+ var event = testController.events().remove();
+ assertEquals("Warning", event.getType());
+ assertEquals("ValidationError", event.getReason());
+ assertTrue(event.getMessage().startsWith("Job parallelism "));
+ }
+
@Test
public void cleanUpNewDeployment() {
FlinkDeployment flinkDeployment = TestUtils.buildApplicationCluster();