You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2022/09/27 08:35:19 UTC

[flink-kubernetes-operator] branch main updated: [FLINK-29376] Set SHUTDOWN_ON_APPLICATION_FINISH to false for Flink version less than 1.15

This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 39595691 [FLINK-29376] Set SHUTDOWN_ON_APPLICATION_FINISH to false for Flink version less than 1.15
39595691 is described below

commit 395956910af19f0ba6f1de1ae4d730aee4b6504f
Author: Gabor Somogyi <ga...@apple.com>
AuthorDate: Mon Sep 26 13:50:53 2022 +0200

    [FLINK-29376] Set SHUTDOWN_ON_APPLICATION_FINISH to false for Flink version less than 1.15
---
 .../operator/config/FlinkConfigBuilder.java          |  4 +++-
 .../operator/config/FlinkConfigBuilderTest.java      | 20 ++++++++++++++++++++
 2 files changed, 23 insertions(+), 1 deletion(-)

diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java
index f8899803..daa74b6f 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java
@@ -150,7 +150,9 @@ public class FlinkConfigBuilder {
             }
 
             // We need to keep the application clusters around for proper operator behaviour
-            effectiveConfig.set(SHUTDOWN_ON_APPLICATION_FINISH, false);
+            if (spec.getFlinkVersion().isNewerVersionThan(FlinkVersion.v1_14)) {
+                effectiveConfig.set(SHUTDOWN_ON_APPLICATION_FINISH, false);
+            }
             if (HighAvailabilityMode.isHighAvailabilityModeActivated(effectiveConfig)) {
                 setDefaultConf(SUBMIT_FAILED_JOB_ON_APPLICATION_ERROR, true);
             }
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilderTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilderTest.java
index 4fe27b35..4cf3035e 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilderTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilderTest.java
@@ -34,6 +34,7 @@ import org.apache.flink.kubernetes.configuration.KubernetesDeploymentTarget;
 import org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory;
 import org.apache.flink.kubernetes.operator.TestUtils;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.spec.FlinkVersion;
 import org.apache.flink.kubernetes.operator.crd.spec.IngressSpec;
 import org.apache.flink.kubernetes.operator.crd.spec.KubernetesDeploymentMode;
 import org.apache.flink.kubernetes.operator.crd.spec.TaskManagerSpec;
@@ -51,6 +52,8 @@ import io.fabric8.kubernetes.api.model.Pod;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
 
 import java.io.File;
 import java.io.IOException;
@@ -60,6 +63,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.flink.configuration.DeploymentOptions.SHUTDOWN_ON_APPLICATION_FINISH;
 import static org.apache.flink.kubernetes.operator.TestUtils.IMAGE;
 import static org.apache.flink.kubernetes.operator.TestUtils.IMAGE_POLICY;
 import static org.apache.flink.kubernetes.operator.TestUtils.SAMPLE_JAR;
@@ -163,6 +167,22 @@ public class FlinkConfigBuilderTest {
         Assertions.assertEquals(false, configuration.get(WebOptions.CANCEL_ENABLE));
     }
 
+    @ParameterizedTest
+    @EnumSource(FlinkVersion.class)
+    public void testApplyFlinkConfigurationShouldSetShutdownOnFinishBasedOnFlinkVersion(
+            FlinkVersion flinkVersion) {
+        flinkDeployment.getSpec().setFlinkVersion(flinkVersion);
+        Configuration configuration =
+                new FlinkConfigBuilder(flinkDeployment, new Configuration())
+                        .applyFlinkConfiguration()
+                        .build();
+        if (flinkVersion.isNewerVersionThan(FlinkVersion.v1_14)) {
+            Assertions.assertFalse(configuration.getBoolean(SHUTDOWN_ON_APPLICATION_FINISH));
+        } else {
+            Assertions.assertTrue(configuration.getBoolean(SHUTDOWN_ON_APPLICATION_FINISH));
+        }
+    }
+
     @Test
     public void testApplyLogConfiguration() throws IOException {
         Configuration configuration =