You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2022/09/23 13:58:41 UTC
[flink-kubernetes-operator] branch main updated: [fix] Use JobSpec arguments in Standalone Application Mode
This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new e2b829c7 [fix] Use JobSpec arguments in Standalone Application Mode
e2b829c7 is described below
commit e2b829c7df7501760dec8f9aa47685c680b227cf
Author: Avocadomaster <88...@users.noreply.github.com>
AuthorDate: Thu Sep 22 11:33:38 2022 +0200
[fix] Use JobSpec arguments in Standalone Application Mode
---
.../operator/config/FlinkConfigBuilder.java | 7 +++++
.../operator/config/FlinkConfigBuilderTest.java | 11 +++++---
.../CmdStandaloneJobManagerDecorator.java | 5 ++++
.../StandaloneKubernetesJobManagerParameters.java | 8 ++++++
.../KubernetesStandaloneClusterDescriptorTest.java | 30 ++++++++++++++++++++++
5 files changed, 58 insertions(+), 3 deletions(-)
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java
index d1423eb4..f8899803 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java
@@ -57,6 +57,7 @@ import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.time.Duration;
+import java.util.Arrays;
import java.util.Collections;
import static org.apache.flink.configuration.DeploymentOptions.SHUTDOWN_ON_APPLICATION_FINISH;
@@ -274,6 +275,12 @@ public class FlinkConfigBuilder {
effectiveConfig.set(
ApplicationConfiguration.APPLICATION_MAIN_CLASS, jobSpec.getEntryClass());
}
+
+ if (jobSpec.getArgs() != null) {
+ effectiveConfig.set(
+ ApplicationConfiguration.APPLICATION_ARGS,
+ Arrays.asList(jobSpec.getArgs()));
+ }
} else {
effectiveConfig.set(
DeploymentOptions.TARGET, KubernetesDeploymentTarget.SESSION.getName());
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilderTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilderTest.java
index 69ade6fb..4fe27b35 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilderTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilderTest.java
@@ -300,9 +300,11 @@ public class FlinkConfigBuilderTest {
@Test
public void testApplyJobOrSessionSpec() throws Exception {
- flinkDeployment.getSpec().getJob().setAllowNonRestoredState(true);
+ FlinkDeployment deploymentClone = ReconciliationUtils.clone(flinkDeployment);
+ deploymentClone.getSpec().getJob().setAllowNonRestoredState(true);
+ deploymentClone.getSpec().getJob().setArgs(new String[] {"--test", "123"});
var configuration =
- new FlinkConfigBuilder(flinkDeployment, new Configuration())
+ new FlinkConfigBuilder(deploymentClone, new Configuration())
.applyJobOrSessionSpec()
.build();
Assertions.assertTrue(
@@ -313,8 +315,11 @@ public class FlinkConfigBuilderTest {
Assertions.assertEquals(SAMPLE_JAR, configuration.get(PipelineOptions.JARS).get(0));
Assertions.assertEquals(
Integer.valueOf(2), configuration.get(CoreOptions.DEFAULT_PARALLELISM));
+ Assertions.assertEquals(
+ List.of("--test", "123"),
+ configuration.get(ApplicationConfiguration.APPLICATION_ARGS));
- var dep = ReconciliationUtils.clone(flinkDeployment);
+ var dep = ReconciliationUtils.clone(deploymentClone);
dep.getSpec().setTaskManager(new TaskManagerSpec());
dep.getSpec().getTaskManager().setReplicas(3);
dep.getSpec().getFlinkConfiguration().put(TaskManagerOptions.NUM_TASK_SLOTS.key(), "4");
diff --git a/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/CmdStandaloneJobManagerDecorator.java b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/CmdStandaloneJobManagerDecorator.java
index 9bfcc866..44ded3c2 100644
--- a/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/CmdStandaloneJobManagerDecorator.java
+++ b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/CmdStandaloneJobManagerDecorator.java
@@ -85,6 +85,11 @@ public class CmdStandaloneJobManagerDecorator extends AbstractKubernetesStepDeco
args.add(allowNonRestoredState.toString());
}
+ List<String> jobSpecArgs = kubernetesJobManagerParameters.getJobSpecArgs();
+ if (jobSpecArgs != null) {
+ args.addAll(jobSpecArgs); // reuse the null-checked local instead of a second lookup
+ }
+
return args;
}
}
diff --git a/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/parameters/StandaloneKubernetesJobManagerParameters.java b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/parameters/StandaloneKubernetesJobManagerParameters.java
index a46172f4..b2ca29af 100644
--- a/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/parameters/StandaloneKubernetesJobManagerParameters.java
+++ b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/parameters/StandaloneKubernetesJobManagerParameters.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
/**
@@ -92,4 +93,11 @@ public class StandaloneKubernetesJobManagerParameters extends KubernetesJobManag
public boolean isPipelineClasspathDefined() {
return flinkConfig.contains(PipelineOptions.CLASSPATHS);
}
+
+ /** Returns the APPLICATION_ARGS value from the Flink config, or null when it is not set. */
+ public List<String> getJobSpecArgs() {
+ return flinkConfig
+ .getOptional(ApplicationConfiguration.APPLICATION_ARGS)
+ .orElse(null);
+ }
}
diff --git a/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/standalone/KubernetesStandaloneClusterDescriptorTest.java b/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/standalone/KubernetesStandaloneClusterDescriptorTest.java
index 641ddfdc..5a0d6a0e 100644
--- a/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/standalone/KubernetesStandaloneClusterDescriptorTest.java
+++ b/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/standalone/KubernetesStandaloneClusterDescriptorTest.java
@@ -176,4 +176,34 @@ public class KubernetesStandaloneClusterDescriptorTest {
Constants.REST_PORT);
assertEquals(expectedWebUrl, clusterClient.getWebInterfaceURL());
}
+
+ @Test
+ public void testMainContainerArgsIntegrity() throws Exception {
+ ClusterSpecification clusterSpecification = TestUtils.createClusterSpecification();
+ // Deploy with explicit program arguments; the assertions below look for them in container args.
+ clusterDescriptor.deployApplicationCluster(
+ clusterSpecification,
+ new ApplicationConfiguration(new String[] {"--test", "123"}, "test"));
+ List<Deployment> deployments =
+ kubernetesClient
+ .apps()
+ .deployments()
+ .inNamespace(TestUtils.TEST_NAMESPACE)
+ .list()
+ .getItems();
+ String expectedJMDeploymentName = TestUtils.CLUSTER_ID;
+ // Fail with a descriptive error instead of a NullPointerException when nothing matches.
+ Deployment jmDeployment =
+ deployments.stream()
+ .filter(d -> d.getMetadata().getName().equals(expectedJMDeploymentName))
+ .findFirst()
+ .orElseThrow(() -> new AssertionError("JobManager deployment not found"));
+ assertTrue(
+ jmDeployment.getSpec().getTemplate().getSpec().getContainers().stream()
+ .anyMatch(c -> c.getArgs().contains("standalone-job")));
+
+ assertTrue(
+ jmDeployment.getSpec().getTemplate().getSpec().getContainers().stream()
+ .anyMatch(c -> c.getArgs().contains("123")));
+ }
}