You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/07/14 05:52:23 UTC

[03/12] flink git commit: [FLINK-9488] Add container entry point StandaloneJobClusterEntryPoint

[FLINK-9488] Add container entry point StandaloneJobClusterEntryPoint

The StandaloneJobClusterEntryPoint is the basic entry point for containers. It is started with
the user code jar in its classpath and the classname of the user program. The entrypoint will
then load this user program via the classname and execute its main method. This will generate
a JobGraph which is then used to start the MiniDispatcher.

This closes #6315.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8f467c1e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8f467c1e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8f467c1e

Branch: refs/heads/master
Commit: 8f467c1e9727d5a86d38d0b49753c534a1a161da
Parents: ab9bd87
Author: Till Rohrmann <tr...@apache.org>
Authored: Mon Jul 9 23:54:55 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri Jul 13 18:01:26 2018 +0200

----------------------------------------------------------------------
 flink-container/pom.xml                         |  69 ++++++++
 .../StandaloneJobClusterConfiguration.java      |  43 +++++
 ...oneJobClusterConfigurationParserFactory.java |  75 +++++++++
 .../StandaloneJobClusterEntryPoint.java         | 156 +++++++++++++++++++
 .../src/main/resources/log4j.properties         |  27 ++++
 ...obClusterConfigurationParserFactoryTest.java |  84 ++++++++++
 .../StandaloneJobClusterEntryPointTest.java     |  53 +++++++
 .../flink/container/entrypoint/TestJob.java     |  40 +++++
 .../src/test/resources/log4j-test.properties    |  32 ++++
 .../ClusterConfigurationParserFactoryTest.java  |   1 -
 pom.xml                                         |   1 +
 11 files changed, 580 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8f467c1e/flink-container/pom.xml
