You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mx...@apache.org on 2015/10/08 15:58:33 UTC

[1/2] flink git commit: [FLINK-1789][core][runtime] Allow adding of URLs to the usercode class loader

Repository: flink
Updated Branches:
  refs/heads/master fa2bb8f11 -> 0ee0c1f55


http://git-wip-us.apache.org/repos/asf/flink/blob/0ee0c1f5/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index 461138c..131a2e7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -74,6 +74,7 @@ import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.net.InetSocketAddress;
+import java.net.URL;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
@@ -162,7 +163,7 @@ public class TaskManagerTest {
 						new Configuration(), new Configuration(), TestInvokableCorrect.class.getName(),
 						Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
 						Collections.<InputGateDeploymentDescriptor>emptyList(),
-					new ArrayList<BlobKey>(), 0);
+						new ArrayList<BlobKey>(), Collections.<URL>emptyList(), 0);
 
 				
 				new Within(d) {
@@ -267,13 +268,13 @@ public class TaskManagerTest {
 						new Configuration(), new Configuration(), TestInvokableBlockingCancelable.class.getName(),
 						Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
 						Collections.<InputGateDeploymentDescriptor>emptyList(),
-					new ArrayList<BlobKey>(), 0);
+						new ArrayList<BlobKey>(), Collections.<URL>emptyList(), 0);
 
 				final TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(jid2, vid2, eid2, "TestTask2", 2, 7,
 						new Configuration(), new Configuration(), TestInvokableBlockingCancelable.class.getName(),
 						Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
 						Collections.<InputGateDeploymentDescriptor>emptyList(),
-					new ArrayList<BlobKey>(), 0);
+						new ArrayList<BlobKey>(), Collections.<URL>emptyList(), 0);
 
 				final ActorGateway tm = taskManager;
 
