You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this text rendering.
Posted to commits@flink.apache.org by gy...@apache.org on 2022/08/09 07:24:06 UTC

[flink-kubernetes-operator] branch main updated: [FLINK-28846] Trigger event on validation error

This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new f5b1d4a6 [FLINK-28846] Trigger event on validation error
f5b1d4a6 is described below

commit f5b1d4a62155b11d4d4b5483df41e3215be8106a
Author: Gyula Fora <g_...@apple.com>
AuthorDate: Mon Aug 8 13:18:38 2022 +0200

    [FLINK-28846] Trigger event on validation error
---
 .../flink/kubernetes/operator/FlinkOperator.java   |  7 ++++-
 .../controller/FlinkDeploymentController.java      |  6 ++++
 .../controller/FlinkSessionJobController.java      | 12 +++++++-
 .../operator/crd/status/CommonStatus.java          |  5 +++-
 .../kubernetes/operator/utils/EventRecorder.java   |  3 +-
 .../controller/FlinkDeploymentControllerTest.java  | 33 +++++++++++++++++++---
 6 files changed, 58 insertions(+), 8 deletions(-)

diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
index 07e8e98e..202ccdeb 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
@@ -152,7 +152,12 @@ public class FlinkOperator {
         var observer = new SessionJobObserver(flinkServiceFactory, configManager, eventRecorder);
         var controller =
                 new FlinkSessionJobController(
-                        configManager, validators, reconciler, observer, statusRecorder);
+                        configManager,
+                        validators,
+                        reconciler,
+                        observer,
+                        statusRecorder,
+                        eventRecorder);
         registeredControllers.add(operator.register(controller, this::overrideControllerConfigs));
     }
 
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
index dae1f64f..a43d9ccd 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
@@ -157,6 +157,12 @@ public class FlinkDeploymentController
         for (FlinkResourceValidator validator : validators) {
             Optional<String> validationError = validator.validateDeployment(deployment);
             if (validationError.isPresent()) {
+                eventRecorder.triggerEvent(
+                        deployment,
+                        EventRecorder.Type.Warning,
+                        EventRecorder.Reason.ValidationError,
+                        EventRecorder.Component.Operator,
+                        validationError.get());
                 return ReconciliationUtils.applyValidationErrorAndResetSpec(
                         deployment, validationError.get());
             }
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java
index 6d23f798..d037571e 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java
@@ -25,6 +25,7 @@ import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
 import org.apache.flink.kubernetes.operator.observer.Observer;
 import org.apache.flink.kubernetes.operator.reconciler.Reconciler;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.utils.EventRecorder;
 import org.apache.flink.kubernetes.operator.utils.EventSourceUtils;
 import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
 import org.apache.flink.kubernetes.operator.validation.FlinkResourceValidator;
@@ -61,18 +62,21 @@ public class FlinkSessionJobController
     private final Reconciler<FlinkSessionJob> reconciler;
     private final Observer<FlinkSessionJob> observer;
     private final StatusRecorder<FlinkSessionJob, FlinkSessionJobStatus> statusRecorder;
+    private final EventRecorder eventRecorder;
 
     public FlinkSessionJobController(
             FlinkConfigManager configManager,
             Set<FlinkResourceValidator> validators,
             Reconciler<FlinkSessionJob> reconciler,
             Observer<FlinkSessionJob> observer,
-            StatusRecorder<FlinkSessionJob, FlinkSessionJobStatus> statusRecorder) {
+            StatusRecorder<FlinkSessionJob, FlinkSessionJobStatus> statusRecorder,
+            EventRecorder eventRecorder) { // used to emit a Warning/ValidationError event when a validator rejects the session job
         this.configManager = configManager;
         this.validators = validators;
         this.reconciler = reconciler;
         this.observer = observer;
         this.statusRecorder = statusRecorder;
+        this.eventRecorder = eventRecorder;
     }
 
     @Override
@@ -128,6 +132,12 @@ public class FlinkSessionJobController
                     validator.validateSessionJob(
                             sessionJob, context.getSecondaryResource(FlinkDeployment.class));
             if (validationError.isPresent()) {
+                eventRecorder.triggerEvent(
+                        sessionJob,
+                        EventRecorder.Type.Warning,
+                        EventRecorder.Reason.ValidationError,
+                        EventRecorder.Component.Operator,
+                        validationError.get());
                 return ReconciliationUtils.applyValidationErrorAndResetSpec(
                         sessionJob, validationError.get());
             }
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/CommonStatus.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/CommonStatus.java
index 76e0e024..44c9ebdc 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/CommonStatus.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/CommonStatus.java
@@ -27,6 +27,7 @@ import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 import lombok.experimental.SuperBuilder;
+import org.apache.commons.lang3.StringUtils;
 
 /** Last observed common status of the Flink deployment/Flink SessionJob. */
 @Experimental
@@ -54,7 +55,9 @@ public abstract class CommonStatus<SPEC extends AbstractFlinkSpec> {
         var reconciliationStatus = getReconciliationStatus();
 
         if (reconciliationStatus.isFirstDeployment()) {
-            return ResourceLifecycleState.CREATED;
+            return StringUtils.isEmpty(error)
+                    ? ResourceLifecycleState.CREATED
+                    : ResourceLifecycleState.FAILED;
         }
 
         switch (reconciliationStatus.getState()) {
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java
index 691bca94..9bc55762 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java
@@ -122,6 +122,7 @@ public class EventRecorder {
         StatusChanged,
         SavepointError,
         Cleanup,
-        Missing
+        Missing,
+        ValidationError
     }
 }
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
index c57f2e2f..d80f8cb7 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
@@ -35,6 +35,7 @@ import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
 import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus;
 import org.apache.flink.kubernetes.operator.crd.status.TaskManagerInfo;
 import org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
+import org.apache.flink.kubernetes.operator.metrics.lifecycle.ResourceLifecycleState;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
@@ -61,9 +62,11 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_JOB_UPGRADE_LAST_STATE_FALLBACK_ENABLED;
+import static org.apache.flink.kubernetes.operator.utils.EventRecorder.Reason.ValidationError;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
@@ -576,16 +579,21 @@ public class FlinkDeploymentControllerTest {
                         .getState());
         appCluster.getSpec().setLogConfiguration(Map.of("invalid", "conf"));
         testController.reconcile(appCluster, TestUtils.createEmptyContext());
-        assertEquals(1, testController.events().size());
+        assertEquals(2, testController.events().size());
+        testController.events().remove();
         assertEquals(
                 EventRecorder.Reason.Submit,
-                EventRecorder.Reason.valueOf(testController.events().poll().getReason()));
+                EventRecorder.Reason.valueOf(testController.events().remove().getReason()));
         testController.reconcile(appCluster, context);
         testController.reconcile(appCluster, context);
-        assertEquals(1, testController.events().size());
+        var statusEvents =
+                testController.events().stream()
+                        .filter(e -> !e.getReason().equals(ValidationError.name()))
+                        .collect(Collectors.toList());
+        assertEquals(1, statusEvents.size());
         assertEquals(
                 EventRecorder.Reason.StatusChanged,
-                EventRecorder.Reason.valueOf(testController.events().poll().getReason()));
+                EventRecorder.Reason.valueOf(statusEvents.get(0).getReason()));
 
         assertEquals(
                 JobManagerDeploymentStatus.READY,
@@ -865,6 +873,23 @@ public class FlinkDeploymentControllerTest {
                 appCluster.getStatus().getReconciliationStatus().getLastStableSpec());
     }
 
+    @Test // An invalid spec must produce exactly one Warning/ValidationError event and a FAILED lifecycle state.
+    public void testValidationError() throws Exception {
+        assertTrue(testController.events().isEmpty()); // start from a clean event queue
+        var flinkDeployment = TestUtils.buildApplicationCluster();
+        flinkDeployment.getSpec().getJob().setParallelism(-1); // invalid on purpose; the assertions below expect a "Job parallelism ..." validation message
+        testController.reconcile(flinkDeployment, context);
+
+        assertEquals(1, testController.events().size()); // only the validation event is recorded for the rejected spec
+        assertEquals(
+                ResourceLifecycleState.FAILED, flinkDeployment.getStatus().getLifecycleState()); // first deployment with an error reports FAILED rather than CREATED
+
+        var event = testController.events().remove();
+        assertEquals("Warning", event.getType());
+        assertEquals("ValidationError", event.getReason()); // matches the EventRecorder.Reason.ValidationError constant name
+        assertTrue(event.getMessage().startsWith("Job parallelism ")); // the event message carries the validator's error text
+    }
+
     @Test
     public void cleanUpNewDeployment() {
         FlinkDeployment flinkDeployment = TestUtils.buildApplicationCluster();