You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2018/07/18 11:56:26 UTC
[2/2] flink git commit: [FLINK-9866] Allow passing command line
arguments to standalone job
[FLINK-9866] Allow passing command line arguments to standalone job
This closes #6344
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/64feb4e7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/64feb4e7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/64feb4e7
Branch: refs/heads/master
Commit: 64feb4e7e86ffd2d1b57b27b287b1d261abd065d
Parents: 2ec7212
Author: Dawid Wysakowicz <dw...@apache.org>
Authored: Mon Jul 16 17:30:17 2018 +0200
Committer: Dawid Wysakowicz <dw...@apache.org>
Committed: Wed Jul 18 13:54:00 2018 +0200
----------------------------------------------------------------------
flink-container/docker/docker-compose.yml | 2 +-
.../entrypoint/StandaloneJobClusterEntryPoint.java | 15 ++++++++++-----
.../StandaloneJobClusterEntryPointTest.java | 5 +++--
.../apache/flink/container/entrypoint/TestJob.java | 4 +++-
4 files changed, 17 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/64feb4e7/flink-container/docker/docker-compose.yml
----------------------------------------------------------------------
diff --git a/flink-container/docker/docker-compose.yml b/flink-container/docker/docker-compose.yml
index 81e4c8c..5fddff3 100644
--- a/flink-container/docker/docker-compose.yml
+++ b/flink-container/docker/docker-compose.yml
@@ -24,7 +24,7 @@ services:
image: ${FLINK_DOCKER_IMAGE_NAME:-flink-job}
ports:
- "8081:8081"
- command: job-cluster --job-classname ${FLINK_JOB} -Djobmanager.rpc.address=job-cluster
+ command: job-cluster --job-classname ${FLINK_JOB} -Djobmanager.rpc.address=job-cluster ${FLINK_JOB_ARGUMENTS}
taskmanager:
image: ${FLINK_DOCKER_IMAGE_NAME:-flink-job}
http://git-wip-us.apache.org/repos/asf/flink/blob/64feb4e7/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java
----------------------------------------------------------------------
diff --git a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java
index 47cca4c..57f7ca2 100644
--- a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java
+++ b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java
@@ -51,20 +51,23 @@ import javax.annotation.Nullable;
import java.util.concurrent.CompletableFuture;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
/**
* {@link JobClusterEntrypoint} which is started with a job in a predefined
* location.
*/
public final class StandaloneJobClusterEntryPoint extends JobClusterEntrypoint {
- private static final String[] EMPTY_ARGS = new String[0];
+ private final String[] programArguments;
@Nonnull
private final String jobClassName;
- StandaloneJobClusterEntryPoint(Configuration configuration, @Nonnull String jobClassName) {
+ StandaloneJobClusterEntryPoint(Configuration configuration, @Nonnull String jobClassName, @Nonnull String[] programArguments) {
super(configuration);
- this.jobClassName = jobClassName;
+ this.programArguments = checkNotNull(programArguments);
+ this.jobClassName = checkNotNull(jobClassName);
}
@Override
@@ -84,7 +87,7 @@ public final class StandaloneJobClusterEntryPoint extends JobClusterEntrypoint {
private PackagedProgram createPackagedProgram() throws FlinkException {
try {
final Class<?> mainClass = getClass().getClassLoader().loadClass(jobClassName);
- return new PackagedProgram(mainClass, EMPTY_ARGS);
+ return new PackagedProgram(mainClass, programArguments);
} catch (ClassNotFoundException | ProgramInvocationException e) {
throw new FlinkException("Could not load the provied entrypoint class.", e);
}
@@ -148,7 +151,9 @@ public final class StandaloneJobClusterEntryPoint extends JobClusterEntrypoint {
configuration.setString(ClusterEntrypoint.EXECUTION_MODE, ExecutionMode.DETACHED.toString());
- StandaloneJobClusterEntryPoint entrypoint = new StandaloneJobClusterEntryPoint(configuration, clusterConfiguration.getJobClassName());
+ StandaloneJobClusterEntryPoint entrypoint = new StandaloneJobClusterEntryPoint(configuration,
+ clusterConfiguration.getJobClassName(),
+ clusterConfiguration.getArgs());
entrypoint.startCluster();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/64feb4e7/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPointTest.java
----------------------------------------------------------------------
diff --git a/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPointTest.java b/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPointTest.java
index 360799d..d97d2b7 100644
--- a/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPointTest.java
+++ b/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPointTest.java
@@ -42,11 +42,12 @@ public class StandaloneJobClusterEntryPointTest extends TestLogger {
configuration.setInteger(CoreOptions.DEFAULT_PARALLELISM, parallelism);
final StandaloneJobClusterEntryPoint standaloneJobClusterEntryPoint = new StandaloneJobClusterEntryPoint(
configuration,
- TestJob.class.getCanonicalName());
+ TestJob.class.getCanonicalName(),
+ new String[] {"--arg", "suffix"});
final JobGraph jobGraph = standaloneJobClusterEntryPoint.retrieveJobGraph(configuration);
- assertThat(jobGraph.getName(), is(equalTo(TestJob.class.getCanonicalName())));
+ assertThat(jobGraph.getName(), is(equalTo(TestJob.class.getCanonicalName() + "-suffix")));
assertThat(jobGraph.getMaximumParallelism(), is(parallelism));
}
http://git-wip-us.apache.org/repos/asf/flink/blob/64feb4e7/flink-container/src/test/java/org/apache/flink/container/entrypoint/TestJob.java
----------------------------------------------------------------------
diff --git a/flink-container/src/test/java/org/apache/flink/container/entrypoint/TestJob.java b/flink-container/src/test/java/org/apache/flink/container/entrypoint/TestJob.java
index 5f8857f..ada434d 100644
--- a/flink-container/src/test/java/org/apache/flink/container/entrypoint/TestJob.java
+++ b/flink-container/src/test/java/org/apache/flink/container/entrypoint/TestJob.java
@@ -18,6 +18,7 @@
package org.apache.flink.container.entrypoint;
+import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -35,6 +36,7 @@ public class TestJob {
final SingleOutputStreamOperator<Integer> mapper = source.map(element -> 2 * element);
mapper.addSink(new DiscardingSink<>());
- env.execute(TestJob.class.getCanonicalName());
+ ParameterTool parameterTool = ParameterTool.fromArgs(args);
+ env.execute(TestJob.class.getCanonicalName() + "-" + parameterTool.getRequired("arg"));
}
}