You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2022/09/27 08:35:19 UTC
[flink-kubernetes-operator] branch main updated: [FLINK-29376] Set SHUTDOWN_ON_APPLICATION_FINISH to false for Flink version less than 1.15
This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 39595691 [FLINK-29376] Set SHUTDOWN_ON_APPLICATION_FINISH to false for Flink version less than 1.15
39595691 is described below
commit 395956910af19f0ba6f1de1ae4d730aee4b6504f
Author: Gabor Somogyi <ga...@apple.com>
AuthorDate: Mon Sep 26 13:50:53 2022 +0200
[FLINK-29376] Set SHUTDOWN_ON_APPLICATION_FINISH to false for Flink version less than 1.15
---
.../operator/config/FlinkConfigBuilder.java | 4 +++-
.../operator/config/FlinkConfigBuilderTest.java | 20 ++++++++++++++++++++
2 files changed, 23 insertions(+), 1 deletion(-)
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java
index f8899803..daa74b6f 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java
@@ -150,7 +150,9 @@ public class FlinkConfigBuilder {
}
// We need to keep the application clusters around for proper operator behaviour
- effectiveConfig.set(SHUTDOWN_ON_APPLICATION_FINISH, false);
+ if (spec.getFlinkVersion().isNewerVersionThan(FlinkVersion.v1_14)) {
+ effectiveConfig.set(SHUTDOWN_ON_APPLICATION_FINISH, false);
+ }
if (HighAvailabilityMode.isHighAvailabilityModeActivated(effectiveConfig)) {
setDefaultConf(SUBMIT_FAILED_JOB_ON_APPLICATION_ERROR, true);
}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilderTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilderTest.java
index 4fe27b35..4cf3035e 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilderTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilderTest.java
@@ -34,6 +34,7 @@ import org.apache.flink.kubernetes.configuration.KubernetesDeploymentTarget;
import org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory;
import org.apache.flink.kubernetes.operator.TestUtils;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.spec.FlinkVersion;
import org.apache.flink.kubernetes.operator.crd.spec.IngressSpec;
import org.apache.flink.kubernetes.operator.crd.spec.KubernetesDeploymentMode;
import org.apache.flink.kubernetes.operator.crd.spec.TaskManagerSpec;
@@ -51,6 +52,8 @@ import io.fabric8.kubernetes.api.model.Pod;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
import java.io.File;
import java.io.IOException;
@@ -60,6 +63,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import static org.apache.flink.configuration.DeploymentOptions.SHUTDOWN_ON_APPLICATION_FINISH;
import static org.apache.flink.kubernetes.operator.TestUtils.IMAGE;
import static org.apache.flink.kubernetes.operator.TestUtils.IMAGE_POLICY;
import static org.apache.flink.kubernetes.operator.TestUtils.SAMPLE_JAR;
@@ -163,6 +167,22 @@ public class FlinkConfigBuilderTest {
Assertions.assertEquals(false, configuration.get(WebOptions.CANCEL_ENABLE));
}
+ @ParameterizedTest
+ @EnumSource(FlinkVersion.class)
+ public void testApplyFlinkConfigurationShouldSetShutdownOnFinishBasedOnFlinkVersion(
+ FlinkVersion flinkVersion) {
+ flinkDeployment.getSpec().setFlinkVersion(flinkVersion);
+ Configuration configuration =
+ new FlinkConfigBuilder(flinkDeployment, new Configuration())
+ .applyFlinkConfiguration()
+ .build();
+ if (flinkVersion.isNewerVersionThan(FlinkVersion.v1_14)) {
+ Assertions.assertFalse(configuration.getBoolean(SHUTDOWN_ON_APPLICATION_FINISH));
+ } else {
+ Assertions.assertTrue(configuration.getBoolean(SHUTDOWN_ON_APPLICATION_FINISH));
+ }
+ }
+
@Test
public void testApplyLogConfiguration() throws IOException {
Configuration configuration =