You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2018/07/18 11:56:26 UTC

[2/2] flink git commit: [FLINK-9866] Allow passing command line arguments to standalone job

[FLINK-9866] Allow passing command line arguments to standalone job

This closes #6344


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/64feb4e7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/64feb4e7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/64feb4e7

Branch: refs/heads/master
Commit: 64feb4e7e86ffd2d1b57b27b287b1d261abd065d
Parents: 2ec7212
Author: Dawid Wysakowicz <dw...@apache.org>
Authored: Mon Jul 16 17:30:17 2018 +0200
Committer: Dawid Wysakowicz <dw...@apache.org>
Committed: Wed Jul 18 13:54:00 2018 +0200

----------------------------------------------------------------------
 flink-container/docker/docker-compose.yml            |  2 +-
 .../entrypoint/StandaloneJobClusterEntryPoint.java   | 15 ++++++++++-----
 .../StandaloneJobClusterEntryPointTest.java          |  5 +++--
 .../apache/flink/container/entrypoint/TestJob.java   |  4 +++-
 4 files changed, 17 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/64feb4e7/flink-container/docker/docker-compose.yml
----------------------------------------------------------------------
diff --git a/flink-container/docker/docker-compose.yml b/flink-container/docker/docker-compose.yml
index 81e4c8c..5fddff3 100644
--- a/flink-container/docker/docker-compose.yml
+++ b/flink-container/docker/docker-compose.yml
@@ -24,7 +24,7 @@ services:
     image: ${FLINK_DOCKER_IMAGE_NAME:-flink-job}
     ports:
       - "8081:8081"
-    command: job-cluster --job-classname ${FLINK_JOB} -Djobmanager.rpc.address=job-cluster
+    command: job-cluster --job-classname ${FLINK_JOB} -Djobmanager.rpc.address=job-cluster ${FLINK_JOB_ARGUMENTS}
 
   taskmanager:
     image: ${FLINK_DOCKER_IMAGE_NAME:-flink-job}

http://git-wip-us.apache.org/repos/asf/flink/blob/64feb4e7/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java
----------------------------------------------------------------------
diff --git a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java
index 47cca4c..57f7ca2 100644
--- a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java
+++ b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java
@@ -51,20 +51,23 @@ import javax.annotation.Nullable;
 
 import java.util.concurrent.CompletableFuture;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * {@link JobClusterEntrypoint} which is started with a job in a predefined
  * location.
  */
 public final class StandaloneJobClusterEntryPoint extends JobClusterEntrypoint {
 
-	private  static final String[] EMPTY_ARGS = new String[0];
+	private final String[] programArguments;
 
 	@Nonnull
 	private final String jobClassName;
 
-	StandaloneJobClusterEntryPoint(Configuration configuration, @Nonnull String jobClassName) {
+	StandaloneJobClusterEntryPoint(Configuration configuration, @Nonnull String jobClassName, @Nonnull String[] programArguments) {
 		super(configuration);
-		this.jobClassName = jobClassName;
+		this.programArguments = checkNotNull(programArguments);
+		this.jobClassName = checkNotNull(jobClassName);
 	}
 
 	@Override
@@ -84,7 +87,7 @@ public final class StandaloneJobClusterEntryPoint extends JobClusterEntrypoint {
 	private PackagedProgram createPackagedProgram() throws FlinkException {
 		try {
 			final Class<?> mainClass = getClass().getClassLoader().loadClass(jobClassName);
-			return new PackagedProgram(mainClass, EMPTY_ARGS);
+			return new PackagedProgram(mainClass, programArguments);
 		} catch (ClassNotFoundException | ProgramInvocationException e) {
 			throw new FlinkException("Could not load the provied entrypoint class.", e);
 		}
@@ -148,7 +151,9 @@ public final class StandaloneJobClusterEntryPoint extends JobClusterEntrypoint {
 
 		configuration.setString(ClusterEntrypoint.EXECUTION_MODE, ExecutionMode.DETACHED.toString());
 
-		StandaloneJobClusterEntryPoint entrypoint = new StandaloneJobClusterEntryPoint(configuration, clusterConfiguration.getJobClassName());
+		StandaloneJobClusterEntryPoint entrypoint = new StandaloneJobClusterEntryPoint(configuration,
+			clusterConfiguration.getJobClassName(),
+			clusterConfiguration.getArgs());
 
 		entrypoint.startCluster();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/64feb4e7/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPointTest.java
----------------------------------------------------------------------
diff --git a/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPointTest.java b/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPointTest.java
index 360799d..d97d2b7 100644
--- a/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPointTest.java
+++ b/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPointTest.java
@@ -42,11 +42,12 @@ public class StandaloneJobClusterEntryPointTest extends TestLogger {
 		configuration.setInteger(CoreOptions.DEFAULT_PARALLELISM, parallelism);
 		final StandaloneJobClusterEntryPoint standaloneJobClusterEntryPoint = new StandaloneJobClusterEntryPoint(
 			configuration,
-			TestJob.class.getCanonicalName());
+			TestJob.class.getCanonicalName(),
+			new String[] {"--arg", "suffix"});
 
 		final JobGraph jobGraph = standaloneJobClusterEntryPoint.retrieveJobGraph(configuration);
 
-		assertThat(jobGraph.getName(), is(equalTo(TestJob.class.getCanonicalName())));
+		assertThat(jobGraph.getName(), is(equalTo(TestJob.class.getCanonicalName() + "-suffix")));
 		assertThat(jobGraph.getMaximumParallelism(), is(parallelism));
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/64feb4e7/flink-container/src/test/java/org/apache/flink/container/entrypoint/TestJob.java
----------------------------------------------------------------------
diff --git a/flink-container/src/test/java/org/apache/flink/container/entrypoint/TestJob.java b/flink-container/src/test/java/org/apache/flink/container/entrypoint/TestJob.java
index 5f8857f..ada434d 100644
--- a/flink-container/src/test/java/org/apache/flink/container/entrypoint/TestJob.java
+++ b/flink-container/src/test/java/org/apache/flink/container/entrypoint/TestJob.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.container.entrypoint;
 
+import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -35,6 +36,7 @@ public class TestJob {
 		final SingleOutputStreamOperator<Integer> mapper = source.map(element -> 2 * element);
 		mapper.addSink(new DiscardingSink<>());
 
-		env.execute(TestJob.class.getCanonicalName());
+		ParameterTool parameterTool = ParameterTool.fromArgs(args);
+		env.execute(TestJob.class.getCanonicalName() + "-" + parameterTool.getRequired("arg"));
 	}
 }