@@ -405,13 +406,13 @@ public class TaskManagerTest {
 						new Configuration(), new Configuration(), Tasks.Sender.class.getName(),
 						Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
 						Collections.<InputGateDeploymentDescriptor>emptyList(),
-					new ArrayList<BlobKey>(), 0);
+						new ArrayList<BlobKey>(), Collections.<URL>emptyList(), 0);
 
 				final TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(jid, vid2, eid2, "Receiver", 2, 7,
 						new Configuration(), new Configuration(), Tasks.Receiver.class.getName(),
 						Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
 						Collections.<InputGateDeploymentDescriptor>emptyList(),
-					new ArrayList<BlobKey>(), 0);
+						new ArrayList<BlobKey>(), Collections.<URL>emptyList(), 0);
 
 				new Within(d){
 
@@ -507,13 +508,14 @@ public class TaskManagerTest {
 
 				final TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor(jid, vid1, eid1, "Sender", 0, 1,
 						new Configuration(), new Configuration(), Tasks.Sender.class.getName(),
-						irpdd, Collections.<InputGateDeploymentDescriptor>emptyList(), new ArrayList<BlobKey>(), 0);
+						irpdd, Collections.<InputGateDeploymentDescriptor>emptyList(), new ArrayList<BlobKey>(),
+						Collections.<URL>emptyList(), 0);
 
 				final TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(jid, vid2, eid2, "Receiver", 2, 7,
 						new Configuration(), new Configuration(), Tasks.Receiver.class.getName(),
 						Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
 						Collections.singletonList(ircdd),
-						new ArrayList<BlobKey>(), 0);
+						new ArrayList<BlobKey>(), Collections.<URL>emptyList(), 0);
 
 				new Within(d) {
 
@@ -651,13 +653,13 @@ public class TaskManagerTest {
 				final TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor(jid, vid1, eid1, "Sender", 0, 1,
 						new Configuration(), new Configuration(), Tasks.Sender.class.getName(),
 						irpdd, Collections.<InputGateDeploymentDescriptor>emptyList(),
-						new ArrayList<BlobKey>(), 0);
+						new ArrayList<BlobKey>(), Collections.<URL>emptyList(), 0);
 
 				final TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(jid, vid2, eid2, "Receiver", 2, 7,
 						new Configuration(), new Configuration(), Tasks.BlockingReceiver.class.getName(),
 						Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
 						Collections.singletonList(ircdd),
-						new ArrayList<BlobKey>(), 0);
+						new ArrayList<BlobKey>(), Collections.<URL>emptyList(), 0);
 
 				new Within(d){
 
@@ -796,7 +798,8 @@ public class TaskManagerTest {
 						Tasks.AgnosticReceiver.class.getName(),
 						Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
 						Collections.singletonList(igdd),
-						Collections.<BlobKey>emptyList(), 0);
+						Collections.<BlobKey>emptyList(),
+						Collections.<URL>emptyList(), 0);
 
 				new Within(d) {
 					@Override
@@ -888,7 +891,8 @@ public class TaskManagerTest {
 						Tasks.AgnosticReceiver.class.getName(),
 						Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
 						Collections.singletonList(igdd),
-						Collections.<BlobKey>emptyList(), 0);
+						Collections.<BlobKey>emptyList(),
+						Collections.<URL>emptyList(), 0);
 
 				new Within(new FiniteDuration(120, TimeUnit.SECONDS)) {
 					@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/0ee0c1f5/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index cf1bfc5..fb933f8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -52,6 +52,7 @@ import org.junit.Test;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.lang.reflect.Field;
+import java.net.URL;
 import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
@@ -739,6 +740,7 @@ public class TaskTest {
 				Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
 				Collections.<InputGateDeploymentDescriptor>emptyList(),
 				Collections.<BlobKey>emptyList(),
+				Collections.<URL>emptyList(),
 				0);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0ee0c1f5/flink-staging/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java b/flink-staging/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java
index cb37fd4..a336957 100644
--- a/flink-staging/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java
+++ b/flink-staging/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java
@@ -19,7 +19,6 @@ package org.apache.flink.api.java;
  * limitations under the License.
  */
 
-import org.apache.commons.lang.ArrayUtils;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.PlanExecutor;
@@ -27,8 +26,9 @@ import org.apache.flink.api.common.PlanExecutor;
 import org.apache.flink.api.scala.FlinkILoop;
 import org.apache.flink.configuration.Configuration;
 
+import java.io.File;
+import java.net.URL;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 
 /**
@@ -50,7 +50,7 @@ public class ScalaShellRemoteEnvironment extends RemoteEnvironment {
 	 * @param flinkILoop The flink Iloop instance from which the ScalaShellRemoteEnvironment is called.
 	 */
 	public ScalaShellRemoteEnvironment(String host, int port, FlinkILoop flinkILoop, String... jarFiles) {
-		super(host, port, jarFiles);
+		super(host, port, null, jarFiles, null);
 		this.flinkILoop = flinkILoop;
 	}
 
@@ -65,21 +65,22 @@ public class ScalaShellRemoteEnvironment extends RemoteEnvironment {
 	public JobExecutionResult execute(String jobName) throws Exception {
 		Plan p = createProgramPlan(jobName);
 
-		String jarFile = flinkILoop.writeFilesToDisk().getAbsolutePath();
+		URL jarUrl = flinkILoop.writeFilesToDisk().getAbsoluteFile().toURI().toURL();
 
 		// get "external jars, and add the shell command jar, pass to executor
-		List<String> alljars = new ArrayList<String>();
+		List<URL> alljars = new ArrayList<>();
 		// get external (library) jars
 		String[] extJars = this.flinkILoop.getExternalJars();
-		
-		if(!ArrayUtils.isEmpty(extJars)) {
-			alljars.addAll(Arrays.asList(extJars));
+
+		for (String extJar : extJars) {
+			URL extJarUrl = new File(extJar).getAbsoluteFile().toURI().toURL();
+			alljars.add(extJarUrl);
 		}
+
 		// add shell commands
-		alljars.add(jarFile);
-		String[] alljarsArr = new String[alljars.size()];
-		alljarsArr = alljars.toArray(alljarsArr);
-		PlanExecutor executor = PlanExecutor.createRemoteExecutor(host, port, new Configuration(), alljarsArr);
+		alljars.add(jarUrl);
+		PlanExecutor executor = PlanExecutor.createRemoteExecutor(host, port, new Configuration(),
+				alljars.toArray(new URL[alljars.size()]), null);
 
 		executor.setPrintStatusDuringExecution(p.getExecutionConfig().isSysoutLoggingEnabled());
 		return executor.executePlan(p);

http://git-wip-us.apache.org/repos/asf/flink/blob/0ee0c1f5/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
index ccf51d2..02c938e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
@@ -19,7 +19,12 @@ package org.apache.flink.streaming.api.environment;
 
 import java.io.File;
 import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URISyntaxException;
+import java.net.URL;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 
 import org.apache.flink.api.common.InvalidProgramException;
@@ -40,13 +45,21 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
 	
 	private static final Logger LOG = LoggerFactory.getLogger(RemoteStreamEnvironment.class);
 
+	/** The hostname of the JobManager */
 	private final String host;
+
+	/** The port of the JobManager main actor system */
 	private final int port;
-	private final List<File> jarFiles;
-	
+
 	/** The configuration used to parametrize the client that connects to the remote cluster */
 	private final Configuration config;
 
+	/** The jar files that need to be attached to each job */
+	private final List<URL> jarFiles;
+	
+	/** The classpaths that need to be attached to each job */
+	private final List<URL> globalClasspaths;
+
 	/**
 	 * Creates a new RemoteStreamEnvironment that points to the master
 	 * (JobManager) described by the given host name and port.
@@ -87,6 +100,34 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
 	 *            provided in the JAR files.
 	 */
 	public RemoteStreamEnvironment(String host, int port, Configuration config, String... jarFiles) {
+		this(host, port, config, jarFiles, null);
+	}
+
+	/**
+	 * Creates a new RemoteStreamEnvironment that points to the master
+	 * (JobManager) described by the given host name and port.
+	 *
+	 * @param host
+	 *            The host name or address of the master (JobManager), where the
+	 *            program should be executed.
+	 * @param port
+	 *            The port of the master (JobManager), where the program should
+	 *            be executed.
+	 * @param config
+	 *            The configuration used to parametrize the client that connects to the
+	 *            remote cluster.
+	 * @param jarFiles
+	 *            The JAR files with code that needs to be shipped to the
+	 *            cluster. If the program uses user-defined functions,
+	 *            user-defined input formats, or any libraries, those must be
+	 *            provided in the JAR files.
+	 * @param globalClasspaths 
+	 *            The paths of directories and JAR files that are added to each user code 
+	 *            classloader on all nodes in the cluster. Note that the paths must specify a 
+	 *            protocol (e.g. file://) and be accessible on all nodes (e.g. by means of a NFS share).
+	 *            The protocol must be supported by the {@link java.net.URLClassLoader}.
+	 */
+	public RemoteStreamEnvironment(String host, int port, Configuration config, String[] jarFiles, URL[] globalClasspaths) {
 		if (!ExecutionEnvironment.areExplicitEnvironmentsAllowed()) {
 			throw new InvalidProgramException(
 					"The RemoteEnvironment cannot be used when submitting a program through a client, " +
@@ -103,16 +144,23 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
 		this.host = host;
 		this.port = port;
 		this.config = config == null ? new Configuration() : config;
-		this.jarFiles = new ArrayList<File>(jarFiles.length);
+		this.jarFiles = new ArrayList<>(jarFiles.length);
 		for (String jarFile : jarFiles) {
-			File file = new File(jarFile);
 			try {
-				JobWithJars.checkJarFile(file);
+				URL jarFileUrl = new File(jarFile).getAbsoluteFile().toURI().toURL();
+				this.jarFiles.add(jarFileUrl);
+				JobWithJars.checkJarFile(jarFileUrl);
+			} catch (MalformedURLException e) {
+				throw new IllegalArgumentException("JAR file path is invalid '" + jarFile + "'", e);
+			} catch (IOException e) {
+				throw new RuntimeException("Problem with jar file " + jarFile, e);
 			}
-			catch (IOException e) {
-				throw new RuntimeException("Problem with jar file " + file.getAbsolutePath(), e);
-			}
-			this.jarFiles.add(file);
+		}
+		if (globalClasspaths == null) {
+			this.globalClasspaths = Collections.emptyList();
+		}
+		else {
+			this.globalClasspaths = Arrays.asList(globalClasspaths);
 		}
 	}
 
@@ -135,11 +183,16 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
 			LOG.info("Running remotely at {}:{}", host, port);
 		}
 
-		for (File file : jarFiles) {
-			jobGraph.addJar(new Path(file.getAbsolutePath()));
+		for (URL jarFile : jarFiles) {
+			try {
+				jobGraph.addJar(new Path(jarFile.toURI()));
+			} catch (URISyntaxException e) {
+				throw new ProgramInvocationException("URL is invalid", e);
+			}
 		}
 
-		ClassLoader usercodeClassLoader = JobWithJars.buildUserCodeClassLoader(jarFiles, getClass().getClassLoader());
+		ClassLoader usercodeClassLoader = JobWithJars.buildUserCodeClassLoader(jarFiles, globalClasspaths,
+			getClass().getClassLoader());
 		
 		Configuration configuration = new Configuration();
 		configuration.addAll(jobGraph.getJobConfiguration());

http://git-wip-us.apache.org/repos/asf/flink/blob/0ee0c1f5/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
index 2b2a426..1392efb 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
@@ -17,7 +17,7 @@
 
 package org.apache.flink.streaming.api.environment;
 
-import java.io.File;
+import java.net.URL;
 import java.util.List;
 
 import org.apache.flink.api.common.JobExecutionResult;
@@ -36,7 +36,9 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment {
 
 	private static final Logger LOG = LoggerFactory.getLogger(StreamContextEnvironment.class);
 
-	private final List<File> jars;
+	private final List<URL> jars;
+
+	private final List<URL> classpaths;
 	
 	private final Client client;
 
@@ -44,12 +46,15 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment {
 	
 	private final boolean wait;
 
-	protected StreamContextEnvironment(Client client, List<File> jars, int parallelism, boolean wait) {
+	protected StreamContextEnvironment(Client client, List<URL> jars, List<URL> classpaths, int parallelism,
+			boolean wait) {
 		this.client = client;
 		this.jars = jars;
+		this.classpaths = classpaths;
 		this.wait = wait;
 		
-		this.userCodeClassLoader = JobWithJars.buildUserCodeClassLoader(jars, getClass().getClassLoader());
+		this.userCodeClassLoader = JobWithJars.buildUserCodeClassLoader(jars, classpaths,
+				getClass().getClassLoader());
 		
 		if (parallelism > 0) {
 			setParallelism(parallelism);
@@ -84,10 +89,12 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment {
 		transformations.clear();
 
 		// attach all necessary jar files to the JobGraph
-		for (File file : jars) {
-			jobGraph.addJar(new Path(file.getAbsolutePath()));
+		for (URL file : jars) {
+			jobGraph.addJar(new Path(file.toURI()));
 		}
 
+		jobGraph.setClasspaths(classpaths);
+
 		// execute the programs
 		if (wait) {
 			return client.runBlocking(jobGraph, userCodeClassLoader);

http://git-wip-us.apache.org/repos/asf/flink/blob/0ee0c1f5/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index cc96217..28410fd 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -69,9 +69,9 @@ import org.apache.flink.streaming.api.transformations.StreamTransformation;
 import org.apache.flink.types.StringValue;
 import org.apache.flink.util.SplittableIterator;
 
-import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
+import java.net.URL;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -1285,7 +1285,7 @@ public abstract class StreamExecutionEnvironment {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		if (env instanceof ContextEnvironment) {
 			ContextEnvironment ctx = (ContextEnvironment) env;
-			return createContextEnvironment(ctx.getClient(), ctx.getJars(),
+			return createContextEnvironment(ctx.getClient(), ctx.getJars(), ctx.getClasspaths(),
 					ctx.getParallelism(), ctx.isWait());
 		} else if (env instanceof OptimizerPlanEnvironment | env instanceof PreviewPlanEnvironment) {
 			return new StreamPlanEnvironment(env);
@@ -1295,9 +1295,9 @@ public abstract class StreamExecutionEnvironment {
 	}
 
 	private static StreamExecutionEnvironment createContextEnvironment(
-			Client client, List<File> jars, int parallelism, boolean wait)
+			Client client, List<URL> jars, List<URL> classpaths, int parallelism, boolean wait)
 	{
-		return new StreamContextEnvironment(client, jars, parallelism, wait);
+		return new StreamContextEnvironment(client, jars, classpaths, parallelism, wait);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/0ee0c1f5/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
index 4480d95..dab6a6d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
@@ -25,7 +25,6 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.test.testdata.KMeansData;
 import org.apache.flink.test.util.ForkableFlinkMiniCluster;
-
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -68,12 +67,23 @@ public class ClassLoaderITCase {
 				PackagedProgram inputSplitTestProg = new PackagedProgram(
 						new File(INPUT_SPLITS_PROG_JAR_FILE),
 						new String[] { INPUT_SPLITS_PROG_JAR_FILE,
+										"", // classpath
 										"localhost",
 										String.valueOf(port),
 										"4" // parallelism
 									});
 				inputSplitTestProg.invokeInteractiveModeForExecution();
 
+				String classpath = new File(INPUT_SPLITS_PROG_JAR_FILE).toURI().toURL().toString();
+				PackagedProgram inputSplitTestProg2 = new PackagedProgram(new File(INPUT_SPLITS_PROG_JAR_FILE),
+						new String[] { "",
+										classpath, // classpath
+										"localhost",
+										String.valueOf(port),
+										"4" // parallelism
+									} );
+				inputSplitTestProg2.invokeInteractiveModeForExecution();
+
 				// regular streaming job
 				PackagedProgram streamingProg = new PackagedProgram(
 						new File(STREAMING_PROG_JAR_FILE),

http://git-wip-us.apache.org/repos/asf/flink/blob/0ee0c1f5/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomInputSplitProgram.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomInputSplitProgram.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomInputSplitProgram.java
index 36b56e0..b18e8e8 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomInputSplitProgram.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomInputSplitProgram.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.test.classloading.jar;
 
+import java.net.URL;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -29,6 +30,7 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.RemoteEnvironment;
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
@@ -40,13 +42,13 @@ import org.apache.flink.core.io.InputSplitAssigner;
 public class CustomInputSplitProgram {
 	
 	public static void main(String[] args) throws Exception {
-		
-		final String jarFile = args[0];
-		final String host = args[1];
-		final int port = Integer.parseInt(args[2]);
-		final int parallelism = Integer.parseInt(args[3]);
-		
-		ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(host, port, jarFile);
+		final String[] jarFile = (args[0].equals(""))? null : new String[] { args[0] };
+		final URL[] classpath = (args[1].equals(""))? null : new URL[] { new URL(args[1]) };
+		final String host = args[2];
+		final int port = Integer.parseInt(args[3]);
+		final int parallelism = Integer.parseInt(args[4]);
+
+		RemoteEnvironment env = new RemoteEnvironment(host, port, null, jarFile, classpath);
 		env.setParallelism(parallelism);
 		env.getConfig().disableSysoutLogging();
 
@@ -59,7 +61,7 @@ public class CustomInputSplitProgram {
 					return new Tuple2<Integer, Double>(value, value * 0.5);
 				}
 			})
-			.output(new DiscardingOutputFormat<Tuple2<Integer,Double>>());
+			.output(new DiscardingOutputFormat<Tuple2<Integer, Double>>());
 
 		env.execute();
 	}


[2/2] flink git commit: [FLINK-1789][core][runtime] Allow adding of URLs to the usercode class loader

Posted by mx...@apache.org.
[FLINK-1789][core][runtime] Allow adding of URLs to the usercode class loader

This closes #593.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0ee0c1f5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0ee0c1f5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0ee0c1f5

Branch: refs/heads/master
Commit: 0ee0c1f5573ad059dc6a8e4489094b7f78267306
Parents: fa2bb8f
Author: twalthr <tw...@apache.org>
Authored: Thu Apr 9 01:33:40 2015 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Thu Oct 8 15:55:09 2015 +0200

----------------------------------------------------------------------
 .../org/apache/flink/client/CliFrontend.java    |   8 +-
 .../org/apache/flink/client/RemoteExecutor.java |  66 ++++++----
 .../flink/client/cli/CliFrontendParser.java     |  11 ++
 .../apache/flink/client/cli/ProgramOptions.java |  23 ++++
 .../org/apache/flink/client/program/Client.java |  43 +++---
 .../client/program/ContextEnvironment.java      |  43 ++++--
 .../flink/client/program/JobWithJars.java       |  91 +++++++------
 .../flink/client/program/PackagedProgram.java   | 130 ++++++++++++++-----
 .../client/CliFrontendPackageProgramTest.java   |  52 ++++++--
 .../RemoteExecutorHostnameResolutionTest.java   |   4 +-
 .../apache/flink/client/program/ClientTest.java |   3 +-
 .../org/apache/flink/storm/api/FlinkClient.java |  17 ++-
 .../apache/flink/storm/api/FlinkSubmitter.java  |   9 +-
 .../apache/flink/api/common/PlanExecutor.java   |  20 ++-
 .../flink/api/java/ExecutionEnvironment.java    |   2 +-
 .../flink/api/java/RemoteEnvironment.java       |  61 +++++++--
 .../deployment/TaskDeploymentDescriptor.java    |  20 ++-
 .../librarycache/BlobLibraryCacheManager.java   |  27 ++--
 .../FallbackLibraryCacheManager.java            |   6 +-
 .../librarycache/LibraryCacheManager.java       |  13 +-
 .../runtime/executiongraph/ExecutionGraph.java  |  17 ++-
 .../runtime/executiongraph/ExecutionVertex.java |   5 +-
 .../apache/flink/runtime/jobgraph/JobGraph.java |  19 ++-
 .../apache/flink/runtime/taskmanager/Task.java  |   7 +-
 .../flink/runtime/jobmanager/JobManager.scala   |   4 +-
 .../TaskDeploymentDescriptorTest.java           |   5 +-
 .../BlobLibraryCacheManagerTest.java            |   8 +-
 .../runtime/taskmanager/TaskAsyncCallTest.java  |   2 +
 .../runtime/taskmanager/TaskManagerTest.java    |  26 ++--
 .../flink/runtime/taskmanager/TaskTest.java     |   2 +
 .../api/java/ScalaShellRemoteEnvironment.java   |  25 ++--
 .../environment/RemoteStreamEnvironment.java    |  77 +++++++++--
 .../environment/StreamContextEnvironment.java   |  19 ++-
 .../environment/StreamExecutionEnvironment.java |   8 +-
 .../test/classloading/ClassLoaderITCase.java    |  12 +-
 .../jar/CustomInputSplitProgram.java            |  18 +--
 36 files changed, 652 insertions(+), 251 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0ee0c1f5/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index 527c0df..c638894 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -26,6 +26,7 @@ import java.io.InputStream;
 import java.net.InetSocketAddress;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.net.URL;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -89,7 +90,7 @@ import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 
 /**
- * Implementation of a simple command line fronted for executing programs.
+ * Implementation of a simple command line frontend for executing programs.
  */
 public class CliFrontend {
 
@@ -702,6 +703,7 @@ public class CliFrontend {
 	{
 		String[] programArgs = options.getProgramArgs();
 		String jarFilePath = options.getJarFilePath();
+		List<URL> classpaths = options.getClasspaths();
 
 		if (jarFilePath == null) {
 			throw new IllegalArgumentException("The program JAR file was not specified.");
@@ -721,8 +723,8 @@ public class CliFrontend {
 		String entryPointClass = options.getEntryPointClassName();
 
 		return entryPointClass == null ?
-				new PackagedProgram(jarFile, programArgs) :
-				new PackagedProgram(jarFile, entryPointClass, programArgs);
+				new PackagedProgram(jarFile, classpaths, programArgs) :
+				new PackagedProgram(jarFile, classpaths, entryPointClass, programArgs);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/0ee0c1f5/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
index e8e9ade..bc0d220 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
@@ -21,6 +21,7 @@ package org.apache.flink.client;
 import java.net.InetSocketAddress;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.net.URL;
 import java.util.Collections;
 import java.util.List;
 
@@ -49,53 +50,66 @@ import org.apache.flink.configuration.Configuration;
  * remotely execute program parts.</p>
  */
 public class RemoteExecutor extends PlanExecutor {
-		
+
 	private final Object lock = new Object();
-	
-	private final List<String> jarFiles;
+
+	private final List<URL> jarFiles;
+
+	private final List<URL> globalClasspaths;
 
 	private final Configuration clientConfiguration;
 
 	private Client client;
-	
+
 	private int defaultParallelism = 1;
-	
-	
+
+
 	public RemoteExecutor(String hostname, int port) {
-		this(hostname, port, Collections.<String>emptyList(), new Configuration());
+		this(hostname, port, new Configuration(), Collections.<URL>emptyList(),
+				Collections.<URL>emptyList());
 	}
-	
-	public RemoteExecutor(String hostname, int port, String jarFile) {
-		this(hostname, port, Collections.singletonList(jarFile), new Configuration());
+
+	public RemoteExecutor(String hostname, int port, URL jarFile) {
+		this(hostname, port, new Configuration(), Collections.singletonList(jarFile),
+				Collections.<URL>emptyList());
 	}
-	
-	public RemoteExecutor(String hostport, String jarFile) {
-		this(getInetFromHostport(hostport), Collections.singletonList(jarFile), new Configuration());
+
+	public RemoteExecutor(String hostport, URL jarFile) {
+		this(getInetFromHostport(hostport), new Configuration(), Collections.singletonList(jarFile),
+				Collections.<URL>emptyList());
 	}
-	
-	public RemoteExecutor(String hostname, int port, List<String> jarFiles) {
-		this(new InetSocketAddress(hostname, port), jarFiles, new Configuration());
+
+	public RemoteExecutor(String hostname, int port, List<URL> jarFiles) {
+		this(new InetSocketAddress(hostname, port), new Configuration(), jarFiles,
+				Collections.<URL>emptyList());
 	}
 
 	public RemoteExecutor(String hostname, int port, Configuration clientConfiguration) {
-		this(hostname, port, Collections.<String>emptyList(), clientConfiguration);
+		this(hostname, port, clientConfiguration, Collections.<URL>emptyList(),
+				Collections.<URL>emptyList());
 	}
 
-	public RemoteExecutor(String hostname, int port, String jarFile, Configuration clientConfiguration) {
-		this(hostname, port, Collections.singletonList(jarFile), clientConfiguration);
+	public RemoteExecutor(String hostname, int port, Configuration clientConfiguration, URL jarFile) {
+		this(hostname, port, clientConfiguration, Collections.singletonList(jarFile),
+				Collections.<URL>emptyList());
 	}
 
-	public RemoteExecutor(String hostport, String jarFile, Configuration clientConfiguration) {
-		this(getInetFromHostport(hostport), Collections.singletonList(jarFile), clientConfiguration);
+	public RemoteExecutor(String hostport, Configuration clientConfiguration, URL jarFile) {
+		this(getInetFromHostport(hostport), clientConfiguration,
+				Collections.singletonList(jarFile), Collections.<URL>emptyList());
 	}
 
-	public RemoteExecutor(String hostname, int port, List<String> jarFiles, Configuration clientConfiguration) {
-		this(new InetSocketAddress(hostname, port), jarFiles, clientConfiguration);
+	public RemoteExecutor(String hostname, int port, Configuration clientConfiguration,
+			List<URL> jarFiles, List<URL> globalClasspaths) {
+		this(new InetSocketAddress(hostname, port), clientConfiguration, jarFiles, globalClasspaths);
 	}
 
-	public RemoteExecutor(InetSocketAddress inet, List<String> jarFiles, Configuration clientConfiguration) {
-		this.jarFiles = jarFiles;
+	public RemoteExecutor(InetSocketAddress inet, Configuration clientConfiguration,
+			List<URL> jarFiles, List<URL> globalClasspaths) {
 		this.clientConfiguration = clientConfiguration;
+		this.jarFiles = jarFiles;
+		this.globalClasspaths = globalClasspaths;
+
 
 		clientConfiguration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, inet.getHostName());
 		clientConfiguration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, inet.getPort());
@@ -171,7 +185,7 @@ public class RemoteExecutor extends PlanExecutor {
 			throw new IllegalArgumentException("The plan may not be null.");
 		}
 
-		JobWithJars p = new JobWithJars(plan, this.jarFiles);
+		JobWithJars p = new JobWithJars(plan, this.jarFiles, this.globalClasspaths);
 		return executePlanWithJars(p);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0ee0c1f5/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
index 52f1260..028aead 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
@@ -47,6 +47,12 @@ public class CliFrontendParser {
 			"Class with the program entry point (\"main\" method or \"getPlan()\" method. Only needed if the " +
 					"JAR file does not specify the class in its manifest.");
 
+	static final Option CLASSPATH_OPTION = new Option("C", "classpath", true, "Adds a URL to each user code " +
+			"classloader  on all nodes in the cluster. The paths must specify a protocol (e.g. file://) and be " +
+					"accessible on all nodes (e.g. by means of a NFS share). You can use this option multiple " +
+					"times for specifying more than one URL. The protocol must be supported by the " +
+					"{@link java.net.URLClassLoader}.");
+
 	static final Option PARALLELISM_OPTION = new Option("p", "parallelism", true,
 			"The parallelism with which to run the program. Optional flag to override the default value " +
 					"specified in the configuration.");
@@ -78,6 +84,9 @@ public class CliFrontendParser {
 		CLASS_OPTION.setRequired(false);
 		CLASS_OPTION.setArgName("classname");
 
+		CLASSPATH_OPTION.setRequired(false);
+		CLASSPATH_OPTION.setArgName("url");
+
 		ADDRESS_OPTION.setRequired(false);
 		ADDRESS_OPTION.setArgName("host:port");
 
@@ -110,6 +119,7 @@ public class CliFrontendParser {
 	public static Options getProgramSpecificOptions(Options options) {
 		options.addOption(JAR_OPTION);
 		options.addOption(CLASS_OPTION);
+		options.addOption(CLASSPATH_OPTION);
 		options.addOption(PARALLELISM_OPTION);
 		options.addOption(ARGS_OPTION);
 		options.addOption(LOGGING_OPTION);
@@ -121,6 +131,7 @@ public class CliFrontendParser {
 
 	private static Options getProgramSpecificOptionsWithoutDeprecatedOptions(Options options) {
 		options.addOption(CLASS_OPTION);
+		options.addOption(CLASSPATH_OPTION);
 		options.addOption(PARALLELISM_OPTION);
 		options.addOption(LOGGING_OPTION);
 		return options;

http://git-wip-us.apache.org/repos/asf/flink/blob/0ee0c1f5/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java b/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java
index c45da13..11382d2 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java
@@ -19,11 +19,16 @@ package org.apache.flink.client.cli;
 
 import org.apache.commons.cli.CommandLine;
 
+import java.net.URL;
+import java.net.MalformedURLException;
+import java.util.List;
+import java.util.ArrayList;
 import java.util.Arrays;
 
 import static org.apache.flink.client.cli.CliFrontendParser.ARGS_OPTION;
 import static org.apache.flink.client.cli.CliFrontendParser.JAR_OPTION;
 import static org.apache.flink.client.cli.CliFrontendParser.CLASS_OPTION;
+import static org.apache.flink.client.cli.CliFrontendParser.CLASSPATH_OPTION;
 import static org.apache.flink.client.cli.CliFrontendParser.PARALLELISM_OPTION;
 import static org.apache.flink.client.cli.CliFrontendParser.LOGGING_OPTION;
 
@@ -36,6 +41,8 @@ public abstract class ProgramOptions extends CommandLineOptions {
 
 	private final String entryPointClass;
 
+	private final List<URL> classpaths;
+
 	private final String[] programArgs;
 
 	private final int parallelism;
@@ -62,6 +69,18 @@ public abstract class ProgramOptions extends CommandLineOptions {
 
 		this.programArgs = args;
 
+		List<URL> classpaths = new ArrayList<URL>();
+		if (line.hasOption(CLASSPATH_OPTION.getOpt())) {
+			for (String path : line.getOptionValues(CLASSPATH_OPTION.getOpt())) {
+				try {
+					classpaths.add(new URL(path));
+				} catch (MalformedURLException e) {
+					throw new CliArgsException("Bad syntax for classpath: " + path);
+				}
+			}
+		}
+		this.classpaths = classpaths;
+
 		this.entryPointClass = line.hasOption(CLASS_OPTION.getOpt()) ?
 				line.getOptionValue(CLASS_OPTION.getOpt()) : null;
 
@@ -96,6 +115,10 @@ public abstract class ProgramOptions extends CommandLineOptions {
 		return entryPointClass;
 	}
 
+	public List<URL> getClasspaths() {
+		return classpaths;
+	}
+
 	public String[] getProgramArgs() {
 		return programArgs;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/0ee0c1f5/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
index 91ed665..dfb9c1b 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
@@ -18,8 +18,9 @@
 
 package org.apache.flink.client.program;
 
-import java.io.File;
 import java.io.IOException;
+import java.net.URISyntaxException;
+import java.net.URL;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -282,7 +283,8 @@ public class Client {
 		}
 		else if (prog.isUsingInteractiveMode()) {
 			LOG.info("Starting program in interactive mode");
-			ContextEnvironment.setAsContext(this, prog.getAllLibraries(), prog.getUserCodeClassLoader(), parallelism, true);
+			ContextEnvironment.setAsContext(this, prog.getAllLibraries(), prog.getClasspaths(),
+				prog.getUserCodeClassLoader(), parallelism, true);
 
 			// invoke here
 			try {
@@ -308,7 +310,8 @@ public class Client {
 		}
 		else if (prog.isUsingInteractiveMode()) {
 			LOG.info("Starting program in interactive mode");
-			ContextEnvironment.setAsContext(this, prog.getAllLibraries(), prog.getUserCodeClassLoader(), parallelism, false);
+			ContextEnvironment.setAsContext(this, prog.getAllLibraries(), prog.getClasspaths(),
+				prog.getUserCodeClassLoader(), parallelism, false);
 
 			// invoke here
 			try {
@@ -348,7 +351,7 @@ public class Client {
 		}
 
 		OptimizedPlan optPlan = getOptimizedPlan(compiler, program, parallelism);
-		return runBlocking(optPlan, program.getJarFiles(), classLoader);
+		return runBlocking(optPlan, program.getJarFiles(), program.getClasspaths(), classLoader);
 	}
 
 	/**
@@ -373,21 +376,21 @@ public class Client {
 		}
 
 		OptimizedPlan optimizedPlan = getOptimizedPlan(compiler, program, parallelism);
-		return runDetached(optimizedPlan, program.getJarFiles(), classLoader);
+		return runDetached(optimizedPlan, program.getJarFiles(), program.getClasspaths(), classLoader);
 	}
 	
 
-	public JobExecutionResult runBlocking(OptimizedPlan compiledPlan, List<File> libraries, ClassLoader classLoader)
-			throws ProgramInvocationException
+	public JobExecutionResult runBlocking(OptimizedPlan compiledPlan, List<URL> libraries, List<URL> classpaths,
+			ClassLoader classLoader) throws ProgramInvocationException
 	{
-		JobGraph job = getJobGraph(compiledPlan, libraries);
+		JobGraph job = getJobGraph(compiledPlan, libraries, classpaths);
 		return runBlocking(job, classLoader);
 	}
 
-	public JobSubmissionResult runDetached(OptimizedPlan compiledPlan, List<File> libraries, ClassLoader classLoader)
-			throws ProgramInvocationException
+	public JobSubmissionResult runDetached(OptimizedPlan compiledPlan, List<URL> libraries, List<URL> classpaths,
+			ClassLoader classLoader) throws ProgramInvocationException
 	{
-		JobGraph job = getJobGraph(compiledPlan, libraries);
+		JobGraph job = getJobGraph(compiledPlan, libraries, classpaths);
 		return runDetached(job, classLoader);
 	}
 
@@ -538,16 +541,16 @@ public class Client {
 	 * @throws CompilerException Thrown, if the compiler encounters an illegal situation.
 	 * @throws ProgramInvocationException Thrown, if the program could not be instantiated from its jar file.
 	 */
-	private static OptimizedPlan getOptimizedPlan(Optimizer compiler, JobWithJars prog, int parallelism) throws CompilerException,
-																					ProgramInvocationException {
+	private static OptimizedPlan getOptimizedPlan(Optimizer compiler, JobWithJars prog, int parallelism)
+			throws CompilerException, ProgramInvocationException {
 		return getOptimizedPlan(compiler, prog.getPlan(), parallelism);
 	}
 
 	public static JobGraph getJobGraph(PackagedProgram prog, FlinkPlan optPlan) throws ProgramInvocationException {
-		return getJobGraph(optPlan, prog.getAllLibraries());
+		return getJobGraph(optPlan, prog.getAllLibraries(), prog.getClasspaths());
 	}
 
-	private static JobGraph getJobGraph(FlinkPlan optPlan, List<File> jarFiles) {
+	private static JobGraph getJobGraph(FlinkPlan optPlan, List<URL> jarFiles, List<URL> classpaths) {
 		JobGraph job;
 		if (optPlan instanceof StreamingPlan) {
 			job = ((StreamingPlan) optPlan).getJobGraph();
@@ -556,9 +559,15 @@ public class Client {
 			job = gen.compileJobGraph((OptimizedPlan) optPlan);
 		}
 
-		for (File jar : jarFiles) {
-			job.addJar(new Path(jar.getAbsolutePath()));
+		for (URL jar : jarFiles) {
+			try {
+				job.addJar(new Path(jar.toURI()));
+			} catch (URISyntaxException e) {
+				throw new RuntimeException("URL is invalid. This should not happen.", e);
+			}
 		}
+ 
+		job.setClasspaths(classpaths);
 
 		return job;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/0ee0c1f5/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
index e33a05d..d5a28fc 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.client.program;
 
-import java.io.File;
+import java.net.URL;
 import java.util.List;
 
 import org.apache.flink.api.common.JobExecutionResult;
@@ -41,16 +41,21 @@ public class ContextEnvironment extends ExecutionEnvironment {
 
 	private final Client client;
 
-	private final List<File> jarFilesToAttach;
+	private final List<URL> jarFilesToAttach;
 
+	private final List<URL> classpathsToAttach;
+	
 	private final ClassLoader userCodeClassLoader;
 
 	private final boolean wait;
-
-
-	public ContextEnvironment(Client remoteConnection, List<File> jarFiles, ClassLoader userCodeClassLoader, boolean wait) {
+	
+	
+	
+	public ContextEnvironment(Client remoteConnection, List<URL> jarFiles, List<URL> classpaths,
+			ClassLoader userCodeClassLoader, boolean wait) {
 		this.client = remoteConnection;
 		this.jarFilesToAttach = jarFiles;
+		this.classpathsToAttach = classpaths;
 		this.userCodeClassLoader = userCodeClassLoader;
 		this.wait = wait;
 	}
@@ -58,7 +63,8 @@ public class ContextEnvironment extends ExecutionEnvironment {
 	@Override
 	public JobExecutionResult execute(String jobName) throws Exception {
 		Plan p = createProgramPlan(jobName);
-		JobWithJars toRun = new JobWithJars(p, this.jarFilesToAttach, this.userCodeClassLoader);
+		JobWithJars toRun = new JobWithJars(p, this.jarFilesToAttach, this.classpathsToAttach,
+				this.userCodeClassLoader);
 
 		if (wait) {
 			this.lastJobExecutionResult = client.runBlocking(toRun, getParallelism());
@@ -101,17 +107,21 @@ public class ContextEnvironment extends ExecutionEnvironment {
 		return this.client;
 	}
 	
-	public List<File> getJars(){
+	public List<URL> getJars(){
 		return jarFilesToAttach;
 	}
+
+	public List<URL> getClasspaths(){
+		return classpathsToAttach;
+	}
 	
 	// --------------------------------------------------------------------------------------------
 	
-	static void setAsContext(Client client, List<File> jarFilesToAttach, 
+	static void setAsContext(Client client, List<URL> jarFilesToAttach, List<URL> classpathsToAttach,
 				ClassLoader userCodeClassLoader, int defaultParallelism, boolean wait)
 	{
-		ContextEnvironmentFactory factory =
-				new ContextEnvironmentFactory(client, jarFilesToAttach, userCodeClassLoader, defaultParallelism, wait);
+		ContextEnvironmentFactory factory = new ContextEnvironmentFactory(client, jarFilesToAttach,
+				classpathsToAttach, userCodeClassLoader, defaultParallelism, wait);
 		initializeContextEnvironment(factory);
 	}
 	
@@ -130,7 +140,9 @@ public class ContextEnvironment extends ExecutionEnvironment {
 		
 		private final Client client;
 		
-		private final List<File> jarFilesToAttach;
+		private final List<URL> jarFilesToAttach;
+
+		private final List<URL> classpathsToAttach;
 		
 		private final ClassLoader userCodeClassLoader;
 		
@@ -139,11 +151,13 @@ public class ContextEnvironment extends ExecutionEnvironment {
 		private final boolean wait;
 		
 
-		public ContextEnvironmentFactory(Client client, List<File> jarFilesToAttach, 
-				ClassLoader userCodeClassLoader, int defaultParallelism, boolean wait)
+		public ContextEnvironmentFactory(Client client, List<URL> jarFilesToAttach,
+				List<URL> classpathsToAttach, ClassLoader userCodeClassLoader, int defaultParallelism,
+				boolean wait)
 		{
 			this.client = client;
 			this.jarFilesToAttach = jarFilesToAttach;
+			this.classpathsToAttach = classpathsToAttach;
 			this.userCodeClassLoader = userCodeClassLoader;
 			this.defaultParallelism = defaultParallelism;
 			this.wait = wait;
@@ -151,7 +165,8 @@ public class ContextEnvironment extends ExecutionEnvironment {
 		
 		@Override
 		public ExecutionEnvironment createExecutionEnvironment() {
-			ContextEnvironment env = new ContextEnvironment(client, jarFilesToAttach, userCodeClassLoader, wait);
+			ContextEnvironment env = new ContextEnvironment(client, jarFilesToAttach, classpathsToAttach,
+					userCodeClassLoader, wait);
 			if (defaultParallelism > 0) {
 				env.setParallelism(defaultParallelism);
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/0ee0c1f5/flink-clients/src/main/java/org/apache/flink/client/program/JobWithJars.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/JobWithJars.java b/flink-clients/src/main/java/org/apache/flink/client/program/JobWithJars.java
index 9e84e2d..ef02527 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/JobWithJars.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/JobWithJars.java
@@ -20,7 +20,7 @@ package org.apache.flink.client.program;
 
 import java.io.File;
 import java.io.IOException;
-import java.net.MalformedURLException;
+import java.net.URISyntaxException;
 import java.net.URL;
 import java.net.URLClassLoader;
 import java.util.ArrayList;
@@ -37,33 +37,42 @@ public class JobWithJars {
 	
 	private Plan plan;
 	
-	private List<File> jarFiles;
-	
+	private List<URL> jarFiles;
+
+	/**
+	 * classpaths that are needed during user code execution
+	 */
+	private List<URL> classpaths;
+
 	private ClassLoader userCodeClassLoader;
 
-	
-	public JobWithJars(Plan plan, List<String> jarFiles) throws IOException {
+	public JobWithJars(Plan plan, List<URL> jarFiles, List<URL> classpaths) throws IOException {
 		this.plan = plan;
-		this.jarFiles = new ArrayList<File>(jarFiles.size());
-		
-		for (String jar: jarFiles) {
-			File file = new File(jar);
-			checkJarFile(file);
-			this.jarFiles.add(file);
+		this.jarFiles = new ArrayList<URL>(jarFiles.size());
+		this.classpaths = new ArrayList<URL>(classpaths.size());
+
+		for (URL jarFile: jarFiles) {
+			checkJarFile(jarFile);
+			this.jarFiles.add(jarFile);
+		}
+
+		for (URL path: classpaths) {
+			this.classpaths.add(path);
 		}
 	}
-	
-	public JobWithJars(Plan plan, String jarFile) throws IOException {
+
+	public JobWithJars(Plan plan, URL jarFile) throws IOException {
 		this.plan = plan;
-		
-		File file = new File(jarFile);
-		checkJarFile(file);
-		this.jarFiles = Collections.singletonList(file);
+
+		checkJarFile(jarFile);
+		this.jarFiles = Collections.singletonList(jarFile);
+		this.classpaths = Collections.<URL>emptyList();
 	}
 	
-	JobWithJars(Plan plan, List<File> jarFiles, ClassLoader userCodeClassLoader) {
+	JobWithJars(Plan plan, List<URL> jarFiles, List<URL> classpaths, ClassLoader userCodeClassLoader) {
 		this.plan = plan;
 		this.jarFiles = jarFiles;
+		this.classpaths = classpaths;
 		this.userCodeClassLoader = userCodeClassLoader;
 	}
 
@@ -77,48 +86,54 @@ public class JobWithJars {
 	/**
 	 * Returns list of jar files that need to be submitted with the plan.
 	 */
-	public List<File> getJarFiles() {
+	public List<URL> getJarFiles() {
 		return this.jarFiles;
 	}
 	
 	/**
+	 * Returns list of classpaths that need to be submitted with the plan.
+	 */
+	public List<URL> getClasspaths() {
+		return classpaths;
+	}
+	
+	/**
 	 * Gets the {@link java.lang.ClassLoader} that must be used to load user code classes.
 	 * 
 	 * @return The user code ClassLoader.
 	 */
 	public ClassLoader getUserCodeClassLoader() {
 		if (this.userCodeClassLoader == null) {
-			this.userCodeClassLoader = buildUserCodeClassLoader(jarFiles, getClass().getClassLoader());
+			this.userCodeClassLoader = buildUserCodeClassLoader(jarFiles, classpaths, getClass().getClassLoader());
 		}
-		
 		return this.userCodeClassLoader;
 	}
 	
 
-	public static void checkJarFile(File jar) throws IOException {
-		if (!jar.exists()) {
-			throw new IOException("JAR file does not exist '" + jar.getAbsolutePath() + "'");
+	public static void checkJarFile(URL jar) throws IOException {
+		File jarFile;
+		try {
+			jarFile = new File(jar.toURI());
+		} catch (URISyntaxException e) {
+			throw new IOException("JAR file path is invalid '" + jar + "'");
 		}
-		if (!jar.canRead()) {
-			throw new IOException("JAR file can't be read '" + jar.getAbsolutePath() + "'");
+		if (!jarFile.exists()) {
+			throw new IOException("JAR file does not exist '" + jarFile.getAbsolutePath() + "'");
+		}
+		if (!jarFile.canRead()) {
+			throw new IOException("JAR file can't be read '" + jarFile.getAbsolutePath() + "'");
 		}
 		// TODO: Check if proper JAR file
 	}
 	
-	public static ClassLoader buildUserCodeClassLoader(List<File> jars, ClassLoader parent) {
-		
-		URL[] urls = new URL[jars.size()];
-		try {
-			// add the nested jars
-			for (int i = 0; i < jars.size(); i++) {
-				urls[i] = jars.get(i).getAbsoluteFile().toURI().toURL();
-			}
+	public static ClassLoader buildUserCodeClassLoader(List<URL> jars, List<URL> classpaths, ClassLoader parent) {
+		URL[] urls = new URL[jars.size() + classpaths.size()];
+		for (int i = 0; i < jars.size(); i++) {
+			urls[i] = jars.get(i);
 		}
-		catch (MalformedURLException e) {
-			// this should not happen, as all files should have been checked before for proper paths and existence.
-			throw new RuntimeException("Cannot create class loader for program jar files: " + e.getMessage(), e);
+		for (int i = 0; i < classpaths.size(); i++) {
+			urls[i + jars.size()] = classpaths.get(i);
 		}
-		
 		return new URLClassLoader(urls, parent);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0ee0c1f5/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
index 0cd4d07..aeac296 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
@@ -30,6 +30,9 @@ import java.io.StringWriter;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.lang.reflect.Modifier;
+import java.net.MalformedURLException;
+import java.net.URISyntaxException;
+import java.net.URL;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Enumeration;
@@ -67,7 +70,7 @@ public class PackagedProgram {
 
 	// --------------------------------------------------------------------------------------------
 
-	private final File jarFile;
+	private final URL jarFile;
 
 	private final String[] args;
 	
@@ -76,6 +79,8 @@ public class PackagedProgram {
 	private final Class<?> mainClass;
 	
 	private final List<File> extractedTempLibraries;
+
+	private final List<URL> classpaths;
 	
 	private ClassLoader userCodeClassLoader;
 	
@@ -84,7 +89,7 @@ public class PackagedProgram {
 	/**
 	 * Creates an instance that wraps the plan defined in the jar file using the given
 	 * argument.
-	 * 
+	 *
 	 * @param jarFile
 	 *        The jar file which contains the plan and a Manifest which defines
 	 *        the program-class
@@ -96,7 +101,27 @@ public class PackagedProgram {
 	 *         may be a missing / wrong class or manifest files.
 	 */
 	public PackagedProgram(File jarFile, String... args) throws ProgramInvocationException {
-		this(jarFile, null, args);
+		this(jarFile, Collections.<URL>emptyList(), null, args);
+	}
+
+	/**
+	 * Creates an instance that wraps the plan defined in the jar file using the given
+	 * argument.
+	 * 
+	 * @param jarFile
+	 *        The jar file which contains the plan and a Manifest which defines
+	 *        the program-class
+	 * @param classpaths
+	 *        Additional classpath URLs needed by the Program.
+	 * @param args
+	 *        Optional. The arguments used to create the pact plan, depend on
+	 *        implementation of the pact plan. See getDescription().
+	 * @throws ProgramInvocationException
+	 *         This invocation is thrown if the Program can't be properly loaded. Causes
+	 *         may be a missing / wrong class or manifest files.
+	 */
+	public PackagedProgram(File jarFile, List<URL> classpaths, String... args) throws ProgramInvocationException {
+		this(jarFile, classpaths, null, args);
 	}
 
 	/**
@@ -117,23 +142,54 @@ public class PackagedProgram {
 	 *         may be a missing / wrong class or manifest files.
 	 */
 	public PackagedProgram(File jarFile, String entryPointClassName, String... args) throws ProgramInvocationException {
+		this(jarFile, Collections.<URL>emptyList(), entryPointClassName, args);
+	}
+
+	/**
+	 * Creates an instance that wraps the plan defined in the jar file using the given
+	 * arguments. For generating the plan the class defined in the className parameter
+	 * is used.
+	 * 
+	 * @param jarFile
+	 *        The jar file which contains the plan.
+	 * @param classpaths
+	 *        Additional classpath URLs needed by the Program.
+	 * @param entryPointClassName
+	 *        Name of the class which generates the plan. Overrides the class defined
+	 *        in the jar file manifest
+	 * @param args
+	 *        Optional. The arguments used to create the pact plan, depend on
+	 *        implementation of the pact plan. See getDescription().
+	 * @throws ProgramInvocationException
+	 *         This invocation is thrown if the Program can't be properly loaded. Causes
+	 *         may be a missing / wrong class or manifest files.
+	 */
+	public PackagedProgram(File jarFile, List<URL> classpaths, String entryPointClassName, String... args) throws ProgramInvocationException {
 		if (jarFile == null) {
 			throw new IllegalArgumentException("The jar file must not be null.");
 		}
 		
-		checkJarFile(jarFile);
+		URL jarFileUrl;
+		try {
+			jarFileUrl = jarFile.getAbsoluteFile().toURI().toURL();
+		} catch (MalformedURLException e1) {
+			throw new IllegalArgumentException("The jar file path is invalid.");
+		}
+		
+		checkJarFile(jarFileUrl);
 		
-		this.jarFile = jarFile;
+		this.jarFile = jarFileUrl;
 		this.args = args == null ? new String[0] : args;
 		
 		// if no entryPointClassName name was given, we try and look one up through the manifest
 		if (entryPointClassName == null) {
-			entryPointClassName = getEntryPointClassNameFromJar(jarFile);
+			entryPointClassName = getEntryPointClassNameFromJar(jarFileUrl);
 		}
 		
 		// now that we have an entry point, we can extract the nested jar files (if any)
-		this.extractedTempLibraries = extractContainedLibaries(jarFile);
-		this.userCodeClassLoader = buildUserCodeClassLoader(jarFile, extractedTempLibraries, getClass().getClassLoader());
+		this.extractedTempLibraries = extractContainedLibaries(jarFileUrl);
+		this.classpaths = classpaths;
+		this.userCodeClassLoader = JobWithJars.buildUserCodeClassLoader(getAllLibraries(), classpaths, getClass().getClassLoader());
 		
 		// load the entry point class
 		this.mainClass = loadMainClass(entryPointClassName, userCodeClassLoader);
@@ -168,6 +224,7 @@ public class PackagedProgram {
 		this.args = args == null ? new String[0] : args;
 		
 		this.extractedTempLibraries = Collections.emptyList();
+		this.classpaths = Collections.emptyList();
 		this.userCodeClassLoader = entryPointClass.getClassLoader();
 		
 		// load the entry point class
@@ -224,14 +281,7 @@ public class PackagedProgram {
 	 */
 	public JobWithJars getPlanWithJars() throws ProgramInvocationException {
 		if (isUsingProgramEntryPoint()) {
-			List<File> allJars = new ArrayList<File>();
-			
-			if (this.jarFile != null) {
-				allJars.add(jarFile);
-			}
-			allJars.addAll(extractedTempLibraries);
-			
-			return new JobWithJars(getPlan(), allJars, userCodeClassLoader);
+			return new JobWithJars(getPlan(), getAllLibraries(), classpaths, userCodeClassLoader);
 		} else {
 			throw new ProgramInvocationException("Cannot create a " + JobWithJars.class.getSimpleName() + 
 					" for a program that is using the interactive mode.");
@@ -347,7 +397,16 @@ public class PackagedProgram {
 			throw new ProgramInvocationException("Cannot invoke a plan-based program directly.");
 		}
 	}
-	
+
+	/**
+	 * Returns the classpaths that are required by the program.
+	 *
+	 * @return List of {@link java.net.URL}s.
+	 */
+	public List<URL> getClasspaths() {
+		return this.classpaths;
+	}
+
 	/**
 	 * Gets the {@link java.lang.ClassLoader} that must be used to load user code classes.
 	 * 
@@ -357,15 +416,24 @@ public class PackagedProgram {
 		return this.userCodeClassLoader;
 	}
 
-	public List<File> getAllLibraries() {
-		List<File> libs = new ArrayList<File>(this.extractedTempLibraries.size() + 1);
+	public List<URL> getAllLibraries() {
+		List<URL> libs = new ArrayList<URL>(this.extractedTempLibraries.size() + 1);
+
 		if (jarFile != null) {
 			libs.add(jarFile);
 		}
-		libs.addAll(this.extractedTempLibraries);
+		for (File tmpLib : this.extractedTempLibraries) {
+			try {
+				libs.add(tmpLib.getAbsoluteFile().toURI().toURL());
+			}
+			catch (MalformedURLException e) {
+				throw new RuntimeException("URL is invalid. This should not happen.", e);
+			}
+		}
+
 		return libs;
 	}
-	
+
 	/**
 	 * Deletes all temporary files created for contained packaged libraries.
 	 */
@@ -449,14 +517,16 @@ public class PackagedProgram {
 		}
 	}
 
-	private static String getEntryPointClassNameFromJar(File jarFile) throws ProgramInvocationException {
+	private static String getEntryPointClassNameFromJar(URL jarFile) throws ProgramInvocationException {
 		JarFile jar;
 		Manifest manifest;
 		String className;
 
 		// Open jar file
 		try {
-			jar = new JarFile(jarFile);
+			jar = new JarFile(new File(jarFile.toURI()));
+		} catch (URISyntaxException use) {
+			throw new ProgramInvocationException("Invalid file path '" + jarFile.getPath() + "'", use);
 		} catch (IOException ioex) {
 			throw new ProgramInvocationException("Error while opening jar file '" + jarFile.getPath() + "'. "
 				+ ioex.getMessage(), ioex);
@@ -553,13 +623,13 @@ public class PackagedProgram {
 	 * @return The file names of the extracted temporary files.
 	 * @throws ProgramInvocationException Thrown, if the extraction process failed.
 	 */
-	private static List<File> extractContainedLibaries(File jarFile) throws ProgramInvocationException {
+	private static List<File> extractContainedLibaries(URL jarFile) throws ProgramInvocationException {
 		
 		Random rnd = new Random();
 		
 		JarFile jar = null;
 		try {
-			jar = new JarFile(jarFile);
+			jar = new JarFile(new File(jarFile.toURI()));
 			final List<JarEntry> containedJarFileEntries = new ArrayList<JarEntry>();
 			
 			Enumeration<JarEntry> entries = jar.entries();
@@ -658,15 +728,7 @@ public class PackagedProgram {
 		}
 	}
 	
-	private static ClassLoader buildUserCodeClassLoader(File mainJar, List<File> nestedJars, ClassLoader parent) throws ProgramInvocationException {
-		ArrayList<File> allJars = new ArrayList<File>(nestedJars.size() + 1);
-		allJars.add(mainJar);
-		allJars.addAll(nestedJars);
-		
-		return JobWithJars.buildUserCodeClassLoader(allJars, parent);
-	}
-	
-	private static void checkJarFile(File jarfile) throws ProgramInvocationException {
+	private static void checkJarFile(URL jarfile) throws ProgramInvocationException {
 		try {
 			JobWithJars.checkJarFile(jarfile);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/0ee0c1f5/flink-clients/src/test/java/org/apache/flink/client/CliFrontendPackageProgramTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendPackageProgramTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendPackageProgramTest.java
index 1718ba5..5439742 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendPackageProgramTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendPackageProgramTest.java
@@ -28,7 +28,6 @@ import static org.junit.Assert.*;
 import static org.mockito.Mockito.*;
 
 import org.apache.flink.client.cli.CliFrontendParser;
-import org.apache.flink.client.cli.InfoOptions;
 import org.apache.flink.client.cli.ProgramOptions;
 import org.apache.flink.client.cli.RunOptions;
 import org.apache.flink.client.program.Client;
@@ -46,6 +45,7 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.io.FileNotFoundException;
+import java.net.URL;
 
 
 public class CliFrontendPackageProgramTest {
@@ -102,11 +102,17 @@ public class CliFrontendPackageProgramTest {
 	@Test
 	public void testVariantWithExplicitJarAndArgumentsOption() {
 		try {
-			String[] arguments = {"-j", getTestJarPath(), "-a", "--debug", "true", "arg1", "arg2"};
+			String[] arguments = {
+					"--classpath", "file:///tmp/foo",
+					"--classpath", "file:///tmp/bar",
+					"-j", getTestJarPath(),
+					"-a", "--debug", "true", "arg1", "arg2" };
+			URL[] classpath = new URL[] { new URL("file:///tmp/foo"), new URL("file:///tmp/bar") };
 			String[] reducedArguments = new String[] {"--debug", "true", "arg1", "arg2"};
 
 			RunOptions options = CliFrontendParser.parseRunCommand(arguments);
 			assertEquals(getTestJarPath(), options.getJarFilePath());
+			assertArrayEquals(classpath, options.getClasspaths().toArray());
 			assertArrayEquals(reducedArguments, options.getProgramArgs());
 
 			CliFrontend frontend = new CliFrontend(CliFrontendTestUtils.getConfigDir());
@@ -124,11 +130,17 @@ public class CliFrontendPackageProgramTest {
 	@Test
 	public void testVariantWithExplicitJarAndNoArgumentsOption() {
 		try {
-			String[] arguments = {"-j", getTestJarPath(), "--debug", "true", "arg1", "arg2"};
+			String[] arguments = {
+					"--classpath", "file:///tmp/foo",
+					"--classpath", "file:///tmp/bar",
+					"-j", getTestJarPath(),
+					"--debug", "true", "arg1", "arg2" };
+			URL[] classpath = new URL[] { new URL("file:///tmp/foo"), new URL("file:///tmp/bar") };
 			String[] reducedArguments = new String[] {"--debug", "true", "arg1", "arg2"};
 
 			RunOptions options = CliFrontendParser.parseRunCommand(arguments);
 			assertEquals(getTestJarPath(), options.getJarFilePath());
+			assertArrayEquals(classpath, options.getClasspaths().toArray());
 			assertArrayEquals(reducedArguments, options.getProgramArgs());
 
 			CliFrontend frontend = new CliFrontend(CliFrontendTestUtils.getConfigDir());
@@ -147,11 +159,17 @@ public class CliFrontendPackageProgramTest {
 	@Test
 	public void testValidVariantWithNoJarAndNoArgumentsOption() {
 		try {
-			String[] arguments = {getTestJarPath(), "--debug", "true", "arg1", "arg2"};
+			String[] arguments = {
+					"--classpath", "file:///tmp/foo",
+					"--classpath", "file:///tmp/bar",
+					getTestJarPath(),
+					"--debug", "true", "arg1", "arg2" };
+			URL[] classpath = new URL[] { new URL("file:///tmp/foo"), new URL("file:///tmp/bar") };
 			String[] reducedArguments = {"--debug", "true", "arg1", "arg2"};
 
 			RunOptions options = CliFrontendParser.parseRunCommand(arguments);
 			assertEquals(getTestJarPath(), options.getJarFilePath());
+			assertArrayEquals(classpath, options.getClasspaths().toArray());
 			assertArrayEquals(reducedArguments, options.getProgramArgs());
 
 			CliFrontend frontend = new CliFrontend(CliFrontendTestUtils.getConfigDir());
@@ -182,11 +200,17 @@ public class CliFrontendPackageProgramTest {
 	@Test
 	public void testNonExistingFileWithArguments() {
 		try {
-			String[] arguments = {"/some/none/existing/path", "--debug", "true", "arg1", "arg2"};
+			String[] arguments = {
+					"--classpath", "file:///tmp/foo",
+					"--classpath", "file:///tmp/bar",
+					"/some/none/existing/path",
+					"--debug", "true", "arg1", "arg2"  };
+			URL[] classpath = new URL[] { new URL("file:///tmp/foo"), new URL("file:///tmp/bar") };
 			String[] reducedArguments = {"--debug", "true", "arg1", "arg2"};
 
 			RunOptions options = CliFrontendParser.parseRunCommand(arguments);
-			assertEquals(arguments[0], options.getJarFilePath());
+			assertEquals(arguments[4], options.getJarFilePath());
+			assertArrayEquals(classpath, options.getClasspaths().toArray());
 			assertArrayEquals(reducedArguments, options.getProgramArgs());
 
 			CliFrontend frontend = new CliFrontend(CliFrontendTestUtils.getConfigDir());
@@ -267,15 +291,19 @@ public class CliFrontendPackageProgramTest {
 		final boolean[] callme = { false }; // create a final object reference, to be able to change its val later
 
 		try {
-			String[] arguments = {"-c", TEST_JAR_CLASSLOADERTEST_CLASS, getTestJarPath(),
+			String[] arguments = {
+					"--classpath", "file:///tmp/foo",
+					"--classpath", "file:///tmp/bar",
+					"-c", TEST_JAR_CLASSLOADERTEST_CLASS, getTestJarPath(),
 					"true", "arg1", "arg2" };
+			URL[] classpath = new URL[] { new URL("file:///tmp/foo"), new URL("file:///tmp/bar") };
+			String[] reducedArguments = { "true", "arg1", "arg2" };
 
-			String[] progArgs = { "true", "arg1", "arg2" };
-
-			InfoOptions options = CliFrontendParser.parseInfoCommand(arguments);
+			RunOptions options = CliFrontendParser.parseRunCommand(arguments);
 			assertEquals(getTestJarPath(), options.getJarFilePath());
+			assertArrayEquals(classpath, options.getClasspaths().toArray());
 			assertEquals(TEST_JAR_CLASSLOADERTEST_CLASS, options.getEntryPointClassName());
-			assertArrayEquals(progArgs, options.getProgramArgs());
+			assertArrayEquals(reducedArguments, options.getProgramArgs());
 			
 			CliFrontend frontend = new CliFrontend(CliFrontendTestUtils.getConfigDir());
 			PackagedProgram prog = spy(frontend.buildProgram(options));
@@ -294,7 +322,7 @@ public class CliFrontendPackageProgramTest {
 			when(prog.getUserCodeClassLoader()).thenReturn(testClassLoader);
 
 			assertEquals(TEST_JAR_CLASSLOADERTEST_CLASS, prog.getMainClassName());
-			assertArrayEquals(progArgs, prog.getArguments());
+			assertArrayEquals(reducedArguments, prog.getArguments());
 
 			Configuration c = new Configuration();
 			Optimizer compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), c);

http://git-wip-us.apache.org/repos/asf/flink/blob/0ee0c1f5/flink-clients/src/test/java/org/apache/flink/client/RemoteExecutorHostnameResolutionTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/RemoteExecutorHostnameResolutionTest.java b/flink-clients/src/test/java/org/apache/flink/client/RemoteExecutorHostnameResolutionTest.java
index be3fe89..424f72e 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/RemoteExecutorHostnameResolutionTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/RemoteExecutorHostnameResolutionTest.java
@@ -27,6 +27,7 @@ import org.junit.Test;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
+import java.net.URL;
 import java.net.UnknownHostException;
 import java.util.Collections;
 
@@ -68,7 +69,8 @@ public class RemoteExecutorHostnameResolutionTest {
 		
 		try {
 			InetSocketAddress add = new InetSocketAddress(nonExistingHostname, port);
-			RemoteExecutor exec = new RemoteExecutor(add, Collections.<String>emptyList(), new Configuration());
+			RemoteExecutor exec = new RemoteExecutor(add, new Configuration(),
+				Collections.<URL>emptyList(), Collections.<URL>emptyList());
 			exec.executePlan(getProgram());
 			fail("This should fail with an ProgramInvocationException");
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/0ee0c1f5/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
index a620139..1fbf681 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
@@ -53,6 +53,7 @@ import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
+import java.net.URL;
 import java.util.Collections;
 import java.util.UUID;
 
@@ -82,7 +83,7 @@ public class ClientTest {
 		env.generateSequence(1, 1000).output(new DiscardingOutputFormat<Long>());
 		
 		Plan plan = env.createProgramPlan();
-		JobWithJars jobWithJars = new JobWithJars(plan, Collections.<String>emptyList());
+		JobWithJars jobWithJars = new JobWithJars(plan, Collections.<URL>emptyList(),  Collections.<URL>emptyList());
 
 		program = mock(PackagedProgram.class);
 		when(program.getPlanWithJars()).thenReturn(jobWithJars);

http://git-wip-us.apache.org/repos/asf/flink/blob/0ee0c1f5/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
index b19cb38..2be7599 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
@@ -57,6 +57,9 @@ import scala.concurrent.duration.FiniteDuration;
 import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.net.URI;
+import java.net.URL;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
@@ -167,11 +170,14 @@ public class FlinkClient {
 			throw new AlreadyAliveException();
 		}
 
-		final File uploadedJarFile = new File(uploadedJarLocation);
+		final URI uploadedJarUri;
+		final URL uploadedJarUrl;
 		try {
-			JobWithJars.checkJarFile(uploadedJarFile);
+			uploadedJarUri = new File(uploadedJarLocation).getAbsoluteFile().toURI();
+			uploadedJarUrl = uploadedJarUri.toURL();
+			JobWithJars.checkJarFile(uploadedJarUrl);
 		} catch (final IOException e) {
-			throw new RuntimeException("Problem with jar file " + uploadedJarFile.getAbsolutePath(), e);
+			throw new RuntimeException("Problem with jar file " + uploadedJarLocation, e);
 		}
 
 		/* set storm configuration */
@@ -180,7 +186,7 @@ public class FlinkClient {
 		}
 
 		final JobGraph jobGraph = topology.getStreamGraph().getJobGraph(name);
-		jobGraph.addJar(new Path(uploadedJarFile.getAbsolutePath()));
+		jobGraph.addJar(new Path(uploadedJarUri));
 
 		final Configuration configuration = jobGraph.getJobConfiguration();
 		configuration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, jobManagerHost);
@@ -195,7 +201,8 @@ public class FlinkClient {
 
 		try {
 			ClassLoader classLoader = JobWithJars.buildUserCodeClassLoader(
-					Lists.newArrayList(uploadedJarFile),
+					Lists.newArrayList(uploadedJarUrl),
+					Collections.<URL>emptyList(),
 					this.getClass().getClassLoader());
 			client.runDetached(jobGraph, classLoader);
 		} catch (final ProgramInvocationException e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/0ee0c1f5/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkSubmitter.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkSubmitter.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkSubmitter.java
index 9b03c68..13a39ef 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkSubmitter.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkSubmitter.java
@@ -23,6 +23,8 @@ import backtype.storm.generated.InvalidTopologyException;
 import backtype.storm.generated.SubmitOptions;
 import backtype.storm.utils.Utils;
 
+import java.net.URISyntaxException;
+import java.net.URL;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.client.program.ContextEnvironment;
 import org.apache.flink.configuration.ConfigConstants;
@@ -99,6 +101,7 @@ public class FlinkSubmitter {
 		final String serConf = JSONValue.toJSONString(stormConf);
 
 		final FlinkClient client = FlinkClient.getConfiguredClient(stormConf);
+
 		try {
 			if (client.getTopologyJobId(name) != null) {
 				throw new RuntimeException("Topology with name `" + name + "` already exists on cluster");
@@ -106,11 +109,13 @@ public class FlinkSubmitter {
 			String localJar = System.getProperty("storm.jar");
 			if (localJar == null) {
 				try {
-					for (final File file : ((ContextEnvironment) ExecutionEnvironment.getExecutionEnvironment())
+					for (final URL url : ((ContextEnvironment) ExecutionEnvironment.getExecutionEnvironment())
 							.getJars()) {
 						// TODO verify that there is only one jar
-						localJar = file.getAbsolutePath();
+						localJar = new File(url.toURI()).getAbsolutePath();
 					}
+				} catch (final URISyntaxException e) {
+					// ignore
 				} catch (final ClassCastException e) {
 					// ignore
 				}

http://git-wip-us.apache.org/repos/asf/flink/blob/0ee0c1f5/flink-core/src/main/java/org/apache/flink/api/common/PlanExecutor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/PlanExecutor.java b/flink-core/src/main/java/org/apache/flink/api/common/PlanExecutor.java
index 514692f..cbdf137 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/PlanExecutor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/PlanExecutor.java
@@ -20,6 +20,7 @@ package org.apache.flink.api.common;
 
 import org.apache.flink.configuration.Configuration;
 
+import java.net.URL;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
@@ -169,9 +170,12 @@ public abstract class PlanExecutor {
 	 * @param clientConfiguration The configuration for the client (Akka, default.parallelism).
 	 * @param jarFiles A list of jar files that contain the user-defined function (UDF) classes and all classes used
 	 *                 from within the UDFs.
+	 * @param globalClasspaths A list of URLs that are added to the classpath of each user code classloader of the
+	 *                 program. Paths must specify a protocol (e.g. file://) and be accessible on all nodes.
 	 * @return A remote executor.
 	 */
-	public static PlanExecutor createRemoteExecutor(String hostname, int port, Configuration clientConfiguration, String... jarFiles) {
+	public static PlanExecutor createRemoteExecutor(String hostname, int port, Configuration clientConfiguration,
+			URL[] jarFiles, URL[] globalClasspaths) {
 		if (hostname == null) {
 			throw new IllegalArgumentException("The hostname must not be null.");
 		}
@@ -181,13 +185,17 @@ public abstract class PlanExecutor {
 		
 		Class<? extends PlanExecutor> reClass = loadExecutorClass(REMOTE_EXECUTOR_CLASS);
 		
-		List<String> files = (jarFiles == null || jarFiles.length == 0) ? Collections.<String>emptyList()
-																		: Arrays.asList(jarFiles); 
-		
+		List<URL> files = (jarFiles == null || jarFiles.length == 0) ?
+				Collections.<URL>emptyList() : Arrays.asList(jarFiles);
+		List<URL> paths = (globalClasspaths == null || globalClasspaths.length == 0) ?
+				Collections.<URL>emptyList() : Arrays.asList(globalClasspaths);
+
 		try {
 			PlanExecutor executor = (clientConfiguration == null) ?
-					reClass.getConstructor(String.class, int.class, List.class).newInstance(hostname, port, files) :
-					reClass.getConstructor(String.class, int.class, List.class, Configuration.class).newInstance(hostname, port, files, clientConfiguration);
+					reClass.getConstructor(String.class, int.class, List.class)
+						.newInstance(hostname, port, files) :
+					reClass.getConstructor(String.class, int.class, Configuration.class, List.class, List.class)
+						.newInstance(hostname, port, clientConfiguration, files, paths);
 			return executor;
 		}
 		catch (Throwable t) {

http://git-wip-us.apache.org/repos/asf/flink/blob/0ee0c1f5/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 01fb15c..aa7c0c4 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -1152,7 +1152,7 @@ public abstract class ExecutionEnvironment {
 	 */
 	public static ExecutionEnvironment createRemoteEnvironment(
 			String host, int port, Configuration clientConfiguration, String... jarFiles) {
-		return new RemoteEnvironment(host, port, clientConfiguration, jarFiles);
+		return new RemoteEnvironment(host, port, clientConfiguration, jarFiles, null);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/0ee0c1f5/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java
index 5a4aa1e..4b61426 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java
@@ -25,6 +25,10 @@ import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.PlanExecutor;
 import org.apache.flink.configuration.Configuration;
 
+import java.io.File;
+import java.net.MalformedURLException;
+import java.net.URL;
+
 /**
  * An {@link ExecutionEnvironment} that sends programs to a cluster for execution. The environment
  * needs to be created with the address and port of the JobManager of the Flink cluster that
@@ -44,7 +48,7 @@ public class RemoteEnvironment extends ExecutionEnvironment {
 	protected final int port;
 
 	/** The jar files that need to be attached to each job */
-	private final String[] jarFiles;
+	private final URL[] jarFiles;
 
 	/** The configuration used by the client that connects to the cluster */
 	private Configuration clientConfiguration;
@@ -55,20 +59,40 @@ public class RemoteEnvironment extends ExecutionEnvironment {
 	/** Optional shutdown hook, used in session mode to eagerly terminate the last session */
 	private Thread shutdownHook;
 
+	/** The classpaths that need to be attached to each job */
+	private final URL[] globalClasspaths;
+
 	/**
 	 * Creates a new RemoteEnvironment that points to the master (JobManager) described by the
 	 * given host name and port.
-	 * 
+	 *
 	 * <p>Each program execution will have all the given JAR files in its classpath.
-	 * 
+	 *
 	 * @param host The host name or address of the master (JobManager), where the program should be executed.
-	 * @param port The port of the master (JobManager), where the program should be executed. 
+	 * @param port The port of the master (JobManager), where the program should be executed.
 	 * @param jarFiles The JAR files with code that needs to be shipped to the cluster. If the program uses
 	 *                 user-defined functions, user-defined input formats, or any libraries, those must be
 	 *                 provided in the JAR files.
-	 */	
+	 */
 	public RemoteEnvironment(String host, int port, String... jarFiles) {
-		this(host, port, null, jarFiles);
+		this(host, port, null, jarFiles, null);
+	}
+
+	/**
+	 * Creates a new RemoteEnvironment that points to the master (JobManager) described by the
+	 * given host name and port.
+	 *
+	 * <p>Each program execution will have all the given JAR files in its classpath.
+	 *
+	 * @param host The host name or address of the master (JobManager), where the program should be executed.
+	 * @param port The port of the master (JobManager), where the program should be executed.
+	 * @param clientConfig The configuration used by the client that connects to the cluster.
+	 * @param jarFiles The JAR files with code that needs to be shipped to the cluster. If the program uses
+	 *                 user-defined functions, user-defined input formats, or any libraries, those must be
+	 *                 provided in the JAR files.
+	 */
+	public RemoteEnvironment(String host, int port, Configuration clientConfig, String[] jarFiles) {
+		this(host, port, clientConfig, jarFiles, null);
 	}
 
 	/**
@@ -83,8 +107,13 @@ public class RemoteEnvironment extends ExecutionEnvironment {
 	 * @param jarFiles The JAR files with code that needs to be shipped to the cluster. If the program uses
 	 *                 user-defined functions, user-defined input formats, or any libraries, those must be
 	 *                 provided in the JAR files.
+	 * @param globalClasspaths The paths of directories and JAR files that are added to each user code 
+	 *                 classloader on all nodes in the cluster. Note that the paths must specify a 
+	 *                 protocol (e.g. file://) and be accessible on all nodes (e.g. by means of a NFS share).
+	 *                 The protocol must be supported by the {@link java.net.URLClassLoader}.
 	 */
-	public RemoteEnvironment(String host, int port, Configuration clientConfig, String... jarFiles) {
+	public RemoteEnvironment(String host, int port, Configuration clientConfig,
+			String[] jarFiles, URL[] globalClasspaths) {
 		if (!ExecutionEnvironment.areExplicitEnvironmentsAllowed()) {
 			throw new InvalidProgramException(
 					"The RemoteEnvironment cannot be instantiated when running in a pre-defined context " +
@@ -99,8 +128,21 @@ public class RemoteEnvironment extends ExecutionEnvironment {
 
 		this.host = host;
 		this.port = port;
-		this.jarFiles = jarFiles;
 		this.clientConfiguration = clientConfig == null ? new Configuration() : clientConfig;
+		if (jarFiles != null) {
+			this.jarFiles = new URL[jarFiles.length];
+			for (int i = 0; i < jarFiles.length; i++) {
+				try {
+					this.jarFiles[i] = new File(jarFiles[i]).getAbsoluteFile().toURI().toURL();
+				} catch (MalformedURLException e) {
+					throw new IllegalArgumentException("JAR file path invalid", e);
+				}
+			}
+		}
+		else {
+			this.jarFiles = null;
+		}
+		this.globalClasspaths = globalClasspaths;
 	}
 
 	// ------------------------------------------------------------------------
@@ -146,7 +188,8 @@ public class RemoteEnvironment extends ExecutionEnvironment {
 	
 	private void ensureExecutorCreated() throws Exception {
 		if (executor == null) {
-			executor = PlanExecutor.createRemoteExecutor(host, port, clientConfiguration, jarFiles);
+			executor = PlanExecutor.createRemoteExecutor(host, port, clientConfiguration,
+				jarFiles, globalClasspaths);
 			executor.setPrintStatusDuringExecution(getConfig().isSysoutLoggingEnabled());
 		}
 		

http://git-wip-us.apache.org/repos/asf/flink/blob/0ee0c1f5/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
index c4065d2..558fcd0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.util.SerializedValue;
 
 import java.io.Serializable;
+import java.net.URL;
 import java.util.Collection;
 import java.util.List;
 
@@ -77,6 +78,9 @@ public final class TaskDeploymentDescriptor implements Serializable {
 
 	/** The list of JAR files required to run this task. */
 	private final List<BlobKey> requiredJarFiles;
+	
+	/** The list of classpaths required to run this task. */
+	private final List<URL> requiredClasspaths;
 
 	private final SerializedValue<StateHandle<?>> operatorState;
 	
@@ -89,13 +93,13 @@ public final class TaskDeploymentDescriptor implements Serializable {
 			Configuration taskConfiguration, String invokableClassName,
 			List<ResultPartitionDeploymentDescriptor> producedPartitions,
 			List<InputGateDeploymentDescriptor> inputGates,
-			List<BlobKey> requiredJarFiles, int targetSlotNumber,
-			SerializedValue<StateHandle<?>> operatorState) {
+			List<BlobKey> requiredJarFiles, List<URL> requiredClasspaths,
+			int targetSlotNumber, SerializedValue<StateHandle<?>> operatorState) {
 
 		checkArgument(indexInSubtaskGroup >= 0);
 		checkArgument(numberOfSubtasks > indexInSubtaskGroup);
 		checkArgument(targetSlotNumber >= 0);
-		
+
 		this.jobID = checkNotNull(jobID);
 		this.vertexID = checkNotNull(vertexID);
 		this.executionId = checkNotNull(executionId);
@@ -108,6 +112,7 @@ public final class TaskDeploymentDescriptor implements Serializable {
 		this.producedPartitions = checkNotNull(producedPartitions);
 		this.inputGates = checkNotNull(inputGates);
 		this.requiredJarFiles = checkNotNull(requiredJarFiles);
+		this.requiredClasspaths = checkNotNull(requiredClasspaths);
 		this.targetSlotNumber = targetSlotNumber;
 		this.operatorState = operatorState;
 	}
@@ -118,11 +123,12 @@ public final class TaskDeploymentDescriptor implements Serializable {
 			Configuration taskConfiguration, String invokableClassName,
 			List<ResultPartitionDeploymentDescriptor> producedPartitions,
 			List<InputGateDeploymentDescriptor> inputGates,
-			List<BlobKey> requiredJarFiles, int targetSlotNumber) {
+			List<BlobKey> requiredJarFiles, List<URL> requiredClasspaths,
+			int targetSlotNumber) {
 
 		this(jobID, vertexID, executionId, taskName, indexInSubtaskGroup, numberOfSubtasks,
 				jobConfiguration, taskConfiguration, invokableClassName, producedPartitions,
-				inputGates, requiredJarFiles, targetSlotNumber, null);
+				inputGates, requiredJarFiles, requiredClasspaths, targetSlotNumber, null);
 	}
 
 	/**
@@ -208,6 +214,10 @@ public final class TaskDeploymentDescriptor implements Serializable {
 		return requiredJarFiles;
 	}
 
+	public List<URL> getRequiredClasspaths() {
+		return requiredClasspaths;
+	}
+
 	@Override
 	public String toString() {
 		return String.format("TaskDeploymentDescriptor [job id: %s, job vertex id: %s, " +

http://git-wip-us.apache.org/repos/asf/flink/blob/0ee0c1f5/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
index 88be5e1..97ec93a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
@@ -81,26 +81,31 @@ public final class BlobLibraryCacheManager extends TimerTask implements LibraryC
 	// --------------------------------------------------------------------------------------------
 	
 	@Override
-	public void registerJob(JobID id, Collection<BlobKey> requiredJarFiles) throws IOException {
-		registerTask(id, JOB_ATTEMPT_ID, requiredJarFiles);
+	public void registerJob(JobID id, Collection<BlobKey> requiredJarFiles, Collection<URL> requiredClasspaths)
+			throws IOException {
+		registerTask(id, JOB_ATTEMPT_ID, requiredJarFiles, requiredClasspaths);
 	}
 	
 	@Override
-	public void registerTask(JobID jobId, ExecutionAttemptID task, Collection<BlobKey> requiredJarFiles) throws IOException {
+	public void registerTask(JobID jobId, ExecutionAttemptID task, Collection<BlobKey> requiredJarFiles,
+			Collection<URL> requiredClasspaths) throws IOException {
 		Preconditions.checkNotNull(jobId, "The JobId must not be null.");
 		Preconditions.checkNotNull(task, "The task execution id must not be null.");
-		
+
 		if (requiredJarFiles == null) {
 			requiredJarFiles = Collections.emptySet();
 		}
-		
+		if (requiredClasspaths == null) {
+			requiredClasspaths = Collections.emptySet();
+		}
+
 		synchronized (lockObject) {
 			LibraryCacheEntry entry = cacheEntries.get(jobId);
-			
+
 			if (entry == null) {
 				// create a new entry in the library cache
 				BlobKey[] keys = requiredJarFiles.toArray(new BlobKey[requiredJarFiles.size()]);
-				URL[] urls = new URL[keys.length];
+				URL[] urls = new URL[keys.length + requiredClasspaths.size()];
 
 				int count = 0;
 				try {
@@ -124,7 +129,13 @@ public final class BlobLibraryCacheManager extends TimerTask implements LibraryC
 					ExceptionUtils.tryRethrowIOException(t);
 					throw new IOException("Library cache could not register the user code libraries.", t);
 				}
-				
+
+				// add classpaths
+				for (URL url : requiredClasspaths) {
+					urls[count] = url;
+					count++;
+				}
+
 				URLClassLoader classLoader = new FlinkUserCodeClassLoader(urls);
 				cacheEntries.put(jobId, new LibraryCacheEntry(requiredJarFiles, classLoader, task));
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/0ee0c1f5/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FallbackLibraryCacheManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FallbackLibraryCacheManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FallbackLibraryCacheManager.java
index 66bda45..1ef6e31 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FallbackLibraryCacheManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FallbackLibraryCacheManager.java
@@ -26,6 +26,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.net.URL;
 import java.util.Collection;
 
 public class FallbackLibraryCacheManager implements LibraryCacheManager {
@@ -43,12 +44,13 @@ public class FallbackLibraryCacheManager implements LibraryCacheManager {
 	}
 
 	@Override
-	public void registerJob(JobID id, Collection<BlobKey> requiredJarFiles) {
+	public void registerJob(JobID id, Collection<BlobKey> requiredJarFiles, Collection<URL> requiredClasspaths) {
 		LOG.warn("FallbackLibraryCacheManager cannot download files associated with blob keys.");
 	}
 	
 	@Override
-	public void registerTask(JobID id, ExecutionAttemptID execution, Collection<BlobKey> requiredJarFiles) {
+	public void registerTask(JobID id, ExecutionAttemptID execution, Collection<BlobKey> requiredJarFiles,
+			Collection<URL> requiredClasspaths) {
 		LOG.warn("FallbackLibraryCacheManager cannot download files associated with blob keys.");
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0ee0c1f5/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheManager.java
index 52a8048..03c8f27 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheManager.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.JobID;
 
 import java.io.File;
 import java.io.IOException;
+import java.net.URL;
 import java.util.Collection;
 
 public interface LibraryCacheManager {
@@ -45,22 +46,26 @@ public interface LibraryCacheManager {
 	File getFile(BlobKey blobKey) throws IOException;
 
 	/**
-	 * Registers a job with its required jar files. The jar files are identified by their blob keys.
+	 * Registers a job with its required jar files and classpaths. The jar files are identified by their blob keys.
 	 *
 	 * @param id job ID
 	 * @param requiredJarFiles collection of blob keys identifying the required jar files
+	 * @param requiredClasspaths collection of classpaths that are added to the user code class loader
 	 * @throws IOException
 	 */
-	void registerJob(JobID id, Collection<BlobKey> requiredJarFiles) throws IOException;
+	void registerJob(JobID id, Collection<BlobKey> requiredJarFiles, Collection<URL> requiredClasspaths)
+			throws IOException;
 	
 	/**
-	 * Registers a job task execution with its required jar files. The jar files are identified by their blob keys.
+	 * Registers a job task execution with its required jar files and classpaths. The jar files are identified by their blob keys.
 	 *
 	 * @param id job ID
 	 * @param requiredJarFiles collection of blob keys identifying the required jar files
+	 * @param requiredClasspaths collection of classpaths that are added to the user code class loader
 	 * @throws IOException
 	 */
-	void registerTask(JobID id, ExecutionAttemptID execution, Collection<BlobKey> requiredJarFiles) throws IOException;
+	void registerTask(JobID id, ExecutionAttemptID execution, Collection<BlobKey> requiredJarFiles,
+			Collection<URL> requiredClasspaths) throws IOException;
 
 	/**
 	 * Unregisters a job from the library cache manager.

http://git-wip-us.apache.org/repos/asf/flink/blob/0ee0c1f5/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 889ce43..ef00484 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -56,6 +56,7 @@ import scala.concurrent.duration.FiniteDuration;
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.net.URL;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -142,6 +143,10 @@ public class ExecutionGraph implements Serializable {
 	 * inside the BlobService and are referenced via the BLOB keys. */
 	private final List<BlobKey> requiredJarFiles;
 
+	/** A list of all classpaths required during the job execution. Classpaths have to be
+	 * accessible on all nodes in the cluster. */
+	private final List<URL> requiredClasspaths;
+
 	/** Listeners that receive messages when the entire job switches it status (such as from
 	 * RUNNING to FINISHED) */
 	private final List<ActorGateway> jobStatusListenerActors;
@@ -239,6 +244,7 @@ public class ExecutionGraph implements Serializable {
 			jobConfig,
 			timeout,
 			new ArrayList<BlobKey>(),
+			new ArrayList<URL>(),
 			ExecutionGraph.class.getClassLoader()
 		);
 	}
@@ -250,6 +256,7 @@ public class ExecutionGraph implements Serializable {
 			Configuration jobConfig,
 			FiniteDuration timeout,
 			List<BlobKey> requiredJarFiles,
+			List<URL> requiredClasspaths,
 			ClassLoader userClassLoader) {
 
 		if (executionContext == null || jobId == null || jobName == null || jobConfig == null || userClassLoader == null) {
@@ -275,6 +282,7 @@ public class ExecutionGraph implements Serializable {
 		this.stateTimestamps[JobStatus.CREATED.ordinal()] = System.currentTimeMillis();
 
 		this.requiredJarFiles = requiredJarFiles;
+		this.requiredClasspaths = requiredClasspaths;
 
 		this.timeout = timeout;
 	}
@@ -427,8 +435,15 @@ public class ExecutionGraph implements Serializable {
 		return this.requiredJarFiles;
 	}
 
-	// --------------------------------------------------------------------------------------------
+	/**
+	 * Returns a list of classpaths referring to the directories/JAR files required to run this job
+	 * @return list of classpaths referring to the directories/JAR files required to run this job
+	 */
+	public List<URL> getRequiredClasspaths() {
+		return this.requiredClasspaths;
+	}
 
+	// --------------------------------------------------------------------------------------------
 
 	public void setJsonPlan(String jsonPlan) {
 		this.jsonPlan = jsonPlan;

http://git-wip-us.apache.org/repos/asf/flink/blob/0ee0c1f5/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 0d039cc..6a63528 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -49,6 +49,7 @@ import org.slf4j.Logger;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.io.Serializable;
+import java.net.URL;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
@@ -644,11 +645,13 @@ public class ExecutionVertex implements Serializable {
 		}
 
 		List<BlobKey> jarFiles = getExecutionGraph().getRequiredJarFiles();
+		List<URL> classpaths = getExecutionGraph().getRequiredClasspaths();
 
 		return new TaskDeploymentDescriptor(getJobId(), getJobvertexId(), executionId, getTaskName(),
 				subTaskIndex, getTotalNumberOfParallelSubtasks(), getExecutionGraph().getJobConfiguration(),
 				jobVertex.getJobVertex().getConfiguration(), jobVertex.getJobVertex().getInvokableClassName(),
-				producedPartitions, consumedPartitions, jarFiles, targetSlot.getRoot().getSlotNumber(), operatorState);
+				producedPartitions, consumedPartitions, jarFiles, classpaths, targetSlot.getRoot().getSlotNumber(),
+				operatorState);
 	}
 
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/0ee0c1f5/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
index 4014a76..6b36e2d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.jobgraph;
 import java.io.IOException;
 import java.io.Serializable;
 import java.net.InetSocketAddress;
+import java.net.URL;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
@@ -95,6 +96,9 @@ public class JobGraph implements Serializable {
 	/** The settings for asynchronous snapshots */
 	private JobSnapshottingSettings snapshotSettings;
 
+	/** List of classpaths required to run this job. */
+	private List<URL> classpaths = Collections.<URL>emptyList();
+
 	// --------------------------------------------------------------------------------------------
 	
 	/**
@@ -348,7 +352,20 @@ public class JobGraph implements Serializable {
 	public JobVertex findVertexByID(JobVertexID id) {
 		return this.taskVertices.get(id);
 	}
-	
+
+	/**
+	 * Sets the classpaths required to run the job on a task manager.
+	 * 
+	 * @param paths paths of the directories/JAR files required to run the job on a task manager
+	 */
+	public void setClasspaths(List<URL> paths) {
+		classpaths = paths;
+	}
+
+	public List<URL> getClasspaths() {
+		return classpaths;
+	}
+
 	// --------------------------------------------------------------------------------------------
 
 	public List<JobVertex> getVerticesSortedTopologicallyFromSources() throws InvalidProgramException {

http://git-wip-us.apache.org/repos/asf/flink/blob/0ee0c1f5/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 434c5d1..269222f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -59,6 +59,7 @@ import org.slf4j.LoggerFactory;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.io.IOException;
+import java.net.URL;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -140,6 +141,9 @@ public class Task implements Runnable {
 	/** The jar files used by this task */
 	private final List<BlobKey> requiredJarFiles;
 
+	/** The classpaths used by this task */
+	private final List<URL> requiredClasspaths;
+
 	/** The name of the class that holds the invokable code */
 	private final String nameOfInvokableClass;
 
@@ -245,6 +249,7 @@ public class Task implements Runnable {
 		this.jobConfiguration = checkNotNull(tdd.getJobConfiguration());
 		this.taskConfiguration = checkNotNull(tdd.getTaskConfiguration());
 		this.requiredJarFiles = checkNotNull(tdd.getRequiredJarFiles());
+		this.requiredClasspaths = checkNotNull(tdd.getRequiredClasspaths());
 		this.nameOfInvokableClass = checkNotNull(tdd.getInvokableClassName());
 		this.operatorState = tdd.getOperatorState();
 
@@ -702,7 +707,7 @@ public class Task implements Runnable {
 		long startDownloadTime = System.currentTimeMillis();
 
 		// triggers the download of all missing jar files from the job manager
-		libraryCache.registerTask(jobId, executionId, requiredJarFiles);
+		libraryCache.registerTask(jobId, executionId, requiredJarFiles, requiredClasspaths);
 
 		LOG.debug("Register task {} at library cache manager took {} milliseconds",
 				executionId, System.currentTimeMillis() - startDownloadTime);

http://git-wip-us.apache.org/repos/asf/flink/blob/0ee0c1f5/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index bbb382e..7bb7783 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -618,7 +618,8 @@ class JobManager(
         // because this makes sure that the uploaded jar files are removed in case of
         // unsuccessful
         try {
-          libraryCacheManager.registerJob(jobGraph.getJobID, jobGraph.getUserJarBlobKeys)
+          libraryCacheManager.registerJob(jobGraph.getJobID, jobGraph.getUserJarBlobKeys,
+            jobGraph.getClasspaths)
         }
         catch {
           case t: Throwable =>
@@ -657,6 +658,7 @@ class JobManager(
               jobGraph.getJobConfiguration,
               timeout,
               jobGraph.getUserJarBlobKeys,
+              jobGraph.getClasspaths,
               userCodeLoader)
             val jobInfo = JobInfo(
               client,

http://git-wip-us.apache.org/repos/asf/flink/blob/0ee0c1f5/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
index 58755f3..2dd1caf 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.fail;
 
+import java.net.URL;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -51,10 +52,11 @@ public class TaskDeploymentDescriptorTest {
 			final List<ResultPartitionDeploymentDescriptor> producedResults = new ArrayList<ResultPartitionDeploymentDescriptor>(0);
 			final List<InputGateDeploymentDescriptor> inputGates = new ArrayList<InputGateDeploymentDescriptor>(0);
 			final List<BlobKey> requiredJars = new ArrayList<BlobKey>(0);
+			final List<URL> requiredClasspaths = new ArrayList<URL>(0);
 	
 			final TaskDeploymentDescriptor orig = new TaskDeploymentDescriptor(jobID, vertexID, execId, taskName,
 				indexInSubtaskGroup, currentNumberOfSubtasks, jobConfiguration, taskConfiguration,
-				invokableClass.getName(), producedResults, inputGates, requiredJars, 47);
+				invokableClass.getName(), producedResults, inputGates, requiredJars, requiredClasspaths, 47);
 	
 			final TaskDeploymentDescriptor copy = CommonTestUtils.createCopySerializable(orig);
 	
@@ -73,6 +75,7 @@ public class TaskDeploymentDescriptorTest {
 			assertEquals(orig.getInputGates(), copy.getInputGates());
 
 			assertEquals(orig.getRequiredJarFiles(), copy.getRequiredJarFiles());
+			assertEquals(orig.getRequiredClasspaths(), copy.getRequiredClasspaths());
 		}
 		catch (Exception e) {
 			e.printStackTrace();

http://git-wip-us.apache.org/repos/asf/flink/blob/0ee0c1f5/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
index 4e814f5..5a5ef57 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
@@ -36,6 +36,7 @@ import static org.junit.Assume.assumeTrue;
 import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.net.URL;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -68,7 +69,7 @@ public class BlobLibraryCacheManagerTest {
 
 			long cleanupInterval = 1000l;
 			libraryCacheManager = new BlobLibraryCacheManager(server, cleanupInterval);
-			libraryCacheManager.registerJob(jid, keys);
+			libraryCacheManager.registerJob(jid, keys, Collections.<URL>emptyList());
 
 			List<File> files = new ArrayList<File>();
 
@@ -170,7 +171,7 @@ public class BlobLibraryCacheManagerTest {
 				ExecutionAttemptID executionId = new ExecutionAttemptID();
 				Collection<BlobKey> keys = Collections.singleton(dataKey1);
 
-				libCache.registerTask(jid, executionId, keys);
+				libCache.registerTask(jid, executionId, keys, Collections.<URL>emptyList());
 				assertEquals(1, libCache.getNumberOfReferenceHolders(jid));
 				assertEquals(1, libCache.getNumberOfCachedLibraries());
 				assertNotNull(libCache.getClassLoader(jid));
@@ -201,7 +202,8 @@ public class BlobLibraryCacheManagerTest {
 
 			// since we cannot download this library any more, this call should fail
 			try {
-				libCache.registerTask(new JobID(), new ExecutionAttemptID(), Collections.singleton(dataKey2));
+				libCache.registerTask(new JobID(), new ExecutionAttemptID(), Collections.singleton(dataKey2),
+						Collections.<URL>emptyList());
 				fail("This should fail with an IOException");
 			}
 			catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/0ee0c1f5/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
index c0fe750..fff6e70 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
@@ -47,6 +47,7 @@ import org.junit.Test;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.io.Serializable;
+import java.net.URL;
 import java.util.Collections;
 import java.util.concurrent.TimeUnit;
 
@@ -153,6 +154,7 @@ public class TaskAsyncCallTest {
 				Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
 				Collections.<InputGateDeploymentDescriptor>emptyList(),
 				Collections.<BlobKey>emptyList(),
+				Collections.<URL>emptyList(),
 				0);
 
 		ActorGateway taskManagerGateway = DummyActorGateway.INSTANCE;