----------------------------------------------------------------------
diff --git a/flink-container/pom.xml b/flink-container/pom.xml
new file mode 100644
index 0000000..b20d321
--- /dev/null
+++ b/flink-container/pom.xml
@@ -0,0 +1,69 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-parent</artifactId>
+		<version>1.6-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-container_${scala.binary.version}</artifactId>
+	<name>flink-container</name>
+	<packaging>jar</packaging>
+
+	<dependencies>
+
+		<!--  set all Flink dependencies to provided, so they and their transitive  -->
+		<!-- dependencies do not get promoted to direct dependencies during shading -->
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-runtime_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-clients_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+		<!-- test dependencies -->
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-test-utils-junit</artifactId>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+
+	</dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/8f467c1e/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfiguration.java b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfiguration.java
new file mode 100644
index 0000000..e68e74b
--- /dev/null
+++ b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfiguration.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.container.entrypoint;
+
+import org.apache.flink.runtime.entrypoint.EntrypointClusterConfiguration;
+
+import javax.annotation.Nonnull;
+
+import java.util.Properties;
+
+/**
+ * Configuration for the {@link StandaloneJobClusterEntryPoint}: the {@link EntrypointClusterConfiguration} settings plus the class name of the job to run.
+ */
+final class StandaloneJobClusterConfiguration extends EntrypointClusterConfiguration {
+	@Nonnull
+	private final String jobClassName; // class name of the job; assigned once in the constructor
+
+	public StandaloneJobClusterConfiguration(@Nonnull String configDir, @Nonnull Properties dynamicProperties, @Nonnull String[] args, int restPort, @Nonnull String jobClassName) { // the first four arguments are passed unchanged to the superclass
+		super(configDir, dynamicProperties, args, restPort);
+		this.jobClassName = jobClassName;
+	}
+
+	@Nonnull
+	String getJobClassName() { // package-private accessor for the job class name
+		return jobClassName;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8f467c1e/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactory.java
----------------------------------------------------------------------
diff --git a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactory.java b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactory.java
new file mode 100644
index 0000000..c0cb473
--- /dev/null
+++ b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactory.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.container.entrypoint;
+
+import org.apache.flink.runtime.entrypoint.parser.ParserResultFactory;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+
+import javax.annotation.Nonnull;
+
+import java.util.Properties;
+
+import static org.apache.flink.runtime.entrypoint.parser.CommandLineOptions.CONFIG_DIR_OPTION;
+import static org.apache.flink.runtime.entrypoint.parser.CommandLineOptions.DYNAMIC_PROPERTY_OPTION;
+import static org.apache.flink.runtime.entrypoint.parser.CommandLineOptions.REST_PORT_OPTION;
+
+/**
+ * Parser factory which generates a {@link StandaloneJobClusterConfiguration} from a given
+ * list of command line arguments (config dir, REST port, dynamic properties and the required job class name).
+ */
+public class StandaloneJobClusterConfigurationParserFactory implements ParserResultFactory<StandaloneJobClusterConfiguration> {
+
+	private static final Option JOB_CLASS_NAME_OPTION = Option.builder("j") // -j / --job-classname: required option that carries an argument
+		.longOpt("job-classname")
+		.required(true)
+		.hasArg(true)
+		.argName("job class name")
+		.desc("Class name of the job to run.")
+		.build();
+
+	@Override
+	public Options getOptions() { // collects every option this parser accepts
+		final Options options = new Options();
+		options.addOption(CONFIG_DIR_OPTION);
+		options.addOption(REST_PORT_OPTION);
+		options.addOption(JOB_CLASS_NAME_OPTION);
+		options.addOption(DYNAMIC_PROPERTY_OPTION);
+
+		return options;
+	}
+
+	@Override
+	public StandaloneJobClusterConfiguration createResult(@Nonnull CommandLine commandLine) { // maps the parsed command line onto the configuration object
+		final String configDir = commandLine.getOptionValue(CONFIG_DIR_OPTION.getOpt());
+		final Properties dynamicProperties = commandLine.getOptionProperties(DYNAMIC_PROPERTY_OPTION.getOpt());
+		final String restPortString = commandLine.getOptionValue(REST_PORT_OPTION.getOpt(), "-1"); // "-1" is used when no REST port was given
+		final int restPort = Integer.parseInt(restPortString); // a non-numeric value propagates as NumberFormatException
+		final String jobClassName = commandLine.getOptionValue(JOB_CLASS_NAME_OPTION.getOpt());
+
+		return new StandaloneJobClusterConfiguration(
+			configDir,
+			dynamicProperties,
+			commandLine.getArgs(), // remaining arguments that were not consumed as options
+			restPort,
+			jobClassName);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8f467c1e/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java
----------------------------------------------------------------------
diff --git a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java
new file mode 100644
index 0000000..47cca4c
--- /dev/null
+++ b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.container.entrypoint;
+
+import org.apache.flink.client.program.PackagedProgram;
+import org.apache.flink.client.program.PackagedProgramUtils;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
+import org.apache.flink.runtime.entrypoint.ClusterInformation;
+import org.apache.flink.runtime.entrypoint.FlinkParseException;
+import org.apache.flink.runtime.entrypoint.JobClusterEntrypoint;
+import org.apache.flink.runtime.entrypoint.parser.CommandLineParser;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration;
+import org.apache.flink.runtime.resourcemanager.StandaloneResourceManager;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.JvmShutdownSafeguard;
+import org.apache.flink.runtime.util.SignalHandler;
+import org.apache.flink.util.FlinkException;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * {@link JobClusterEntrypoint} which is started with a job in a predefined
+ * location: the job's main class is loaded by name through this class's class loader.
+ */
+public final class StandaloneJobClusterEntryPoint extends JobClusterEntrypoint {
+
+	private static final String[] EMPTY_ARGS = new String[0]; // program arguments given to the PackagedProgram (always empty)
+
+	@Nonnull
+	private final String jobClassName; // name of the class whose main method defines the job
+
+	StandaloneJobClusterEntryPoint(Configuration configuration, @Nonnull String jobClassName) {
+		super(configuration);
+		this.jobClassName = jobClassName;
+	}
+
+	@Override
+	protected JobGraph retrieveJobGraph(Configuration configuration) throws FlinkException { // builds the JobGraph from the job class using the configured default parallelism
+		final PackagedProgram packagedProgram = createPackagedProgram();
+		final int defaultParallelism = configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM);
+		try {
+			final JobGraph jobGraph = PackagedProgramUtils.createJobGraph(packagedProgram, configuration, defaultParallelism);
+			jobGraph.setAllowQueuedScheduling(true);
+
+			return jobGraph;
+		} catch (Exception e) { // any failure is wrapped in a FlinkException that keeps the cause
+			throw new FlinkException("Could not create the JobGraph from the provided user code jar.", e);
+		}
+	}
+
+	private PackagedProgram createPackagedProgram() throws FlinkException { // wraps the job class, loaded by name, in a PackagedProgram
+		try {
+			final Class<?> mainClass = getClass().getClassLoader().loadClass(jobClassName);
+			return new PackagedProgram(mainClass, EMPTY_ARGS);
+		} catch (ClassNotFoundException | ProgramInvocationException e) {
+			throw new FlinkException("Could not load the provided entrypoint class.", e);
+		}
+	}
+
+	@Override
+	protected void registerShutdownActions(CompletableFuture<ApplicationStatus> terminationFuture) {
+		terminationFuture.thenAccept((status) -> shutDownAndTerminate(0, ApplicationStatus.SUCCEEDED, null, true)); // NOTE(review): the completed status is ignored, exit code 0 / SUCCEEDED is always reported - confirm this is intended
+	}
+
+	@Override
+	protected ResourceManager<?> createResourceManager( // creates a StandaloneResourceManager from the given services
+			Configuration configuration,
+			ResourceID resourceId,
+			RpcService rpcService,
+			HighAvailabilityServices highAvailabilityServices,
+			HeartbeatServices heartbeatServices,
+			MetricRegistry metricRegistry,
+			FatalErrorHandler fatalErrorHandler,
+			ClusterInformation clusterInformation,
+			@Nullable String webInterfaceUrl) throws Exception {
+		final ResourceManagerConfiguration resourceManagerConfiguration = ResourceManagerConfiguration.fromConfiguration(configuration);
+		final ResourceManagerRuntimeServicesConfiguration resourceManagerRuntimeServicesConfiguration = ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);
+		final ResourceManagerRuntimeServices resourceManagerRuntimeServices = ResourceManagerRuntimeServices.fromConfiguration(
+			resourceManagerRuntimeServicesConfiguration,
+			highAvailabilityServices,
+			rpcService.getScheduledExecutor());
+
+		return new StandaloneResourceManager(
+			rpcService,
+			ResourceManager.RESOURCE_MANAGER_NAME,
+			resourceId,
+			resourceManagerConfiguration,
+			highAvailabilityServices,
+			heartbeatServices,
+			resourceManagerRuntimeServices.getSlotManager(),
+			metricRegistry,
+			resourceManagerRuntimeServices.getJobLeaderIdService(),
+			clusterInformation,
+			fatalErrorHandler);
+	}
+
+	public static void main(String[] args) { // parses the command line, loads the configuration and starts the cluster
+		// startup checks and logging
+		EnvironmentInformation.logEnvironmentInfo(LOG, StandaloneJobClusterEntryPoint.class.getSimpleName(), args);
+		SignalHandler.register(LOG);
+		JvmShutdownSafeguard.installAsShutdownHook(LOG);
+
+		final CommandLineParser<StandaloneJobClusterConfiguration> commandLineParser = new CommandLineParser<>(new StandaloneJobClusterConfigurationParserFactory());
+		StandaloneJobClusterConfiguration clusterConfiguration = null; // assigned below; on a parse failure the process exits before it is used
+
+		try {
+			clusterConfiguration = commandLineParser.parse(args);
+		} catch (FlinkParseException e) {
+			LOG.error("Could not parse command line arguments {}.", args, e);
+			commandLineParser.printHelp();
+			System.exit(1);
+		}
+
+		Configuration configuration = loadConfiguration(clusterConfiguration);
+
+		configuration.setString(ClusterEntrypoint.EXECUTION_MODE, ExecutionMode.DETACHED.toString()); // the execution mode is always set to DETACHED
+
+		StandaloneJobClusterEntryPoint entrypoint = new StandaloneJobClusterEntryPoint(configuration, clusterConfiguration.getJobClassName());
+
+		entrypoint.startCluster();
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8f467c1e/flink-container/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/flink-container/src/main/resources/log4j.properties b/flink-container/src/main/resources/log4j.properties
new file mode 100644
index 0000000..62cb6ed
--- /dev/null
+++ b/flink-container/src/main/resources/log4j.properties
@@ -0,0 +1,27 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+# Convenience file for local debugging of the JobManager/TaskManager.
+log4j.rootLogger=OFF, console
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
+
+log4j.logger.org.apache.flink.mesos=DEBUG
+log4j.logger.org.apache.flink.mesos.runtime.clusterframework.MesosFlinkResourceManager=INFO

http://git-wip-us.apache.org/repos/asf/flink/blob/8f467c1e/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactoryTest.java
----------------------------------------------------------------------
diff --git a/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactoryTest.java b/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactoryTest.java
new file mode 100644
index 0000000..1f39a06
--- /dev/null
+++ b/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactoryTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.container.entrypoint;
+
+import org.apache.flink.runtime.entrypoint.FlinkParseException;
+import org.apache.flink.runtime.entrypoint.parser.CommandLineParser;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.Properties;
+
+import static org.hamcrest.Matchers.arrayContaining;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasEntry;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for the {@link StandaloneJobClusterConfigurationParserFactory}.
+ */
+public class StandaloneJobClusterConfigurationParserFactoryTest extends TestLogger {
+
+	private static final CommandLineParser<StandaloneJobClusterConfiguration> commandLineParser = new CommandLineParser<>(new StandaloneJobClusterConfigurationParserFactory()); // parser under test, shared by all test methods
+
+	@Test
+	public void testEntrypointClusterConfigurationParsing() throws FlinkParseException { // every option plus two trailing positional arguments
+		final String configDir = "/foo/bar";
+		final String key = "key";
+		final String value = "value";
+		final int restPort = 1234;
+		final String jobClassName = "foobar";
+		final String arg1 = "arg1";
+		final String arg2 = "arg2";
+		final String[] args = {"--configDir", configDir, "--webui-port", String.valueOf(restPort), "--job-classname", jobClassName, String.format("-D%s=%s", key, value), arg1, arg2};
+
+		final StandaloneJobClusterConfiguration clusterConfiguration = commandLineParser.parse(args);
+
+		assertThat(clusterConfiguration.getConfigDir(), is(equalTo(configDir)));
+		assertThat(clusterConfiguration.getJobClassName(), is(equalTo(jobClassName)));
+		assertThat(clusterConfiguration.getRestPort(), is(equalTo(restPort)));
+		final Properties dynamicProperties = clusterConfiguration.getDynamicProperties();
+
+		assertThat(dynamicProperties, hasEntry(key, value));
+
+		assertThat(clusterConfiguration.getArgs(), arrayContaining(arg1, arg2)); // the two positional arguments are returned in order
+	}
+
+	@Test
+	public void testOnlyRequiredArguments() throws FlinkParseException { // without --webui-port the REST port is expected to be -1
+		final String configDir = "/foo/bar";
+		final String jobClassName = "foobar";
+		final String[] args = {"--configDir", configDir, "--job-classname", jobClassName};
+
+		final StandaloneJobClusterConfiguration clusterConfiguration = commandLineParser.parse(args);
+
+		assertThat(clusterConfiguration.getConfigDir(), is(equalTo(configDir)));
+		assertThat(clusterConfiguration.getJobClassName(), is(equalTo(jobClassName)));
+		assertThat(clusterConfiguration.getRestPort(), is(equalTo(-1)));
+	}
+
+	@Test(expected = FlinkParseException.class)
+	public void testMissingRequiredArgument() throws FlinkParseException { // an empty argument list omits the required --job-classname option
+		final String[] args = {};
+
+		commandLineParser.parse(args);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8f467c1e/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPointTest.java
----------------------------------------------------------------------
diff --git a/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPointTest.java b/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPointTest.java
new file mode 100644
index 0000000..360799d
--- /dev/null
+++ b/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPointTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.container.entrypoint;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for the {@link StandaloneJobClusterEntryPoint}.
+ */
+public class StandaloneJobClusterEntryPointTest extends TestLogger {
+
+	@Test
+	public void testJobGraphRetrieval() throws FlinkException { // the JobGraph is built from TestJob with the configured default parallelism
+		final Configuration configuration = new Configuration();
+		final int parallelism = 42;
+		configuration.setInteger(CoreOptions.DEFAULT_PARALLELISM, parallelism);
+		final StandaloneJobClusterEntryPoint standaloneJobClusterEntryPoint = new StandaloneJobClusterEntryPoint(
+			configuration,
+			TestJob.class.getCanonicalName());
+
+		final JobGraph jobGraph = standaloneJobClusterEntryPoint.retrieveJobGraph(configuration);
+
+		assertThat(jobGraph.getName(), is(equalTo(TestJob.class.getCanonicalName()))); // TestJob executes under its own canonical class name
+		assertThat(jobGraph.getMaximumParallelism(), is(parallelism));
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8f467c1e/flink-container/src/test/java/org/apache/flink/container/entrypoint/TestJob.java
----------------------------------------------------------------------
diff --git a/flink-container/src/test/java/org/apache/flink/container/entrypoint/TestJob.java b/flink-container/src/test/java/org/apache/flink/container/entrypoint/TestJob.java
new file mode 100644
index 0000000..5f8857f
--- /dev/null
+++ b/flink-container/src/test/java/org/apache/flink/container/entrypoint/TestJob.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.container.entrypoint;
+
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+
+/**
+ * Test job which is used for {@link StandaloneJobClusterEntryPointTest}.
+ */
+public class TestJob {
+
+	public static void main(String[] args) throws Exception { // small pipeline: four integers, each doubled, then discarded
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		final DataStreamSource<Integer> source = env.fromElements(1, 2, 3, 4);
+		final SingleOutputStreamOperator<Integer> mapper = source.map(element -> 2 * element);
+		mapper.addSink(new DiscardingSink<>());
+
+		env.execute(TestJob.class.getCanonicalName()); // the job name is this class's canonical name
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8f467c1e/flink-container/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-container/src/test/resources/log4j-test.properties b/flink-container/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..b85f2f2
--- /dev/null
+++ b/flink-container/src/test/resources/log4j-test.properties
@@ -0,0 +1,32 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+log4j.rootLogger=OFF, console
+
+# Log all infos on the console
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
+
+# suppress the irrelevant (wrong) warnings from the netty channel handler
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, console
+
+# keep leader election and leader retrieval logging switched off during the tests
+log4j.logger.org.apache.flink.runtime.leaderelection=OFF
+log4j.logger.org.apache.flink.runtime.leaderretrieval=OFF
+

http://git-wip-us.apache.org/repos/asf/flink/blob/8f467c1e/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/ClusterConfigurationParserFactoryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/ClusterConfigurationParserFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/ClusterConfigurationParserFactoryTest.java
index 7447439..62da39e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/ClusterConfigurationParserFactoryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/ClusterConfigurationParserFactoryTest.java
@@ -73,5 +73,4 @@ public class ClusterConfigurationParserFactoryTest extends TestLogger {
 
 		commandLineParser.parse(args);
 	}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8f467c1e/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 897ae3c..1f35cd4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -69,6 +69,7 @@ under the License.
 		<module>flink-formats</module>
 		<module>flink-examples</module>
 		<module>flink-clients</module>
+		<module>flink-container</module>
 		<module>flink-queryable-state</module>
 		<module>flink-tests</module>
 		<module>flink-end-to-end-tests</module>