You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mx...@apache.org on 2016/10/27 12:30:21 UTC

flink git commit: [FLINK-4913][yarn] include user jars in system class loader

Repository: flink
Updated Branches:
  refs/heads/release-1.1 17bb96dd6 -> ea41b9c56


[FLINK-4913][yarn] include user jars in system class loader

When deploying a Yarn cluster for a single job, this change
pre-configures the cluster to include the user jar(s) on all nodes.
This eliminates the need to upload jar files through the
BlobClient. More importantly, it loads the user classes only once and
not on every instantiation of a Task. This also reduces the JobManager
class loading upon recovery of a failed job.

Backported from 2b600d355f5df9364c634282469acd608d7a2104


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ea41b9c5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ea41b9c5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ea41b9c5

Branch: refs/heads/release-1.1
Commit: ea41b9c56fdc0af3c97d6dd48d04218db6176ec8
Parents: 17bb96d
Author: Maximilian Michels <mx...@apache.org>
Authored: Thu Oct 27 14:28:03 2016 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Thu Oct 27 14:29:56 2016 +0200

----------------------------------------------------------------------
 .../org/apache/flink/client/CliFrontend.java    | 14 ++++--
 .../flink/client/cli/CustomCommandLine.java     | 14 +++++-
 .../org/apache/flink/client/cli/DefaultCLI.java |  5 +-
 .../flink/client/program/ClusterClient.java     | 40 +++++++++++-----
 .../flink/client/program/PackagedProgram.java   | 46 +++++++++++++------
 .../client/program/StandaloneClusterClient.java |  6 +++
 .../org/apache/flink/api/scala/FlinkShell.scala |  7 ++-
 ...CliFrontendYarnAddressConfigurationTest.java |  5 +-
 .../org/apache/flink/yarn/YarnTestBase.java     |  5 +-
 .../yarn/AbstractYarnClusterDescriptor.java     | 48 +++++++++++++++++++-
 .../apache/flink/yarn/YarnClusterClient.java    |  6 +++
 .../flink/yarn/cli/FlinkYarnSessionCli.java     | 10 +++-
 12 files changed, 166 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ea41b9c5/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index c24dcc3..929d02e 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -234,7 +234,7 @@ public class CliFrontend {
 		ClusterClient client = null;
 		try {
 
-			client = createClient(options, program.getMainClassName());
+			client = createClient(options, program);
 			client.setPrintStatusDuringExecution(options.getStdoutLogging());
 			client.setDetached(options.getDetachedMode());
 			LOG.debug("Client slots is set to {}", client.getMaxSlots());
@@ -871,12 +871,12 @@ public class CliFrontend {
 	/**
 	 * Creates a {@link ClusterClient} object from the given command line options and other parameters.
 	 * @param options Command line options
-	 * @param programName Program name
+	 * @param program The program for which to create the client.
 	 * @throws Exception
 	 */
 	protected ClusterClient createClient(
 			CommandLineOptions options,
-			String programName) throws Exception {
+			PackagedProgram program) throws Exception {
 
 		// Get the custom command-line (e.g. Standalone/Yarn/Mesos)
 		CustomCommandLine<?> activeCommandLine = getActiveCustomCommandLine(options.getCommandLine());
@@ -887,8 +887,12 @@ public class CliFrontend {
 			logAndSysout("Cluster configuration: " + client.getClusterIdentifier());
 		} catch (UnsupportedOperationException e) {
 			try {
-				String applicationName = "Flink Application: " + programName;
-				client = activeCommandLine.createCluster(applicationName, options.getCommandLine(), config);
+				String applicationName = "Flink Application: " + program.getMainClassName();
+				client = activeCommandLine.createCluster(
+					applicationName,
+					options.getCommandLine(),
+					config,
+					program.getAllLibraries());
 				logAndSysout("Cluster started: " + client.getClusterIdentifier());
 			} catch (UnsupportedOperationException e2) {
 				throw new IllegalConfigurationException(

http://git-wip-us.apache.org/repos/asf/flink/blob/ea41b9c5/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java
index aecdc7c..c58c74c 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java
@@ -22,6 +22,9 @@ import org.apache.commons.cli.Options;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.Configuration;
 
+import java.net.URL;
+import java.util.List;
+
 
 /**
  * Custom command-line interface to load hooks for the command-line interface.
@@ -61,15 +64,22 @@ public interface CustomCommandLine<ClusterType extends ClusterClient> {
 	 * @return Client if a cluster could be retrieved
 	 * @throws UnsupportedOperationException if the operation is not supported
 	 */
-	ClusterType retrieveCluster(CommandLine commandLine, Configuration config) throws UnsupportedOperationException;
+	ClusterType retrieveCluster(
+			CommandLine commandLine,
+			Configuration config) throws UnsupportedOperationException;
 
 	/**
 	 * Creates the client for the cluster
 	 * @param applicationName The application name to use
 	 * @param commandLine The command-line options parsed by the CliFrontend
 	 * @param config The Flink config to use
+	 * @param userJarFiles User jar files to include in the classpath of the cluster.
 	 * @return The client to communicate with the cluster which the CustomCommandLine brought up.
 	 * @throws UnsupportedOperationException if the operation is not supported
 	 */
-	ClusterType createCluster(String applicationName, CommandLine commandLine, Configuration config) throws UnsupportedOperationException;
+	ClusterType createCluster(
+			String applicationName,
+			CommandLine commandLine,
+			Configuration config,
+			List<URL> userJarFiles) throws UnsupportedOperationException;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ea41b9c5/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java b/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
index 5f83c3d..598c612 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
@@ -26,6 +26,8 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 
 import java.net.InetSocketAddress;
+import java.net.URL;
+import java.util.List;
 
 import static org.apache.flink.client.CliFrontend.setJobManagerAddressInConfig;
 
@@ -75,7 +77,8 @@ public class DefaultCLI implements CustomCommandLine<StandaloneClusterClient> {
 	public StandaloneClusterClient createCluster(
 			String applicationName,
 			CommandLine commandLine,
-			Configuration config) throws UnsupportedOperationException {
+			Configuration config,
+			List<URL> userJarFiles) throws UnsupportedOperationException {
 
 		StandaloneClusterDescriptor descriptor = new StandaloneClusterDescriptor(config);
 		return descriptor.deploy();

http://git-wip-us.apache.org/repos/asf/flink/blob/ea41b9c5/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
index f5e513f..7092bfd 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
@@ -306,11 +306,27 @@ public abstract class ClusterClient {
 	{
 		Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader());
 		if (prog.isUsingProgramEntryPoint()) {
-			return run(prog.getPlanWithJars(), parallelism, prog.getSavepointPath());
+
+			final JobWithJars jobWithJars;
+			if (hasUserJarsInClassPath(prog.getAllLibraries())) {
+				jobWithJars = prog.getPlanWithoutJars();
+			} else {
+				jobWithJars = prog.getPlanWithJars();
+			}
+
+			return run(jobWithJars, parallelism, prog.getSavepointPath());
 		}
 		else if (prog.isUsingInteractiveMode()) {
 			LOG.info("Starting program in interactive mode");
-			ContextEnvironmentFactory factory = new ContextEnvironmentFactory(this, prog.getAllLibraries(),
+
+			final List<URL> libraries;
+			if (hasUserJarsInClassPath(prog.getAllLibraries())) {
+				libraries = Collections.emptyList();
+			} else {
+				libraries = prog.getAllLibraries();
+			}
+
+			ContextEnvironmentFactory factory = new ContextEnvironmentFactory(this, libraries,
 					prog.getClasspaths(), prog.getUserCodeClassLoader(), parallelism, isDetached(),
 					prog.getSavepointPath());
 			ContextEnvironment.setAsContext(factory);
@@ -348,7 +364,7 @@ public abstract class ClusterClient {
 	 * Runs a program on the Flink cluster to which this client is connected. The call blocks until the
 	 * execution is complete, and returns afterwards.
 	 *
-	 * @param program The program to be executed.
+	 * @param jobWithJars The program to be executed.
 	 * @param parallelism The default parallelism to use when running the program. The default parallelism is used
 	 *                    when the program does not set a parallelism by itself.
 	 *
@@ -358,15 +374,15 @@ public abstract class ClusterClient {
 	 *                                    i.e. the job-manager is unreachable, or due to the fact that the
 	 *                                    parallel execution failed.
 	 */
-	public JobSubmissionResult run(JobWithJars program, int parallelism, String savepointPath)
+	public JobSubmissionResult run(JobWithJars jobWithJars, int parallelism, String savepointPath)
 			throws CompilerException, ProgramInvocationException {
-		ClassLoader classLoader = program.getUserCodeClassLoader();
+		ClassLoader classLoader = jobWithJars.getUserCodeClassLoader();
 		if (classLoader == null) {
 			throw new IllegalArgumentException("The given JobWithJars does not provide a usercode class loader.");
 		}
 
-		OptimizedPlan optPlan = getOptimizedPlan(compiler, program, parallelism);
-		return run(optPlan, program.getJarFiles(), program.getClasspaths(), classLoader, savepointPath);
+		OptimizedPlan optPlan = getOptimizedPlan(compiler, jobWithJars, parallelism);
+		return run(optPlan, jobWithJars.getJarFiles(), jobWithJars.getClasspaths(), classLoader, savepointPath);
 	}
 
 	public JobSubmissionResult run(
@@ -597,10 +613,6 @@ public abstract class ClusterClient {
 		return getOptimizedPlan(compiler, prog.getPlan(), parallelism);
 	}
 
-	public JobGraph getJobGraph(PackagedProgram prog, FlinkPlan optPlan) throws ProgramInvocationException {
-		return getJobGraph(optPlan, prog.getAllLibraries(), prog.getClasspaths(), null);
-	}
-
 	public JobGraph getJobGraph(PackagedProgram prog, FlinkPlan optPlan, String savepointPath) throws ProgramInvocationException {
 		return getJobGraph(optPlan, prog.getAllLibraries(), prog.getClasspaths(), savepointPath);
 	}
@@ -727,6 +739,12 @@ public abstract class ClusterClient {
 	public abstract int getMaxSlots();
 
 	/**
+	 * Returns true if the client already has the user jar and providing it again would
+	 * result in duplicate uploading of the jar.
+	 */
+	public abstract boolean hasUserJarsInClassPath(List<URL> userJarFiles);
+
+	/**
 	 * Calls the subclasses' submitJob method. It may decide to simply call one of the run methods or it may perform
 	 * some custom job submission logic.
 	 * @param jobGraph The JobGraph to be submitted

http://git-wip-us.apache.org/repos/asf/flink/blob/ea41b9c5/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
index bff8e3e..42e63c4 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
@@ -282,23 +282,38 @@ public class PackagedProgram {
 	}
 
 	/**
+	 * Returns the plan without the required jars when the files are already provided by the cluster.
+	 *
+	 * @return The plan without attached jar files.
+	 * @throws ProgramInvocationException
+	 */
+	public JobWithJars getPlanWithoutJars() throws ProgramInvocationException {
+		if (isUsingProgramEntryPoint()) {
+			return new JobWithJars(getPlan(), Collections.<URL>emptyList(), classpaths, userCodeClassLoader);
+		} else {
+			throw new ProgramInvocationException("Cannot create a " + JobWithJars.class.getSimpleName() +
+				" for a program that is using the interactive mode.");
+		}
+	}
+
+	/**
 	 * Returns the plan with all required jars.
-	 * 
+	 *
 	 * @return The plan with attached jar files.
-	 * @throws ProgramInvocationException 
+	 * @throws ProgramInvocationException
 	 */
 	public JobWithJars getPlanWithJars() throws ProgramInvocationException {
 		if (isUsingProgramEntryPoint()) {
 			return new JobWithJars(getPlan(), getAllLibraries(), classpaths, userCodeClassLoader);
 		} else {
-			throw new ProgramInvocationException("Cannot create a " + JobWithJars.class.getSimpleName() + 
+			throw new ProgramInvocationException("Cannot create a " + JobWithJars.class.getSimpleName() +
 					" for a program that is using the interactive mode.");
 		}
 	}
 
 	/**
 	 * Returns the analyzed plan without any optimizations.
-	 * 
+	 *
 	 * @return
 	 *         the analyzed plan without any optimizations.
 	 * @throws ProgramInvocationException Thrown if an error occurred in the
@@ -308,7 +323,7 @@ public class PackagedProgram {
 	public String getPreviewPlan() throws ProgramInvocationException {
 		Thread.currentThread().setContextClassLoader(this.getUserCodeClassLoader());
 		List<DataSinkNode> previewPlan;
-		
+
 		if (isUsingProgramEntryPoint()) {
 			previewPlan = Optimizer.createPreOptimizedPlan(getPlan());
 		}
@@ -335,7 +350,7 @@ public class PackagedProgram {
 			finally {
 				env.unsetAsContext();
 			}
-			
+
 			if (env.previewPlan != null) {
 				previewPlan =  env.previewPlan;
 			} else {
@@ -359,7 +374,7 @@ public class PackagedProgram {
 	/**
 	 * Returns the description provided by the Program class. This
 	 * may contain a description of the plan itself and its arguments.
-	 * 
+	 *
 	 * @return The description of the PactProgram's input parameters.
 	 * @throws ProgramInvocationException
 	 *         This invocation is thrown if the Program can't be properly loaded. Causes
@@ -367,7 +382,7 @@ public class PackagedProgram {
 	 */
 	public String getDescription() throws ProgramInvocationException {
 		if (ProgramDescription.class.isAssignableFrom(this.mainClass)) {
-			
+
 			ProgramDescription descr;
 			if (this.program != null) {
 				descr = (ProgramDescription) this.program;
@@ -379,22 +394,22 @@ public class PackagedProgram {
 					return null;
 				}
 			}
-			
+
 			try {
 				return descr.getDescription();
 			}
 			catch (Throwable t) {
-				throw new ProgramInvocationException("Error while getting the program description" + 
+				throw new ProgramInvocationException("Error while getting the program description" +
 						(t.getMessage() == null ? "." : ": " + t.getMessage()), t);
 			}
-			
+
 		} else {
 			return null;
 		}
 	}
-	
+
 	/**
-	 * 
+	 *
 	 * This method assumes that the context environment is prepared, or the execution
 	 * will be a local execution by default.
 	 */
@@ -417,13 +432,16 @@ public class PackagedProgram {
 
 	/**
 	 * Gets the {@link java.lang.ClassLoader} that must be used to load user code classes.
-	 * 
+	 *
 	 * @return The user code ClassLoader.
 	 */
 	public ClassLoader getUserCodeClassLoader() {
 		return this.userCodeClassLoader;
 	}
 
+	/**
+	 * Returns all provided libraries needed to run the program.
+	 */
 	public List<URL> getAllLibraries() {
 		List<URL> libs = new ArrayList<URL>(this.extractedTempLibraries.size() + 1);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ea41b9c5/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
index 3343b69..296ddc9 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
@@ -28,6 +28,7 @@ import scala.concurrent.Await;
 import scala.concurrent.Future;
 
 import java.io.IOException;
+import java.net.URL;
 import java.util.Collections;
 import java.util.List;
 
@@ -87,6 +88,11 @@ public class StandaloneClusterClient extends ClusterClient {
 	}
 
 	@Override
+	public boolean hasUserJarsInClassPath(List<URL> userJarFiles) {
+		return false;
+	}
+
+	@Override
 	protected JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader)
 			throws ProgramInvocationException {
 		if (isDetached()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/ea41b9c5/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
----------------------------------------------------------------------
diff --git a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
index f3b3507..b2ec626 100644
--- a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
+++ b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
@@ -19,6 +19,7 @@
 package org.apache.flink.api.scala
 
 import java.io._
+import java.util.Collections
 
 import org.apache.commons.cli.CommandLine
 import org.apache.flink.client.cli.CliFrontendParser
@@ -252,7 +253,11 @@ object FlinkShell {
     val config = frontend.getConfiguration
     val customCLI = frontend.getActiveCustomCommandLine(options.getCommandLine)
 
-    val cluster = customCLI.createCluster("Flink Scala Shell", options.getCommandLine, config)
+    val cluster = customCLI.createCluster(
+      "Flink Scala Shell",
+      options.getCommandLine,
+      config,
+      Collections.emptyList())
 
     val address = cluster.getJobManagerAddress.getAddress.getHostAddress
     val port = cluster.getJobManagerAddress.getPort

http://git-wip-us.apache.org/repos/asf/flink/blob/ea41b9c5/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
index 8ba786f..77d3149 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.client.cli.CommandLineOptions;
 import org.apache.flink.client.cli.CustomCommandLine;
 import org.apache.flink.client.cli.RunOptions;
 import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.PackagedProgram;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
@@ -331,8 +332,8 @@ public class CliFrontendYarnAddressConfigurationTest {
 
 		@Override
 		// make method public
-		public ClusterClient createClient(CommandLineOptions options, String programName) throws Exception {
-			return super.createClient(options, programName);
+		public ClusterClient createClient(CommandLineOptions options, PackagedProgram program) throws Exception {
+			return super.createClient(options, program);
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/ea41b9c5/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
index 7e612c4..78e16ed 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
@@ -23,6 +23,7 @@ import org.apache.commons.io.FileUtils;
 import org.apache.flink.client.CliFrontend;
 import org.apache.flink.client.cli.CommandLineOptions;
 import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.PackagedProgram;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
@@ -667,9 +668,9 @@ public abstract class YarnTestBase extends TestLogger {
 		public TestingCLI() throws Exception {}
 
 		@Override
-		protected ClusterClient createClient(CommandLineOptions options, String programName) throws Exception {
+		protected ClusterClient createClient(CommandLineOptions options, PackagedProgram program) throws Exception {
 			// mock the returned ClusterClient to disable shutdown and verify shutdown behavior later on
-			originalClusterClient = super.createClient(options, programName);
+			originalClusterClient = super.createClient(options, program);
 			spiedClusterClient = Mockito.spy(originalClusterClient);
 			Mockito.doNothing().when(spiedClusterClient).shutdown();
 			return spiedClusterClient;

http://git-wip-us.apache.org/repos/asf/flink/blob/ea41b9c5/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index 4df46a6..728eae3 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -60,6 +60,8 @@ import java.io.PrintStream;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.security.PrivilegedExceptionAction;
+import java.net.URISyntaxException;
+import java.net.URL;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -129,6 +131,10 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 
 	private String zookeeperNamespace;
 
+	/** Optional jar files to include in the system class loader of all application nodes
+	 * (for per-job submission) */
+	private Set<File> userJarFiles;
+
 	public AbstractYarnClusterDescriptor() {
 		// for unit tests only
 		if(System.getenv("IN_TESTS") != null) {
@@ -239,6 +245,41 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 		this.dynamicPropertiesEncoded = dynamicPropertiesEncoded;
 	}
 
+	/**
+	 * Returns true if the descriptor has the job jars to include in the classpath.
+	 */
+	public boolean hasUserJarFiles(List<URL> requiredJarFiles) {
+		if (userJarFiles == null || userJarFiles.size() != requiredJarFiles.size()) {
+			return false;
+		}
+		try {
+			for(URL jarFile : requiredJarFiles) {
+				if (!userJarFiles.contains(new File(jarFile.toURI()))) {
+					return false;
+				}
+			}
+		} catch (URISyntaxException e) {
+			return false;
+		}
+		return true;
+	}
+
+	/**
+	 * Sets the user jars which are included in the system classloader of all nodes.
+	 */
+	public void setProvidedUserJarFiles(List<URL> userJarFiles) {
+		Set<File> localUserJarFiles = new HashSet<>(userJarFiles.size());
+		for (URL jarFile : userJarFiles) {
+			try {
+				localUserJarFiles.add(new File(jarFile.toURI()));
+			} catch (URISyntaxException e) {
+				throw new IllegalArgumentException("Couldn't add local user jar: " + jarFile
+					+ " Currently only file:/// URLs are supported.");
+			}
+		}
+		this.userJarFiles = localUserJarFiles;
+	}
+
 	public String getDynamicPropertiesEncoded() {
 		return this.dynamicPropertiesEncoded;
 	}
@@ -546,6 +587,11 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 
 		final ContainerLaunchContext amContainer = setupApplicationMasterContainer(hasLogback, hasLog4j);
 
+		// add the user jar to the classpath of the to-be-created cluster
+		if (userJarFiles != null) {
+			effectiveShipFiles.addAll(userJarFiles);
+		}
+
 		// Set-up ApplicationSubmissionContext for the application
 		ApplicationSubmissionContext appContext = yarnApplication.getApplicationSubmissionContext();
 
@@ -700,7 +746,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 			try {
 				report = yarnClient.getApplicationReport(appId);
 			} catch (IOException e) {
-				throw new YarnDeploymentException("Failed to deploy the cluster: " + e.getMessage());
+				throw new YarnDeploymentException("Failed to deploy the cluster.", e);
 			}
 			YarnApplicationState appState = report.getYarnApplicationState();
 			switch(appState) {

http://git-wip-us.apache.org/repos/asf/flink/blob/ea41b9c5/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
index 8b6cd9a..e620f21 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
@@ -55,6 +55,7 @@ import scala.concurrent.duration.FiniteDuration;
 
 import java.io.File;
 import java.io.IOException;
+import java.net.URL;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -194,6 +195,11 @@ public class YarnClusterClient extends ClusterClient {
 	}
 
 	@Override
+	public boolean hasUserJarsInClassPath(List<URL> userJarFiles) {
+		return clusterDescriptor.hasUserJarFiles(userJarFiles);
+	}
+
+	@Override
 	protected JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {
 		if (isDetached()) {
 			if (newlyCreatedCluster) {

http://git-wip-us.apache.org/repos/asf/flink/blob/ea41b9c5/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index 28d8fb8..4823d35 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -29,6 +29,7 @@ import org.apache.flink.client.cli.CustomCommandLine;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.yarn.AbstractYarnClusterDescriptor;
 import org.apache.flink.yarn.YarnClusterDescriptor;
 import org.apache.flink.yarn.YarnClusterClient;
@@ -48,6 +49,7 @@ import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.OutputStream;
 import java.io.UnsupportedEncodingException;
+import java.net.URL;
 import java.net.URLDecoder;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
@@ -515,10 +517,16 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 	}
 
 	@Override
-	public YarnClusterClient createCluster(String applicationName, CommandLine cmdLine, Configuration config) {
+	public YarnClusterClient createCluster(
+			String applicationName,
+			CommandLine cmdLine,
+			Configuration config,
+			List<URL> userJarFiles) {
+		Preconditions.checkNotNull(userJarFiles, "User jar files should not be null.");
 
 		AbstractYarnClusterDescriptor yarnClusterDescriptor = createDescriptor(applicationName, cmdLine);
 		yarnClusterDescriptor.setFlinkConfiguration(config);
+		yarnClusterDescriptor.setProvidedUserJarFiles(userJarFiles);
 
 		try {
 			return yarnClusterDescriptor.deploy();