You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mx...@apache.org on 2015/10/08 15:58:33 UTC
[1/2] flink git commit: [FLINK-1789][core][runtime] Allow adding of URLs to the usercode class loader
Repository: flink
Updated Branches:
refs/heads/master fa2bb8f11 -> 0ee0c1f55
http://git-wip-us.apache.org/repos/asf/flink/blob/0ee0c1f5/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index 461138c..131a2e7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -74,6 +74,7 @@ import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
import java.net.InetSocketAddress;
+import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
@@ -162,7 +163,7 @@ public class TaskManagerTest {
new Configuration(), new Configuration(), TestInvokableCorrect.class.getName(),
Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
Collections.<InputGateDeploymentDescriptor>emptyList(),
- new ArrayList<BlobKey>(), 0);
+ new ArrayList<BlobKey>(), Collections.<URL>emptyList(), 0);
new Within(d) {
@@ -267,13 +268,13 @@ public class TaskManagerTest {
new Configuration(), new Configuration(), TestInvokableBlockingCancelable.class.getName(),
Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
Collections.<InputGateDeploymentDescriptor>emptyList(),
- new ArrayList<BlobKey>(), 0);
+ new ArrayList<BlobKey>(), Collections.<URL>emptyList(), 0);
final TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(jid2, vid2, eid2, "TestTask2", 2, 7,
new Configuration(), new Configuration(), TestInvokableBlockingCancelable.class.getName(),
Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
Collections.<InputGateDeploymentDescriptor>emptyList(),
- new ArrayList<BlobKey>(), 0);
+ new ArrayList<BlobKey>(), Collections.<URL>emptyList(), 0);
final ActorGateway tm = taskManager;
@@ -405,13 +406,13 @@ public class TaskManagerTest {
new Configuration(), new Configuration(), Tasks.Sender.class.getName(),
Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
Collections.<InputGateDeploymentDescriptor>emptyList(),
- new ArrayList<BlobKey>(), 0);
+ new ArrayList<BlobKey>(), Collections.<URL>emptyList(), 0);
final TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(jid, vid2, eid2, "Receiver", 2, 7,
new Configuration(), new Configuration(), Tasks.Receiver.class.getName(),
Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
Collections.<InputGateDeploymentDescriptor>emptyList(),
- new ArrayList<BlobKey>(), 0);
+ new ArrayList<BlobKey>(), Collections.<URL>emptyList(), 0);
new Within(d){
@@ -507,13 +508,14 @@ public class TaskManagerTest {
final TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor(jid, vid1, eid1, "Sender", 0, 1,
new Configuration(), new Configuration(), Tasks.Sender.class.getName(),
- irpdd, Collections.<InputGateDeploymentDescriptor>emptyList(), new ArrayList<BlobKey>(), 0);
+ irpdd, Collections.<InputGateDeploymentDescriptor>emptyList(), new ArrayList<BlobKey>(),
+ Collections.<URL>emptyList(), 0);
final TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(jid, vid2, eid2, "Receiver", 2, 7,
new Configuration(), new Configuration(), Tasks.Receiver.class.getName(),
Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
Collections.singletonList(ircdd),
- new ArrayList<BlobKey>(), 0);
+ new ArrayList<BlobKey>(), Collections.<URL>emptyList(), 0);
new Within(d) {
@@ -651,13 +653,13 @@ public class TaskManagerTest {
final TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor(jid, vid1, eid1, "Sender", 0, 1,
new Configuration(), new Configuration(), Tasks.Sender.class.getName(),
irpdd, Collections.<InputGateDeploymentDescriptor>emptyList(),
- new ArrayList<BlobKey>(), 0);
+ new ArrayList<BlobKey>(), Collections.<URL>emptyList(), 0);
final TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(jid, vid2, eid2, "Receiver", 2, 7,
new Configuration(), new Configuration(), Tasks.BlockingReceiver.class.getName(),
Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
Collections.singletonList(ircdd),
- new ArrayList<BlobKey>(), 0);
+ new ArrayList<BlobKey>(), Collections.<URL>emptyList(), 0);
new Within(d){
@@ -796,7 +798,8 @@ public class TaskManagerTest {
Tasks.AgnosticReceiver.class.getName(),
Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
Collections.singletonList(igdd),
- Collections.<BlobKey>emptyList(), 0);
+ Collections.<BlobKey>emptyList(),
+ Collections.<URL>emptyList(), 0);
new Within(d) {
@Override
@@ -888,7 +891,8 @@ public class TaskManagerTest {
Tasks.AgnosticReceiver.class.getName(),
Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
Collections.singletonList(igdd),
- Collections.<BlobKey>emptyList(), 0);
+ Collections.<BlobKey>emptyList(),
+ Collections.<URL>emptyList(), 0);
new Within(new FiniteDuration(120, TimeUnit.SECONDS)) {
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/0ee0c1f5/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index cf1bfc5..fb933f8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -52,6 +52,7 @@ import org.junit.Test;
import scala.concurrent.duration.FiniteDuration;
import java.lang.reflect.Field;
+import java.net.URL;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
@@ -739,6 +740,7 @@ public class TaskTest {
Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
Collections.<InputGateDeploymentDescriptor>emptyList(),
Collections.<BlobKey>emptyList(),
+ Collections.<URL>emptyList(),
0);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0ee0c1f5/flink-staging/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java b/flink-staging/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java
index cb37fd4..a336957 100644
--- a/flink-staging/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java
+++ b/flink-staging/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java
@@ -19,7 +19,6 @@ package org.apache.flink.api.java;
* limitations under the License.
*/
-import org.apache.commons.lang.ArrayUtils;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.PlanExecutor;
@@ -27,8 +26,9 @@ import org.apache.flink.api.common.PlanExecutor;
import org.apache.flink.api.scala.FlinkILoop;
import org.apache.flink.configuration.Configuration;
+import java.io.File;
+import java.net.URL;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.List;
/**
@@ -50,7 +50,7 @@ public class ScalaShellRemoteEnvironment extends RemoteEnvironment {
* @param flinkILoop The flink Iloop instance from which the ScalaShellRemoteEnvironment is called.
*/
public ScalaShellRemoteEnvironment(String host, int port, FlinkILoop flinkILoop, String... jarFiles) {
- super(host, port, jarFiles);
+ super(host, port, null, jarFiles, null);
this.flinkILoop = flinkILoop;
}
@@ -65,21 +65,22 @@ public class ScalaShellRemoteEnvironment extends RemoteEnvironment {
public JobExecutionResult execute(String jobName) throws Exception {
Plan p = createProgramPlan(jobName);
- String jarFile = flinkILoop.writeFilesToDisk().getAbsolutePath();
+ URL jarUrl = flinkILoop.writeFilesToDisk().getAbsoluteFile().toURI().toURL();
// get "external jars, and add the shell command jar, pass to executor
- List<String> alljars = new ArrayList<String>();
+ List<URL> alljars = new ArrayList<>();
// get external (library) jars
String[] extJars = this.flinkILoop.getExternalJars();
-
- if(!ArrayUtils.isEmpty(extJars)) {
- alljars.addAll(Arrays.asList(extJars));
+
+ for (String extJar : extJars) {
+ URL extJarUrl = new File(extJar).getAbsoluteFile().toURI().toURL();
+ alljars.add(extJarUrl);
}
+
// add shell commands
- alljars.add(jarFile);
- String[] alljarsArr = new String[alljars.size()];
- alljarsArr = alljars.toArray(alljarsArr);
- PlanExecutor executor = PlanExecutor.createRemoteExecutor(host, port, new Configuration(), alljarsArr);
+ alljars.add(jarUrl);
+ PlanExecutor executor = PlanExecutor.createRemoteExecutor(host, port, new Configuration(),
+ alljars.toArray(new URL[alljars.size()]), null);
executor.setPrintStatusDuringExecution(p.getExecutionConfig().isSysoutLoggingEnabled());
return executor.executePlan(p);
http://git-wip-us.apache.org/repos/asf/flink/blob/0ee0c1f5/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
index ccf51d2..02c938e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
@@ -19,7 +19,12 @@ package org.apache.flink.streaming.api.environment;
import java.io.File;
import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URISyntaxException;
+import java.net.URL;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import org.apache.flink.api.common.InvalidProgramException;
@@ -40,13 +45,21 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
private static final Logger LOG = LoggerFactory.getLogger(RemoteStreamEnvironment.class);
+ /** The hostname of the JobManager */
private final String host;
+
+ /** The port of the JobManager main actor system */
private final int port;
- private final List<File> jarFiles;
-
+
/** The configuration used to parametrize the client that connects to the remote cluster */
private final Configuration config;
+ /** The jar files that need to be attached to each job */
+ private final List<URL> jarFiles;
+
+ /** The classpaths that need to be attached to each job */
+ private final List<URL> globalClasspaths;
+
/**
* Creates a new RemoteStreamEnvironment that points to the master
* (JobManager) described by the given host name and port.
@@ -87,6 +100,34 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
* provided in the JAR files.
*/
public RemoteStreamEnvironment(String host, int port, Configuration config, String... jarFiles) {
+ this(host, port, config, jarFiles, null);
+ }
+
+ /**
+ * Creates a new RemoteStreamEnvironment that points to the master
+ * (JobManager) described by the given host name and port.
+ *
+ * @param host
+ * The host name or address of the master (JobManager), where the
+ * program should be executed.
+ * @param port
+ * The port of the master (JobManager), where the program should
+ * be executed.
+ * @param config
+ * The configuration used to parametrize the client that connects to the
+ * remote cluster.
+ * @param jarFiles
+ * The JAR files with code that needs to be shipped to the
+ * cluster. If the program uses user-defined functions,
+ * user-defined input formats, or any libraries, those must be
+ * provided in the JAR files.
+ * @param globalClasspaths
+ * The paths of directories and JAR files that are added to each user code
+ * classloader on all nodes in the cluster. Note that the paths must specify a
+ * protocol (e.g. file://) and be accessible on all nodes (e.g. by means of a NFS share).
+ * The protocol must be supported by the {@link java.net.URLClassLoader}.
+ */
+ public RemoteStreamEnvironment(String host, int port, Configuration config, String[] jarFiles, URL[] globalClasspaths) {
if (!ExecutionEnvironment.areExplicitEnvironmentsAllowed()) {
throw new InvalidProgramException(
"The RemoteEnvironment cannot be used when submitting a program through a client, " +
@@ -103,16 +144,23 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
this.host = host;
this.port = port;
this.config = config == null ? new Configuration() : config;
- this.jarFiles = new ArrayList<File>(jarFiles.length);
+ this.jarFiles = new ArrayList<>(jarFiles.length);
for (String jarFile : jarFiles) {
- File file = new File(jarFile);
try {
- JobWithJars.checkJarFile(file);
+ URL jarFileUrl = new File(jarFile).getAbsoluteFile().toURI().toURL();
+ this.jarFiles.add(jarFileUrl);
+ JobWithJars.checkJarFile(jarFileUrl);
+ } catch (MalformedURLException e) {
+ throw new IllegalArgumentException("JAR file path is invalid '" + jarFile + "'", e);
+ } catch (IOException e) {
+ throw new RuntimeException("Problem with jar file " + jarFile, e);
}
- catch (IOException e) {
- throw new RuntimeException("Problem with jar file " + file.getAbsolutePath(), e);
- }
- this.jarFiles.add(file);
+ }
+ if (globalClasspaths == null) {
+ this.globalClasspaths = Collections.emptyList();
+ }
+ else {
+ this.globalClasspaths = Arrays.asList(globalClasspaths);
}
}
@@ -135,11 +183,16 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
LOG.info("Running remotely at {}:{}", host, port);
}
- for (File file : jarFiles) {
- jobGraph.addJar(new Path(file.getAbsolutePath()));
+ for (URL jarFile : jarFiles) {
+ try {
+ jobGraph.addJar(new Path(jarFile.toURI()));
+ } catch (URISyntaxException e) {
+ throw new ProgramInvocationException("URL is invalid", e);
+ }
}
- ClassLoader usercodeClassLoader = JobWithJars.buildUserCodeClassLoader(jarFiles, getClass().getClassLoader());
+ ClassLoader usercodeClassLoader = JobWithJars.buildUserCodeClassLoader(jarFiles, globalClasspaths,
+ getClass().getClassLoader());
Configuration configuration = new Configuration();
configuration.addAll(jobGraph.getJobConfiguration());
http://git-wip-us.apache.org/repos/asf/flink/blob/0ee0c1f5/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
index 2b2a426..1392efb 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
@@ -17,7 +17,7 @@
package org.apache.flink.streaming.api.environment;
-import java.io.File;
+import java.net.URL;
import java.util.List;
import org.apache.flink.api.common.JobExecutionResult;
@@ -36,7 +36,9 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment {
private static final Logger LOG = LoggerFactory.getLogger(StreamContextEnvironment.class);
- private final List<File> jars;
+ private final List<URL> jars;
+
+ private final List<URL> classpaths;
private final Client client;
@@ -44,12 +46,15 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment {
private final boolean wait;
- protected StreamContextEnvironment(Client client, List<File> jars, int parallelism, boolean wait) {
+ protected StreamContextEnvironment(Client client, List<URL> jars, List<URL> classpaths, int parallelism,
+ boolean wait) {
this.client = client;
this.jars = jars;
+ this.classpaths = classpaths;
this.wait = wait;
- this.userCodeClassLoader = JobWithJars.buildUserCodeClassLoader(jars, getClass().getClassLoader());
+ this.userCodeClassLoader = JobWithJars.buildUserCodeClassLoader(jars, classpaths,
+ getClass().getClassLoader());
if (parallelism > 0) {
setParallelism(parallelism);
@@ -84,10 +89,12 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment {
transformations.clear();
// attach all necessary jar files to the JobGraph
- for (File file : jars) {
- jobGraph.addJar(new Path(file.getAbsolutePath()));
+ for (URL file : jars) {
+ jobGraph.addJar(new Path(file.toURI()));
}
+ jobGraph.setClasspaths(classpaths);
+
// execute the programs
if (wait) {
return client.runBlocking(jobGraph, userCodeClassLoader);
http://git-wip-us.apache.org/repos/asf/flink/blob/0ee0c1f5/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index cc96217..28410fd 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -69,9 +69,9 @@ import org.apache.flink.streaming.api.transformations.StreamTransformation;
import org.apache.flink.types.StringValue;
import org.apache.flink.util.SplittableIterator;
-import java.io.File;
import java.io.IOException;
import java.io.Serializable;
+import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -1285,7 +1285,7 @@ public abstract class StreamExecutionEnvironment {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
if (env instanceof ContextEnvironment) {
ContextEnvironment ctx = (ContextEnvironment) env;
- return createContextEnvironment(ctx.getClient(), ctx.getJars(),
+ return createContextEnvironment(ctx.getClient(), ctx.getJars(), ctx.getClasspaths(),
ctx.getParallelism(), ctx.isWait());
} else if (env instanceof OptimizerPlanEnvironment | env instanceof PreviewPlanEnvironment) {
return new StreamPlanEnvironment(env);
@@ -1295,9 +1295,9 @@ public abstract class StreamExecutionEnvironment {
}
private static StreamExecutionEnvironment createContextEnvironment(
- Client client, List<File> jars, int parallelism, boolean wait)
+ Client client, List<URL> jars, List<URL> classpaths, int parallelism, boolean wait)
{
- return new StreamContextEnvironment(client, jars, parallelism, wait);
+ return new StreamContextEnvironment(client, jars, classpaths, parallelism, wait);
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/0ee0c1f5/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
index 4480d95..dab6a6d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
@@ -25,7 +25,6 @@ import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.test.testdata.KMeansData;
import org.apache.flink.test.util.ForkableFlinkMiniCluster;
-
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
@@ -68,12 +67,23 @@ public class ClassLoaderITCase {
PackagedProgram inputSplitTestProg = new PackagedProgram(
new File(INPUT_SPLITS_PROG_JAR_FILE),
new String[] { INPUT_SPLITS_PROG_JAR_FILE,
+ "", // classpath
"localhost",
String.valueOf(port),
"4" // parallelism
});
inputSplitTestProg.invokeInteractiveModeForExecution();
+ String classpath = new File(INPUT_SPLITS_PROG_JAR_FILE).toURI().toURL().toString();
+ PackagedProgram inputSplitTestProg2 = new PackagedProgram(new File(INPUT_SPLITS_PROG_JAR_FILE),
+ new String[] { "",
+ classpath, // classpath
+ "localhost",
+ String.valueOf(port),
+ "4" // parallelism
+ } );
+ inputSplitTestProg2.invokeInteractiveModeForExecution();
+
// regular streaming job
PackagedProgram streamingProg = new PackagedProgram(
new File(STREAMING_PROG_JAR_FILE),
http://git-wip-us.apache.org/repos/asf/flink/blob/0ee0c1f5/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomInputSplitProgram.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomInputSplitProgram.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomInputSplitProgram.java
index 36b56e0..b18e8e8 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomInputSplitProgram.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomInputSplitProgram.java
@@ -18,6 +18,7 @@
package org.apache.flink.test.classloading.jar;
+import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -29,6 +30,7 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.RemoteEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
@@ -40,13 +42,13 @@ import org.apache.flink.core.io.InputSplitAssigner;
public class CustomInputSplitProgram {
public static void main(String[] args) throws Exception {
-
- final String jarFile = args[0];
- final String host = args[1];
- final int port = Integer.parseInt(args[2]);
- final int parallelism = Integer.parseInt(args[3]);
-
- ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(host, port, jarFile);
+ final String[] jarFile = (args[0].equals(""))? null : new String[] { args[0] };
+ final URL[] classpath = (args[1].equals(""))? null : new URL[] { new URL(args[1]) };
+ final String host = args[2];
+ final int port = Integer.parseInt(args[3]);
+ final int parallelism = Integer.parseInt(args[4]);
+
+ RemoteEnvironment env = new RemoteEnvironment(host, port, null, jarFile, classpath);
env.setParallelism(parallelism);
env.getConfig().disableSysoutLogging();
@@ -59,7 +61,7 @@ public class CustomInputSplitProgram {
return new Tuple2<Integer, Double>(value, value * 0.5);
}
})
- .output(new DiscardingOutputFormat<Tuple2<Integer,Double>>());
+ .output(new DiscardingOutputFormat<Tuple2<Integer, Double>>());
env.execute();
}
[2/2] flink git commit: [FLINK-1789][core][runtime] Allow adding of URLs to the usercode class loader
Posted by mx...@apache.org.
[FLINK-1789][core][runtime] Allow adding of URLs to the usercode class loader
This closes #593.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0ee0c1f5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0ee0c1f5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0ee0c1f5
Branch: refs/heads/master
Commit: 0ee0c1f5573ad059dc6a8e4489094b7f78267306
Parents: fa2bb8f
Author: twalthr <tw...@apache.org>
Authored: Thu Apr 9 01:33:40 2015 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Thu Oct 8 15:55:09 2015 +0200
----------------------------------------------------------------------
.../org/apache/flink/client/CliFrontend.java | 8 +-
.../org/apache/flink/client/RemoteExecutor.java | 66 ++++++----
.../flink/client/cli/CliFrontendParser.java | 11 ++
.../apache/flink/client/cli/ProgramOptions.java | 23 ++++
.../org/apache/flink/client/program/Client.java | 43 +++---
.../client/program/ContextEnvironment.java | 43 ++++--
.../flink/client/program/JobWithJars.java | 91 +++++++------
.../flink/client/program/PackagedProgram.java | 130 ++++++++++++++-----
.../client/CliFrontendPackageProgramTest.java | 52 ++++++--
.../RemoteExecutorHostnameResolutionTest.java | 4 +-
.../apache/flink/client/program/ClientTest.java | 3 +-
.../org/apache/flink/storm/api/FlinkClient.java | 17 ++-
.../apache/flink/storm/api/FlinkSubmitter.java | 9 +-
.../apache/flink/api/common/PlanExecutor.java | 20 ++-
.../flink/api/java/ExecutionEnvironment.java | 2 +-
.../flink/api/java/RemoteEnvironment.java | 61 +++++++--
.../deployment/TaskDeploymentDescriptor.java | 20 ++-
.../librarycache/BlobLibraryCacheManager.java | 27 ++--
.../FallbackLibraryCacheManager.java | 6 +-
.../librarycache/LibraryCacheManager.java | 13 +-
.../runtime/executiongraph/ExecutionGraph.java | 17 ++-
.../runtime/executiongraph/ExecutionVertex.java | 5 +-
.../apache/flink/runtime/jobgraph/JobGraph.java | 19 ++-
.../apache/flink/runtime/taskmanager/Task.java | 7 +-
.../flink/runtime/jobmanager/JobManager.scala | 4 +-
.../TaskDeploymentDescriptorTest.java | 5 +-
.../BlobLibraryCacheManagerTest.java | 8 +-
.../runtime/taskmanager/TaskAsyncCallTest.java | 2 +
.../runtime/taskmanager/TaskManagerTest.java | 26 ++--
.../flink/runtime/taskmanager/TaskTest.java | 2 +
.../api/java/ScalaShellRemoteEnvironment.java | 25 ++--
.../environment/RemoteStreamEnvironment.java | 77 +++++++++--
.../environment/StreamContextEnvironment.java | 19 ++-
.../environment/StreamExecutionEnvironment.java | 8 +-
.../test/classloading/ClassLoaderITCase.java | 12 +-
.../jar/CustomInputSplitProgram.java | 18 +--
36 files changed, 652 insertions(+), 251 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/0ee0c1f5/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index 527c0df..c638894 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -26,6 +26,7 @@ import java.io.InputStream;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
+import java.net.URL;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
@@ -89,7 +90,7 @@ import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
/**
- * Implementation of a simple command line fronted for executing programs.
+ * Implementation of a simple command line frontend for executing programs.
*/
public class CliFrontend {
@@ -702,6 +703,7 @@ public class CliFrontend {
{
String[] programArgs = options.getProgramArgs();
String jarFilePath = options.getJarFilePath();
+ List<URL> classpaths = options.getClasspaths();
if (jarFilePath == null) {
throw new IllegalArgumentException("The program JAR file was not specified.");
@@ -721,8 +723,8 @@ public class CliFrontend {
String entryPointClass = options.getEntryPointClassName();
return entryPointClass == null ?
- new PackagedProgram(jarFile, programArgs) :
- new PackagedProgram(jarFile, entryPointClass, programArgs);
+ new PackagedProgram(jarFile, classpaths, programArgs) :
+ new PackagedProgram(jarFile, classpaths, entryPointClass, programArgs);
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/0ee0c1f5/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
index e8e9ade..bc0d220 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
@@ -21,6 +21,7 @@ package org.apache.flink.client;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
+import java.net.URL;
import java.util.Collections;
import java.util.List;
@@ -49,53 +50,66 @@ import org.apache.flink.configuration.Configuration;
* remotely execute program parts.</p>
*/
public class RemoteExecutor extends PlanExecutor {
-
+
private final Object lock = new Object();
-
- private final List<String> jarFiles;
+
+ private final List<URL> jarFiles; // passed to JobWithJars together with every submitted plan
+
+ private final List<URL> globalClasspaths; // extra user-code class loader URLs, passed to JobWithJars next to jarFiles
private final Configuration clientConfiguration;
private Client client;
-
+
private int defaultParallelism = 1;
-
-
+
+
public RemoteExecutor(String hostname, int port) {
- this(hostname, port, Collections.<String>emptyList(), new Configuration());
+ this(hostname, port, new Configuration(), Collections.<URL>emptyList(),
+ Collections.<URL>emptyList());
}
-
- public RemoteExecutor(String hostname, int port, String jarFile) {
- this(hostname, port, Collections.singletonList(jarFile), new Configuration());
+
+ public RemoteExecutor(String hostname, int port, URL jarFile) {
+ this(hostname, port, new Configuration(), Collections.singletonList(jarFile),
+ Collections.<URL>emptyList());
}
-
- public RemoteExecutor(String hostport, String jarFile) {
- this(getInetFromHostport(hostport), Collections.singletonList(jarFile), new Configuration());
+
+ public RemoteExecutor(String hostport, URL jarFile) {
+ this(getInetFromHostport(hostport), new Configuration(), Collections.singletonList(jarFile),
+ Collections.<URL>emptyList());
}
-
- public RemoteExecutor(String hostname, int port, List<String> jarFiles) {
- this(new InetSocketAddress(hostname, port), jarFiles, new Configuration());
+
+ public RemoteExecutor(String hostname, int port, List<URL> jarFiles) {
+ this(new InetSocketAddress(hostname, port), new Configuration(), jarFiles,
+ Collections.<URL>emptyList());
}
public RemoteExecutor(String hostname, int port, Configuration clientConfiguration) {
- this(hostname, port, Collections.<String>emptyList(), clientConfiguration);
+ this(hostname, port, clientConfiguration, Collections.<URL>emptyList(),
+ Collections.<URL>emptyList());
}
- public RemoteExecutor(String hostname, int port, String jarFile, Configuration clientConfiguration) {
- this(hostname, port, Collections.singletonList(jarFile), clientConfiguration);
+ public RemoteExecutor(String hostname, int port, Configuration clientConfiguration, URL jarFile) { // NOTE: Configuration now precedes the jar (old order was jar, Configuration)
+ this(hostname, port, clientConfiguration, Collections.singletonList(jarFile),
+ Collections.<URL>emptyList());
}
- public RemoteExecutor(String hostport, String jarFile, Configuration clientConfiguration) {
- this(getInetFromHostport(hostport), Collections.singletonList(jarFile), clientConfiguration);
+ public RemoteExecutor(String hostport, Configuration clientConfiguration, URL jarFile) { // NOTE: Configuration now precedes the jar (old order was jar, Configuration)
+ this(getInetFromHostport(hostport), clientConfiguration,
+ Collections.singletonList(jarFile), Collections.<URL>emptyList());
}
- public RemoteExecutor(String hostname, int port, List<String> jarFiles, Configuration clientConfiguration) {
- this(new InetSocketAddress(hostname, port), jarFiles, clientConfiguration);
+ public RemoteExecutor(String hostname, int port, Configuration clientConfiguration,
+ List<URL> jarFiles, List<URL> globalClasspaths) {
+ this(new InetSocketAddress(hostname, port), clientConfiguration, jarFiles, globalClasspaths);
}
- public RemoteExecutor(InetSocketAddress inet, List<String> jarFiles, Configuration clientConfiguration) {
- this.jarFiles = jarFiles;
+ public RemoteExecutor(InetSocketAddress inet, Configuration clientConfiguration,
+ List<URL> jarFiles, List<URL> globalClasspaths) {
this.clientConfiguration = clientConfiguration;
+ this.jarFiles = jarFiles; // stored by reference, not copied
+ this.globalClasspaths = globalClasspaths; // stored by reference, not copied
+ // NOTE(review): the two calls below write into the caller-supplied clientConfiguration object
clientConfiguration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, inet.getHostName());
clientConfiguration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, inet.getPort());
@@ -171,7 +185,7 @@ public class RemoteExecutor extends PlanExecutor {
throw new IllegalArgumentException("The plan may not be null.");
}
- JobWithJars p = new JobWithJars(plan, this.jarFiles);
+ JobWithJars p = new JobWithJars(plan, this.jarFiles, this.globalClasspaths);
return executePlanWithJars(p);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0ee0c1f5/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
index 52f1260..028aead 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
@@ -47,6 +47,12 @@ public class CliFrontendParser {
"Class with the program entry point (\"main\" method or \"getPlan()\" method. Only needed if the " +
"JAR file does not specify the class in its manifest.");
+ static final Option CLASSPATH_OPTION = new Option("C", "classpath", true, "Adds a URL to each user code " +
+ "classloader on all nodes in the cluster. The paths must specify a protocol (e.g. file://) and be " +
+ "accessible on all nodes (e.g. by means of an NFS share). You can use this option multiple " +
+ "times for specifying more than one URL. The protocol must be supported by the " +
+ "java.net.URLClassLoader.");
+ // The description above is plain CLI help text, so Javadoc tags such as {@link ...} must not be used in it.
static final Option PARALLELISM_OPTION = new Option("p", "parallelism", true,
"The parallelism with which to run the program. Optional flag to override the default value " +
"specified in the configuration.");
@@ -78,6 +84,9 @@ public class CliFrontendParser {
CLASS_OPTION.setRequired(false);
CLASS_OPTION.setArgName("classname");
+ CLASSPATH_OPTION.setRequired(false);
+ CLASSPATH_OPTION.setArgName("url");
+
ADDRESS_OPTION.setRequired(false);
ADDRESS_OPTION.setArgName("host:port");
@@ -110,6 +119,7 @@ public class CliFrontendParser {
public static Options getProgramSpecificOptions(Options options) {
options.addOption(JAR_OPTION);
options.addOption(CLASS_OPTION);
+ options.addOption(CLASSPATH_OPTION);
options.addOption(PARALLELISM_OPTION);
options.addOption(ARGS_OPTION);
options.addOption(LOGGING_OPTION);
@@ -121,6 +131,7 @@ public class CliFrontendParser {
private static Options getProgramSpecificOptionsWithoutDeprecatedOptions(Options options) {
options.addOption(CLASS_OPTION);
+ options.addOption(CLASSPATH_OPTION);
options.addOption(PARALLELISM_OPTION);
options.addOption(LOGGING_OPTION);
return options;
http://git-wip-us.apache.org/repos/asf/flink/blob/0ee0c1f5/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java b/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java
index c45da13..11382d2 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java
@@ -19,11 +19,16 @@ package org.apache.flink.client.cli;
import org.apache.commons.cli.CommandLine;
+import java.net.URL;
+import java.net.MalformedURLException;
+import java.util.List;
+import java.util.ArrayList;
import java.util.Arrays;
import static org.apache.flink.client.cli.CliFrontendParser.ARGS_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.JAR_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.CLASS_OPTION;
+import static org.apache.flink.client.cli.CliFrontendParser.CLASSPATH_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.PARALLELISM_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.LOGGING_OPTION;
@@ -36,6 +41,8 @@ public abstract class ProgramOptions extends CommandLineOptions {
private final String entryPointClass;
+ private final List<URL> classpaths; // parsed from -C/--classpath; empty list when the option is absent
+
private final String[] programArgs;
private final int parallelism;
@@ -62,6 +69,18 @@ public abstract class ProgramOptions extends CommandLineOptions {
this.programArgs = args;
+ List<URL> classpaths = new ArrayList<URL>(); // one URL per occurrence of the option, in the order returned by getOptionValues
+ if (line.hasOption(CLASSPATH_OPTION.getOpt())) {
+ for (String path : line.getOptionValues(CLASSPATH_OPTION.getOpt())) {
+ try {
+ classpaths.add(new URL(path));
+ } catch (MalformedURLException e) {
+ throw new CliArgsException("Bad syntax for classpath: " + path); // NOTE(review): the MalformedURLException's reason is not propagated
+ }
+ }
+ }
+ this.classpaths = classpaths;
+
this.entryPointClass = line.hasOption(CLASS_OPTION.getOpt()) ?
line.getOptionValue(CLASS_OPTION.getOpt()) : null;
@@ -96,6 +115,10 @@ public abstract class ProgramOptions extends CommandLineOptions {
return entryPointClass;
}
+ public List<URL> getClasspaths() {
+ return classpaths; // the internal (mutable) list, not a defensive copy
+ }
+
public String[] getProgramArgs() {
return programArgs;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0ee0c1f5/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
index 91ed665..dfb9c1b 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
@@ -18,8 +18,9 @@
package org.apache.flink.client.program;
-import java.io.File;
import java.io.IOException;
+import java.net.URISyntaxException;
+import java.net.URL;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -282,7 +283,8 @@ public class Client {
}
else if (prog.isUsingInteractiveMode()) {
LOG.info("Starting program in interactive mode");
- ContextEnvironment.setAsContext(this, prog.getAllLibraries(), prog.getUserCodeClassLoader(), parallelism, true);
+ ContextEnvironment.setAsContext(this, prog.getAllLibraries(), prog.getClasspaths(),
+ prog.getUserCodeClassLoader(), parallelism, true);
// invoke here
try {
@@ -308,7 +310,8 @@ public class Client {
}
else if (prog.isUsingInteractiveMode()) {
LOG.info("Starting program in interactive mode");
- ContextEnvironment.setAsContext(this, prog.getAllLibraries(), prog.getUserCodeClassLoader(), parallelism, false);
+ ContextEnvironment.setAsContext(this, prog.getAllLibraries(), prog.getClasspaths(),
+ prog.getUserCodeClassLoader(), parallelism, false);
// invoke here
try {
@@ -348,7 +351,7 @@ public class Client {
}
OptimizedPlan optPlan = getOptimizedPlan(compiler, program, parallelism);
- return runBlocking(optPlan, program.getJarFiles(), classLoader);
+ return runBlocking(optPlan, program.getJarFiles(), program.getClasspaths(), classLoader);
}
/**
@@ -373,21 +376,21 @@ public class Client {
}
OptimizedPlan optimizedPlan = getOptimizedPlan(compiler, program, parallelism);
- return runDetached(optimizedPlan, program.getJarFiles(), classLoader);
+ return runDetached(optimizedPlan, program.getJarFiles(), program.getClasspaths(), classLoader);
}
- public JobExecutionResult runBlocking(OptimizedPlan compiledPlan, List<File> libraries, ClassLoader classLoader)
- throws ProgramInvocationException
+ public JobExecutionResult runBlocking(OptimizedPlan compiledPlan, List<URL> libraries, List<URL> classpaths,
+ ClassLoader classLoader) throws ProgramInvocationException
{
- JobGraph job = getJobGraph(compiledPlan, libraries);
+ JobGraph job = getJobGraph(compiledPlan, libraries, classpaths); // libraries become job jars, classpaths are set on the JobGraph
return runBlocking(job, classLoader);
}
- public JobSubmissionResult runDetached(OptimizedPlan compiledPlan, List<File> libraries, ClassLoader classLoader)
- throws ProgramInvocationException
+ public JobSubmissionResult runDetached(OptimizedPlan compiledPlan, List<URL> libraries, List<URL> classpaths,
+ ClassLoader classLoader) throws ProgramInvocationException
{
- JobGraph job = getJobGraph(compiledPlan, libraries);
+ JobGraph job = getJobGraph(compiledPlan, libraries, classpaths); // libraries become job jars, classpaths are set on the JobGraph
return runDetached(job, classLoader);
}
@@ -538,16 +541,16 @@ public class Client {
* @throws CompilerException Thrown, if the compiler encounters an illegal situation.
* @throws ProgramInvocationException Thrown, if the program could not be instantiated from its jar file.
*/
- private static OptimizedPlan getOptimizedPlan(Optimizer compiler, JobWithJars prog, int parallelism) throws CompilerException,
- ProgramInvocationException {
+ private static OptimizedPlan getOptimizedPlan(Optimizer compiler, JobWithJars prog, int parallelism)
+ throws CompilerException, ProgramInvocationException {
return getOptimizedPlan(compiler, prog.getPlan(), parallelism);
}
public static JobGraph getJobGraph(PackagedProgram prog, FlinkPlan optPlan) throws ProgramInvocationException {
- return getJobGraph(optPlan, prog.getAllLibraries());
+ return getJobGraph(optPlan, prog.getAllLibraries(), prog.getClasspaths());
}
- private static JobGraph getJobGraph(FlinkPlan optPlan, List<File> jarFiles) {
+ private static JobGraph getJobGraph(FlinkPlan optPlan, List<URL> jarFiles, List<URL> classpaths) {
JobGraph job;
if (optPlan instanceof StreamingPlan) {
job = ((StreamingPlan) optPlan).getJobGraph();
@@ -556,9 +559,15 @@ public class Client {
job = gen.compileJobGraph((OptimizedPlan) optPlan);
}
- for (File jar : jarFiles) {
- job.addJar(new Path(jar.getAbsolutePath()));
+ for (URL jar : jarFiles) {
+ try {
+ job.addJar(new Path(jar.toURI()));
+ } catch (URISyntaxException e) {
+ throw new RuntimeException("Invalid jar URL '" + jar + "'. This should not happen.", e);
+ }
}
+ // Classpath URLs are only recorded on the JobGraph; unlike the jars above they are not passed to addJar.
+ job.setClasspaths(classpaths);
return job;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0ee0c1f5/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
index e33a05d..d5a28fc 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
@@ -18,7 +18,7 @@
package org.apache.flink.client.program;
-import java.io.File;
+import java.net.URL;
import java.util.List;
import org.apache.flink.api.common.JobExecutionResult;
@@ -41,16 +41,21 @@ public class ContextEnvironment extends ExecutionEnvironment {
private final Client client;
- private final List<File> jarFilesToAttach;
+ private final List<URL> jarFilesToAttach;
+ private final List<URL> classpathsToAttach; // forwarded to JobWithJars in execute()
+
private final ClassLoader userCodeClassLoader;
private final boolean wait;
-
-
- public ContextEnvironment(Client remoteConnection, List<File> jarFiles, ClassLoader userCodeClassLoader, boolean wait) {
+
+ // Lists are stored by reference; nothing below copies them.
+
+ public ContextEnvironment(Client remoteConnection, List<URL> jarFiles, List<URL> classpaths,
+ ClassLoader userCodeClassLoader, boolean wait) {
this.client = remoteConnection;
this.jarFilesToAttach = jarFiles;
+ this.classpathsToAttach = classpaths;
this.userCodeClassLoader = userCodeClassLoader;
this.wait = wait;
}
@@ -58,7 +63,8 @@ public class ContextEnvironment extends ExecutionEnvironment {
@Override
public JobExecutionResult execute(String jobName) throws Exception {
Plan p = createProgramPlan(jobName);
- JobWithJars toRun = new JobWithJars(p, this.jarFilesToAttach, this.userCodeClassLoader);
+ JobWithJars toRun = new JobWithJars(p, this.jarFilesToAttach, this.classpathsToAttach,
+ this.userCodeClassLoader);
if (wait) {
this.lastJobExecutionResult = client.runBlocking(toRun, getParallelism());
@@ -101,17 +107,21 @@ public class ContextEnvironment extends ExecutionEnvironment {
return this.client;
}
- public List<File> getJars(){
+ public List<URL> getJars(){
return jarFilesToAttach;
}
+
+ public List<URL> getClasspaths(){ // the classpath URLs given at construction
+ return classpathsToAttach;
+ }
// --------------------------------------------------------------------------------------------
- static void setAsContext(Client client, List<File> jarFilesToAttach,
+ static void setAsContext(Client client, List<URL> jarFilesToAttach, List<URL> classpathsToAttach,
ClassLoader userCodeClassLoader, int defaultParallelism, boolean wait)
{
- ContextEnvironmentFactory factory =
- new ContextEnvironmentFactory(client, jarFilesToAttach, userCodeClassLoader, defaultParallelism, wait);
+ ContextEnvironmentFactory factory = new ContextEnvironmentFactory(client, jarFilesToAttach,
+ classpathsToAttach, userCodeClassLoader, defaultParallelism, wait);
initializeContextEnvironment(factory);
}
@@ -130,7 +140,9 @@ public class ContextEnvironment extends ExecutionEnvironment {
private final Client client;
- private final List<File> jarFilesToAttach;
+ private final List<URL> jarFilesToAttach;
+
+ private final List<URL> classpathsToAttach;
private final ClassLoader userCodeClassLoader;
@@ -139,11 +151,13 @@ public class ContextEnvironment extends ExecutionEnvironment {
private final boolean wait;
- public ContextEnvironmentFactory(Client client, List<File> jarFilesToAttach,
- ClassLoader userCodeClassLoader, int defaultParallelism, boolean wait)
+ public ContextEnvironmentFactory(Client client, List<URL> jarFilesToAttach,
+ List<URL> classpathsToAttach, ClassLoader userCodeClassLoader, int defaultParallelism,
+ boolean wait)
{
this.client = client;
this.jarFilesToAttach = jarFilesToAttach;
+ this.classpathsToAttach = classpathsToAttach;
this.userCodeClassLoader = userCodeClassLoader;
this.defaultParallelism = defaultParallelism;
this.wait = wait;
@@ -151,7 +165,8 @@ public class ContextEnvironment extends ExecutionEnvironment {
@Override
public ExecutionEnvironment createExecutionEnvironment() {
- ContextEnvironment env = new ContextEnvironment(client, jarFilesToAttach, userCodeClassLoader, wait);
+ ContextEnvironment env = new ContextEnvironment(client, jarFilesToAttach, classpathsToAttach,
+ userCodeClassLoader, wait);
if (defaultParallelism > 0) {
env.setParallelism(defaultParallelism);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0ee0c1f5/flink-clients/src/main/java/org/apache/flink/client/program/JobWithJars.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/JobWithJars.java b/flink-clients/src/main/java/org/apache/flink/client/program/JobWithJars.java
index 9e84e2d..ef02527 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/JobWithJars.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/JobWithJars.java
@@ -20,7 +20,7 @@ package org.apache.flink.client.program;
import java.io.File;
import java.io.IOException;
-import java.net.MalformedURLException;
+import java.net.URISyntaxException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
@@ -37,33 +37,42 @@ public class JobWithJars {
private Plan plan;
- private List<File> jarFiles;
-
+ private List<URL> jarFiles; // validated with checkJarFile by the public constructors
+
+ /**
+ * Additional URLs for the user-code class loader, placed after the jar files; unlike the jars they are not checked.
+ */
+ private List<URL> classpaths;
+
private ClassLoader userCodeClassLoader;
-
- public JobWithJars(Plan plan, List<String> jarFiles) throws IOException {
+ public JobWithJars(Plan plan, List<URL> jarFiles, List<URL> classpaths) throws IOException {
this.plan = plan;
- this.jarFiles = new ArrayList<File>(jarFiles.size());
-
- for (String jar: jarFiles) {
- File file = new File(jar);
- checkJarFile(file);
- this.jarFiles.add(file);
+ this.jarFiles = new ArrayList<URL>(jarFiles.size());
+ this.classpaths = new ArrayList<URL>(classpaths.size());
+
+ for (URL jarFile: jarFiles) {
+ checkJarFile(jarFile);
+ this.jarFiles.add(jarFile);
+ }
+
+ for (URL path: classpaths) {
+ this.classpaths.add(path); // copied without validation
}
}
-
- public JobWithJars(Plan plan, String jarFile) throws IOException {
+
+ public JobWithJars(Plan plan, URL jarFile) throws IOException {
this.plan = plan;
-
- File file = new File(jarFile);
- checkJarFile(file);
- this.jarFiles = Collections.singletonList(file);
+
+ checkJarFile(jarFile);
+ this.jarFiles = Collections.singletonList(jarFile);
+ this.classpaths = Collections.<URL>emptyList();
}
- JobWithJars(Plan plan, List<File> jarFiles, ClassLoader userCodeClassLoader) {
+ JobWithJars(Plan plan, List<URL> jarFiles, List<URL> classpaths, ClassLoader userCodeClassLoader) { // lists stored by reference; jars are not re-checked here
this.plan = plan;
this.jarFiles = jarFiles;
+ this.classpaths = classpaths;
this.userCodeClassLoader = userCodeClassLoader;
}
@@ -77,48 +86,54 @@ public class JobWithJars {
/**
* Returns list of jar files that need to be submitted with the plan.
*/
- public List<File> getJarFiles() {
+ public List<URL> getJarFiles() {
return this.jarFiles;
}
/**
+ * Returns the additional classpath URLs of the plan; they are appended to the user-code class loader after the jars.
+ */
+ public List<URL> getClasspaths() {
+ return classpaths;
+ }
+
+ /**
* Gets the {@link java.lang.ClassLoader} that must be used to load user code classes.
*
* @return The user code ClassLoader.
*/
public ClassLoader getUserCodeClassLoader() {
if (this.userCodeClassLoader == null) {
- this.userCodeClassLoader = buildUserCodeClassLoader(jarFiles, getClass().getClassLoader());
+ this.userCodeClassLoader = buildUserCodeClassLoader(jarFiles, classpaths, getClass().getClassLoader());
}
-
return this.userCodeClassLoader;
}
- public static void checkJarFile(File jar) throws IOException {
- if (!jar.exists()) {
- throw new IOException("JAR file does not exist '" + jar.getAbsolutePath() + "'");
+ public static void checkJarFile(URL jar) throws IOException {
+ File jarFile; // NOTE(review): new File(URI) throws an unchecked IllegalArgumentException for non-"file:" URLs
+ try {
+ jarFile = new File(jar.toURI());
+ } catch (URISyntaxException e) {
+ throw new IOException("JAR file path is invalid '" + jar + "'", e);
}
- if (!jar.canRead()) {
- throw new IOException("JAR file can't be read '" + jar.getAbsolutePath() + "'");
+ if (!jarFile.exists()) {
+ throw new IOException("JAR file does not exist '" + jarFile.getAbsolutePath() + "'");
+ }
+ if (!jarFile.canRead()) {
+ throw new IOException("JAR file can't be read '" + jarFile.getAbsolutePath() + "'");
}
// TODO: Check if proper JAR file
}
- public static ClassLoader buildUserCodeClassLoader(List<File> jars, ClassLoader parent) {
-
- URL[] urls = new URL[jars.size()];
- try {
- // add the nested jars
- for (int i = 0; i < jars.size(); i++) {
- urls[i] = jars.get(i).getAbsoluteFile().toURI().toURL();
- }
+ public static ClassLoader buildUserCodeClassLoader(List<URL> jars, List<URL> classpaths, ClassLoader parent) {
+ URL[] urls = new URL[jars.size() + classpaths.size()];
+ for (int i = 0; i < jars.size(); i++) {
+ urls[i] = jars.get(i);
}
- catch (MalformedURLException e) {
- // this should not happen, as all files should have been checked before for proper paths and existence.
- throw new RuntimeException("Cannot create class loader for program jar files: " + e.getMessage(), e);
+ for (int i = 0; i < classpaths.size(); i++) { // classpath URLs follow all jar URLs in lookup order
+ urls[i + jars.size()] = classpaths.get(i);
}
-
return new URLClassLoader(urls, parent);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0ee0c1f5/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
index 0cd4d07..aeac296 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
@@ -30,6 +30,9 @@ import java.io.StringWriter;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
+import java.net.MalformedURLException;
+import java.net.URISyntaxException;
+import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Enumeration;
@@ -67,7 +70,7 @@ public class PackagedProgram {
// --------------------------------------------------------------------------------------------
- private final File jarFile;
+ private final URL jarFile;
private final String[] args;
@@ -76,6 +79,8 @@ public class PackagedProgram {
private final Class<?> mainClass;
private final List<File> extractedTempLibraries;
+
+ private final List<URL> classpaths; // extra URLs for the user-code class loader
private ClassLoader userCodeClassLoader;
@@ -84,7 +89,7 @@ public class PackagedProgram {
/**
* Creates an instance that wraps the plan defined in the jar file using the given
* argument.
- *
+ *
* @param jarFile
* The jar file which contains the plan and a Manifest which defines
* the program-class
@@ -96,7 +101,27 @@ public class PackagedProgram {
* may be a missing / wrong class or manifest files.
*/
public PackagedProgram(File jarFile, String... args) throws ProgramInvocationException {
- this(jarFile, null, args);
+ this(jarFile, Collections.<URL>emptyList(), null, args);
+ }
+
+ /**
+ * Creates an instance that wraps the plan defined in the jar file using the given
+ * argument.
+ *
+ * @param jarFile
+ * The jar file which contains the plan and a Manifest which defines
+ * the program-class
+ * @param classpaths
+ * Additional classpath URLs, appended to the user-code class loader after the jar and its nested libraries.
+ * @param args
+ * Optional. The arguments used to create the pact plan, depend on
+ * implementation of the pact plan. See getDescription().
+ * @throws ProgramInvocationException
+ * This invocation is thrown if the Program can't be properly loaded. Causes
+ * may be a missing / wrong class or manifest files.
+ */
+ public PackagedProgram(File jarFile, List<URL> classpaths, String... args) throws ProgramInvocationException { // NOTE(review): makes new PackagedProgram(file, null, args) ambiguous for callers (String vs List<URL>)
+ this(jarFile, classpaths, null, args);
}
/**
@@ -117,23 +142,54 @@ public class PackagedProgram {
* may be a missing / wrong class or manifest files.
*/
public PackagedProgram(File jarFile, String entryPointClassName, String... args) throws ProgramInvocationException {
+ this(jarFile, Collections.<URL>emptyList(), entryPointClassName, args);
+ }
+
+ /**
+ * Creates an instance that wraps the plan defined in the jar file using the given
+ * arguments. For generating the plan the class defined in the className parameter
+ * is used.
+ *
+ * @param jarFile
+ * The jar file which contains the plan.
+ * @param classpaths
+ * Additional classpath URLs, appended to the user-code class loader after the jar and its nested libraries. Null is treated as empty.
+ * @param entryPointClassName
+ * Name of the class which generates the plan. Overrides the class defined
+ * in the jar file manifest
+ * @param args
+ * Optional. The arguments used to create the pact plan, depend on
+ * implementation of the pact plan. See getDescription().
+ * @throws ProgramInvocationException
+ * This invocation is thrown if the Program can't be properly loaded. Causes
+ * may be a missing / wrong class or manifest files.
+ */
+ public PackagedProgram(File jarFile, List<URL> classpaths, String entryPointClassName, String... args) throws ProgramInvocationException {
if (jarFile == null) {
throw new IllegalArgumentException("The jar file must not be null.");
}
- checkJarFile(jarFile);
+ URL jarFileUrl;
+ try {
+ jarFileUrl = jarFile.getAbsoluteFile().toURI().toURL();
+ } catch (MalformedURLException e1) {
+ throw new IllegalArgumentException("The jar file path is invalid: " + jarFile, e1);
+ }
+ // From here on the URL form of the jar is what gets checked, stored and opened.
+ checkJarFile(jarFileUrl);
- this.jarFile = jarFile;
+ this.jarFile = jarFileUrl;
this.args = args == null ? new String[0] : args;
// if no entryPointClassName name was given, we try and look one up through the manifest
if (entryPointClassName == null) {
- entryPointClassName = getEntryPointClassNameFromJar(jarFile);
+ entryPointClassName = getEntryPointClassNameFromJar(jarFileUrl);
}
// now that we have an entry point, we can extract the nested jar files (if any)
- this.extractedTempLibraries = extractContainedLibaries(jarFile);
- this.userCodeClassLoader = buildUserCodeClassLoader(jarFile, extractedTempLibraries, getClass().getClassLoader());
+ this.extractedTempLibraries = extractContainedLibaries(jarFileUrl);
+ this.classpaths = classpaths == null ? Collections.<URL>emptyList() : classpaths;
+ this.userCodeClassLoader = JobWithJars.buildUserCodeClassLoader(getAllLibraries(), this.classpaths, getClass().getClassLoader()); // getAllLibraries() needs jarFile and extractedTempLibraries assigned above
// load the entry point class
this.mainClass = loadMainClass(entryPointClassName, userCodeClassLoader);
@@ -168,6 +224,7 @@ public class PackagedProgram {
this.args = args == null ? new String[0] : args;
this.extractedTempLibraries = Collections.emptyList();
+ this.classpaths = Collections.emptyList();
this.userCodeClassLoader = entryPointClass.getClassLoader();
// load the entry point class
@@ -224,14 +281,7 @@ public class PackagedProgram {
*/
public JobWithJars getPlanWithJars() throws ProgramInvocationException {
if (isUsingProgramEntryPoint()) {
- List<File> allJars = new ArrayList<File>();
-
- if (this.jarFile != null) {
- allJars.add(jarFile);
- }
- allJars.addAll(extractedTempLibraries);
-
- return new JobWithJars(getPlan(), allJars, userCodeClassLoader);
+ return new JobWithJars(getPlan(), getAllLibraries(), classpaths, userCodeClassLoader);
} else {
throw new ProgramInvocationException("Cannot create a " + JobWithJars.class.getSimpleName() +
" for a program that is using the interactive mode.");
@@ -347,7 +397,16 @@ public class PackagedProgram {
throw new ProgramInvocationException("Cannot invoke a plan-based program directly.");
}
}
-
+
+ /**
+ * Returns the additional classpath URLs of the program; they follow the jar libraries in the user-code class loader.
+ *
+ * @return List of {@link java.net.URL}s.
+ */
+ public List<URL> getClasspaths() {
+ return this.classpaths;
+ }
+
/**
* Gets the {@link java.lang.ClassLoader} that must be used to load user code classes.
*
@@ -357,15 +416,24 @@ public class PackagedProgram {
return this.userCodeClassLoader;
}
- public List<File> getAllLibraries() {
- List<File> libs = new ArrayList<File>(this.extractedTempLibraries.size() + 1);
+ public List<URL> getAllLibraries() {
+ List<URL> libs = new ArrayList<URL>(this.extractedTempLibraries.size() + 1);
+ // Order: the main jar (if any) first, then the extracted nested jars converted to file URLs.
if (jarFile != null) {
libs.add(jarFile);
}
- libs.addAll(this.extractedTempLibraries);
+ for (File tmpLib : this.extractedTempLibraries) {
+ try {
+ libs.add(tmpLib.getAbsoluteFile().toURI().toURL());
+ }
+ catch (MalformedURLException e) {
+ throw new RuntimeException("URL is invalid. This should not happen.", e);
+ }
+ }
+
return libs;
}
-
+
/**
* Deletes all temporary files created for contained packaged libraries.
*/
@@ -449,14 +517,16 @@ public class PackagedProgram {
}
}
- private static String getEntryPointClassNameFromJar(File jarFile) throws ProgramInvocationException {
+ private static String getEntryPointClassNameFromJar(URL jarFile) throws ProgramInvocationException {
JarFile jar;
Manifest manifest;
String className;
// Open jar file
try {
- jar = new JarFile(jarFile);
+ jar = new JarFile(new File(jarFile.toURI()));
+ } catch (URISyntaxException use) {
+ throw new ProgramInvocationException("Invalid file path '" + jarFile.getPath() + "'", use);
} catch (IOException ioex) {
throw new ProgramInvocationException("Error while opening jar file '" + jarFile.getPath() + "'. "
+ ioex.getMessage(), ioex);
@@ -553,13 +623,13 @@ public class PackagedProgram {
* @return The file names of the extracted temporary files.
* @throws ProgramInvocationException Thrown, if the extraction process failed.
*/
- private static List<File> extractContainedLibaries(File jarFile) throws ProgramInvocationException {
+ private static List<File> extractContainedLibaries(URL jarFile) throws ProgramInvocationException {
Random rnd = new Random();
JarFile jar = null;
try {
- jar = new JarFile(jarFile);
+ jar = new JarFile(new File(jarFile.toURI()));
final List<JarEntry> containedJarFileEntries = new ArrayList<JarEntry>();
Enumeration<JarEntry> entries = jar.entries();
@@ -658,15 +728,7 @@ public class PackagedProgram {
}
}
- private static ClassLoader buildUserCodeClassLoader(File mainJar, List<File> nestedJars, ClassLoader parent) throws ProgramInvocationException {
- ArrayList<File> allJars = new ArrayList<File>(nestedJars.size() + 1);
- allJars.add(mainJar);
- allJars.addAll(nestedJars);
-
- return JobWithJars.buildUserCodeClassLoader(allJars, parent);
- }
-
- private static void checkJarFile(File jarfile) throws ProgramInvocationException {
+ private static void checkJarFile(URL jarfile) throws ProgramInvocationException {
try {
JobWithJars.checkJarFile(jarfile);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0ee0c1f5/flink-clients/src/test/java/org/apache/flink/client/CliFrontendPackageProgramTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendPackageProgramTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendPackageProgramTest.java
index 1718ba5..5439742 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendPackageProgramTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendPackageProgramTest.java
@@ -28,7 +28,6 @@ import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
import org.apache.flink.client.cli.CliFrontendParser;
-import org.apache.flink.client.cli.InfoOptions;
import org.apache.flink.client.cli.ProgramOptions;
import org.apache.flink.client.cli.RunOptions;
import org.apache.flink.client.program.Client;
@@ -46,6 +45,7 @@ import org.junit.BeforeClass;
import org.junit.Test;
import java.io.FileNotFoundException;
+import java.net.URL;
public class CliFrontendPackageProgramTest {
@@ -102,11 +102,17 @@ public class CliFrontendPackageProgramTest {
@Test
public void testVariantWithExplicitJarAndArgumentsOption() {
try {
- String[] arguments = {"-j", getTestJarPath(), "-a", "--debug", "true", "arg1", "arg2"};
+ String[] arguments = {
+ "--classpath", "file:///tmp/foo",
+ "--classpath", "file:///tmp/bar",
+ "-j", getTestJarPath(),
+ "-a", "--debug", "true", "arg1", "arg2" };
+ URL[] classpath = new URL[] { new URL("file:///tmp/foo"), new URL("file:///tmp/bar") };
String[] reducedArguments = new String[] {"--debug", "true", "arg1", "arg2"};
RunOptions options = CliFrontendParser.parseRunCommand(arguments);
assertEquals(getTestJarPath(), options.getJarFilePath());
+ assertArrayEquals(classpath, options.getClasspaths().toArray());
assertArrayEquals(reducedArguments, options.getProgramArgs());
CliFrontend frontend = new CliFrontend(CliFrontendTestUtils.getConfigDir());
@@ -124,11 +130,17 @@ public class CliFrontendPackageProgramTest {
@Test
public void testVariantWithExplicitJarAndNoArgumentsOption() {
try {
- String[] arguments = {"-j", getTestJarPath(), "--debug", "true", "arg1", "arg2"};
+ String[] arguments = {
+ "--classpath", "file:///tmp/foo",
+ "--classpath", "file:///tmp/bar",
+ "-j", getTestJarPath(),
+ "--debug", "true", "arg1", "arg2" };
+ URL[] classpath = new URL[] { new URL("file:///tmp/foo"), new URL("file:///tmp/bar") };
String[] reducedArguments = new String[] {"--debug", "true", "arg1", "arg2"};
RunOptions options = CliFrontendParser.parseRunCommand(arguments);
assertEquals(getTestJarPath(), options.getJarFilePath());
+ assertArrayEquals(classpath, options.getClasspaths().toArray());
assertArrayEquals(reducedArguments, options.getProgramArgs());
CliFrontend frontend = new CliFrontend(CliFrontendTestUtils.getConfigDir());
@@ -147,11 +159,17 @@ public class CliFrontendPackageProgramTest {
@Test
public void testValidVariantWithNoJarAndNoArgumentsOption() {
try {
- String[] arguments = {getTestJarPath(), "--debug", "true", "arg1", "arg2"};
+ String[] arguments = {
+ "--classpath", "file:///tmp/foo",
+ "--classpath", "file:///tmp/bar",
+ getTestJarPath(),
+ "--debug", "true", "arg1", "arg2" };
+ URL[] classpath = new URL[] { new URL("file:///tmp/foo"), new URL("file:///tmp/bar") };
String[] reducedArguments = {"--debug", "true", "arg1", "arg2"};
RunOptions options = CliFrontendParser.parseRunCommand(arguments);
assertEquals(getTestJarPath(), options.getJarFilePath());
+ assertArrayEquals(classpath, options.getClasspaths().toArray());
assertArrayEquals(reducedArguments, options.getProgramArgs());
CliFrontend frontend = new CliFrontend(CliFrontendTestUtils.getConfigDir());
@@ -182,11 +200,17 @@ public class CliFrontendPackageProgramTest {
@Test
public void testNonExistingFileWithArguments() {
try {
- String[] arguments = {"/some/none/existing/path", "--debug", "true", "arg1", "arg2"};
+ String[] arguments = {
+ "--classpath", "file:///tmp/foo",
+ "--classpath", "file:///tmp/bar",
+ "/some/none/existing/path",
+ "--debug", "true", "arg1", "arg2" };
+ URL[] classpath = new URL[] { new URL("file:///tmp/foo"), new URL("file:///tmp/bar") };
String[] reducedArguments = {"--debug", "true", "arg1", "arg2"};
RunOptions options = CliFrontendParser.parseRunCommand(arguments);
- assertEquals(arguments[0], options.getJarFilePath());
+ assertEquals(arguments[4], options.getJarFilePath());
+ assertArrayEquals(classpath, options.getClasspaths().toArray());
assertArrayEquals(reducedArguments, options.getProgramArgs());
CliFrontend frontend = new CliFrontend(CliFrontendTestUtils.getConfigDir());
@@ -267,15 +291,19 @@ public class CliFrontendPackageProgramTest {
final boolean[] callme = { false }; // create a final object reference, to be able to change its val later
try {
- String[] arguments = {"-c", TEST_JAR_CLASSLOADERTEST_CLASS, getTestJarPath(),
+ String[] arguments = {
+ "--classpath", "file:///tmp/foo",
+ "--classpath", "file:///tmp/bar",
+ "-c", TEST_JAR_CLASSLOADERTEST_CLASS, getTestJarPath(),
"true", "arg1", "arg2" };
+ URL[] classpath = new URL[] { new URL("file:///tmp/foo"), new URL("file:///tmp/bar") };
+ String[] reducedArguments = { "true", "arg1", "arg2" };
- String[] progArgs = { "true", "arg1", "arg2" };
-
- InfoOptions options = CliFrontendParser.parseInfoCommand(arguments);
+ RunOptions options = CliFrontendParser.parseRunCommand(arguments);
assertEquals(getTestJarPath(), options.getJarFilePath());
+ assertArrayEquals(classpath, options.getClasspaths().toArray());
assertEquals(TEST_JAR_CLASSLOADERTEST_CLASS, options.getEntryPointClassName());
- assertArrayEquals(progArgs, options.getProgramArgs());
+ assertArrayEquals(reducedArguments, options.getProgramArgs());
CliFrontend frontend = new CliFrontend(CliFrontendTestUtils.getConfigDir());
PackagedProgram prog = spy(frontend.buildProgram(options));
@@ -294,7 +322,7 @@ public class CliFrontendPackageProgramTest {
when(prog.getUserCodeClassLoader()).thenReturn(testClassLoader);
assertEquals(TEST_JAR_CLASSLOADERTEST_CLASS, prog.getMainClassName());
- assertArrayEquals(progArgs, prog.getArguments());
+ assertArrayEquals(reducedArguments, prog.getArguments());
Configuration c = new Configuration();
Optimizer compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), c);
http://git-wip-us.apache.org/repos/asf/flink/blob/0ee0c1f5/flink-clients/src/test/java/org/apache/flink/client/RemoteExecutorHostnameResolutionTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/RemoteExecutorHostnameResolutionTest.java b/flink-clients/src/test/java/org/apache/flink/client/RemoteExecutorHostnameResolutionTest.java
index be3fe89..424f72e 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/RemoteExecutorHostnameResolutionTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/RemoteExecutorHostnameResolutionTest.java
@@ -27,6 +27,7 @@ import org.junit.Test;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
+import java.net.URL;
import java.net.UnknownHostException;
import java.util.Collections;
@@ -68,7 +69,8 @@ public class RemoteExecutorHostnameResolutionTest {
try {
InetSocketAddress add = new InetSocketAddress(nonExistingHostname, port);
- RemoteExecutor exec = new RemoteExecutor(add, Collections.<String>emptyList(), new Configuration());
+ RemoteExecutor exec = new RemoteExecutor(add, new Configuration(),
+ Collections.<URL>emptyList(), Collections.<URL>emptyList());
exec.executePlan(getProgram());
fail("This should fail with an ProgramInvocationException");
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0ee0c1f5/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
index a620139..1fbf681 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
@@ -53,6 +53,7 @@ import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
+import java.net.URL;
import java.util.Collections;
import java.util.UUID;
@@ -82,7 +83,7 @@ public class ClientTest {
env.generateSequence(1, 1000).output(new DiscardingOutputFormat<Long>());
Plan plan = env.createProgramPlan();
- JobWithJars jobWithJars = new JobWithJars(plan, Collections.<String>emptyList());
+ JobWithJars jobWithJars = new JobWithJars(plan, Collections.<URL>emptyList(), Collections.<URL>emptyList());
program = mock(PackagedProgram.class);
when(program.getPlanWithJars()).thenReturn(jobWithJars);
http://git-wip-us.apache.org/repos/asf/flink/blob/0ee0c1f5/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
index b19cb38..2be7599 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
@@ -57,6 +57,9 @@ import scala.concurrent.duration.FiniteDuration;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.net.URI;
+import java.net.URL;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -167,11 +170,14 @@ public class FlinkClient {
throw new AlreadyAliveException();
}
- final File uploadedJarFile = new File(uploadedJarLocation);
+ final URI uploadedJarUri;
+ final URL uploadedJarUrl;
try {
- JobWithJars.checkJarFile(uploadedJarFile);
+ uploadedJarUri = new File(uploadedJarLocation).getAbsoluteFile().toURI();
+ uploadedJarUrl = uploadedJarUri.toURL();
+ JobWithJars.checkJarFile(uploadedJarUrl);
} catch (final IOException e) {
- throw new RuntimeException("Problem with jar file " + uploadedJarFile.getAbsolutePath(), e);
+ throw new RuntimeException("Problem with jar file " + uploadedJarLocation, e);
}
/* set storm configuration */
@@ -180,7 +186,7 @@ public class FlinkClient {
}
final JobGraph jobGraph = topology.getStreamGraph().getJobGraph(name);
- jobGraph.addJar(new Path(uploadedJarFile.getAbsolutePath()));
+ jobGraph.addJar(new Path(uploadedJarUri));
final Configuration configuration = jobGraph.getJobConfiguration();
configuration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, jobManagerHost);
@@ -195,7 +201,8 @@ public class FlinkClient {
try {
ClassLoader classLoader = JobWithJars.buildUserCodeClassLoader(
- Lists.newArrayList(uploadedJarFile),
+ Lists.newArrayList(uploadedJarUrl),
+ Collections.<URL>emptyList(),
this.getClass().getClassLoader());
client.runDetached(jobGraph, classLoader);
} catch (final ProgramInvocationException e) {
http://git-wip-us.apache.org/repos/asf/flink/blob/0ee0c1f5/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkSubmitter.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkSubmitter.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkSubmitter.java
index 9b03c68..13a39ef 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkSubmitter.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkSubmitter.java
@@ -23,6 +23,8 @@ import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.generated.SubmitOptions;
import backtype.storm.utils.Utils;
+import java.net.URISyntaxException;
+import java.net.URL;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.client.program.ContextEnvironment;
import org.apache.flink.configuration.ConfigConstants;
@@ -99,6 +101,7 @@ public class FlinkSubmitter {
final String serConf = JSONValue.toJSONString(stormConf);
final FlinkClient client = FlinkClient.getConfiguredClient(stormConf);
+
try {
if (client.getTopologyJobId(name) != null) {
throw new RuntimeException("Topology with name `" + name + "` already exists on cluster");
@@ -106,11 +109,13 @@ public class FlinkSubmitter {
String localJar = System.getProperty("storm.jar");
if (localJar == null) {
try {
- for (final File file : ((ContextEnvironment) ExecutionEnvironment.getExecutionEnvironment())
+ for (final URL url : ((ContextEnvironment) ExecutionEnvironment.getExecutionEnvironment())
.getJars()) {
// TODO verify that there is only one jar
- localJar = file.getAbsolutePath();
+ localJar = new File(url.toURI()).getAbsolutePath();
}
+ } catch (final URISyntaxException e) {
+ // ignore
} catch (final ClassCastException e) {
// ignore
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0ee0c1f5/flink-core/src/main/java/org/apache/flink/api/common/PlanExecutor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/PlanExecutor.java b/flink-core/src/main/java/org/apache/flink/api/common/PlanExecutor.java
index 514692f..cbdf137 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/PlanExecutor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/PlanExecutor.java
@@ -20,6 +20,7 @@ package org.apache.flink.api.common;
import org.apache.flink.configuration.Configuration;
+import java.net.URL;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@@ -169,9 +170,12 @@ public abstract class PlanExecutor {
* @param clientConfiguration The configuration for the client (Akka, default.parallelism).
* @param jarFiles A list of jar files that contain the user-defined function (UDF) classes and all classes used
* from within the UDFs.
+ * @param globalClasspaths A list of URLs that are added to the classpath of each user code classloader of the
+ * program. Paths must specify a protocol (e.g. file://) and be accessible on all nodes.
* @return A remote executor.
*/
- public static PlanExecutor createRemoteExecutor(String hostname, int port, Configuration clientConfiguration, String... jarFiles) {
+ public static PlanExecutor createRemoteExecutor(String hostname, int port, Configuration clientConfiguration,
+ URL[] jarFiles, URL[] globalClasspaths) {
if (hostname == null) {
throw new IllegalArgumentException("The hostname must not be null.");
}
@@ -181,13 +185,17 @@ public abstract class PlanExecutor {
Class<? extends PlanExecutor> reClass = loadExecutorClass(REMOTE_EXECUTOR_CLASS);
- List<String> files = (jarFiles == null || jarFiles.length == 0) ? Collections.<String>emptyList()
- : Arrays.asList(jarFiles);
-
+ List<URL> files = (jarFiles == null || jarFiles.length == 0) ?
+ Collections.<URL>emptyList() : Arrays.asList(jarFiles);
+ List<URL> paths = (globalClasspaths == null || globalClasspaths.length == 0) ?
+ Collections.<URL>emptyList() : Arrays.asList(globalClasspaths);
+
try {
PlanExecutor executor = (clientConfiguration == null) ?
- reClass.getConstructor(String.class, int.class, List.class).newInstance(hostname, port, files) :
- reClass.getConstructor(String.class, int.class, List.class, Configuration.class).newInstance(hostname, port, files, clientConfiguration);
+ reClass.getConstructor(String.class, int.class, List.class)
+ .newInstance(hostname, port, files) :
+ reClass.getConstructor(String.class, int.class, Configuration.class, List.class, List.class)
+ .newInstance(hostname, port, clientConfiguration, files, paths);
return executor;
}
catch (Throwable t) {
http://git-wip-us.apache.org/repos/asf/flink/blob/0ee0c1f5/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 01fb15c..aa7c0c4 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -1152,7 +1152,7 @@ public abstract class ExecutionEnvironment {
*/
public static ExecutionEnvironment createRemoteEnvironment(
String host, int port, Configuration clientConfiguration, String... jarFiles) {
- return new RemoteEnvironment(host, port, clientConfiguration, jarFiles);
+ return new RemoteEnvironment(host, port, clientConfiguration, jarFiles, null);
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/0ee0c1f5/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java
index 5a4aa1e..4b61426 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java
@@ -25,6 +25,10 @@ import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.PlanExecutor;
import org.apache.flink.configuration.Configuration;
+import java.io.File;
+import java.net.MalformedURLException;
+import java.net.URL;
+
/**
* An {@link ExecutionEnvironment} that sends programs to a cluster for execution. The environment
* needs to be created with the address and port of the JobManager of the Flink cluster that
@@ -44,7 +48,7 @@ public class RemoteEnvironment extends ExecutionEnvironment {
protected final int port;
/** The jar files that need to be attached to each job */
- private final String[] jarFiles;
+ private final URL[] jarFiles;
/** The configuration used by the client that connects to the cluster */
private Configuration clientConfiguration;
@@ -55,20 +59,40 @@ public class RemoteEnvironment extends ExecutionEnvironment {
/** Optional shutdown hook, used in session mode to eagerly terminate the last session */
private Thread shutdownHook;
+ /** The classpaths that need to be attached to each job */
+ private final URL[] globalClasspaths;
+
/**
* Creates a new RemoteEnvironment that points to the master (JobManager) described by the
* given host name and port.
- *
+ *
* <p>Each program execution will have all the given JAR files in its classpath.
- *
+ *
* @param host The host name or address of the master (JobManager), where the program should be executed.
- * @param port The port of the master (JobManager), where the program should be executed.
+ * @param port The port of the master (JobManager), where the program should be executed.
* @param jarFiles The JAR files with code that needs to be shipped to the cluster. If the program uses
* user-defined functions, user-defined input formats, or any libraries, those must be
* provided in the JAR files.
- */
+ */
public RemoteEnvironment(String host, int port, String... jarFiles) {
- this(host, port, null, jarFiles);
+ this(host, port, null, jarFiles, null);
+ }
+
+ /**
+ * Creates a new RemoteEnvironment that points to the master (JobManager) described by the
+ * given host name and port.
+ *
+ * <p>Each program execution will have all the given JAR files in its classpath.
+ *
+ * @param host The host name or address of the master (JobManager), where the program should be executed.
+ * @param port The port of the master (JobManager), where the program should be executed.
+ * @param clientConfig The configuration used by the client that connects to the cluster.
+ * @param jarFiles The JAR files with code that needs to be shipped to the cluster. If the program uses
+ * user-defined functions, user-defined input formats, or any libraries, those must be
+ * provided in the JAR files.
+ */
+ public RemoteEnvironment(String host, int port, Configuration clientConfig, String[] jarFiles) {
+ this(host, port, clientConfig, jarFiles, null);
}
/**
@@ -83,8 +107,13 @@ public class RemoteEnvironment extends ExecutionEnvironment {
* @param jarFiles The JAR files with code that needs to be shipped to the cluster. If the program uses
* user-defined functions, user-defined input formats, or any libraries, those must be
* provided in the JAR files.
+ * @param globalClasspaths The paths of directories and JAR files that are added to each user code
+ * classloader on all nodes in the cluster. Note that the paths must specify a
+ * protocol (e.g. file://) and be accessible on all nodes (e.g. by means of a NFS share).
+ * The protocol must be supported by the {@link java.net.URLClassLoader}.
*/
- public RemoteEnvironment(String host, int port, Configuration clientConfig, String... jarFiles) {
+ public RemoteEnvironment(String host, int port, Configuration clientConfig,
+ String[] jarFiles, URL[] globalClasspaths) {
if (!ExecutionEnvironment.areExplicitEnvironmentsAllowed()) {
throw new InvalidProgramException(
"The RemoteEnvironment cannot be instantiated when running in a pre-defined context " +
@@ -99,8 +128,21 @@ public class RemoteEnvironment extends ExecutionEnvironment {
this.host = host;
this.port = port;
- this.jarFiles = jarFiles;
this.clientConfiguration = clientConfig == null ? new Configuration() : clientConfig;
+ if (jarFiles != null) {
+ this.jarFiles = new URL[jarFiles.length];
+ for (int i = 0; i < jarFiles.length; i++) {
+ try {
+ this.jarFiles[i] = new File(jarFiles[i]).getAbsoluteFile().toURI().toURL();
+ } catch (MalformedURLException e) {
+ throw new IllegalArgumentException("JAR file path invalid", e);
+ }
+ }
+ }
+ else {
+ this.jarFiles = null;
+ }
+ this.globalClasspaths = globalClasspaths;
}
// ------------------------------------------------------------------------
@@ -146,7 +188,8 @@ public class RemoteEnvironment extends ExecutionEnvironment {
private void ensureExecutorCreated() throws Exception {
if (executor == null) {
- executor = PlanExecutor.createRemoteExecutor(host, port, clientConfiguration, jarFiles);
+ executor = PlanExecutor.createRemoteExecutor(host, port, clientConfiguration,
+ jarFiles, globalClasspaths);
executor.setPrintStatusDuringExecution(getConfig().isSysoutLoggingEnabled());
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0ee0c1f5/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
index c4065d2..558fcd0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.util.SerializedValue;
import java.io.Serializable;
+import java.net.URL;
import java.util.Collection;
import java.util.List;
@@ -77,6 +78,9 @@ public final class TaskDeploymentDescriptor implements Serializable {
/** The list of JAR files required to run this task. */
private final List<BlobKey> requiredJarFiles;
+
+ /** The list of classpaths required to run this task. */
+ private final List<URL> requiredClasspaths;
private final SerializedValue<StateHandle<?>> operatorState;
@@ -89,13 +93,13 @@ public final class TaskDeploymentDescriptor implements Serializable {
Configuration taskConfiguration, String invokableClassName,
List<ResultPartitionDeploymentDescriptor> producedPartitions,
List<InputGateDeploymentDescriptor> inputGates,
- List<BlobKey> requiredJarFiles, int targetSlotNumber,
- SerializedValue<StateHandle<?>> operatorState) {
+ List<BlobKey> requiredJarFiles, List<URL> requiredClasspaths,
+ int targetSlotNumber, SerializedValue<StateHandle<?>> operatorState) {
checkArgument(indexInSubtaskGroup >= 0);
checkArgument(numberOfSubtasks > indexInSubtaskGroup);
checkArgument(targetSlotNumber >= 0);
-
+
this.jobID = checkNotNull(jobID);
this.vertexID = checkNotNull(vertexID);
this.executionId = checkNotNull(executionId);
@@ -108,6 +112,7 @@ public final class TaskDeploymentDescriptor implements Serializable {
this.producedPartitions = checkNotNull(producedPartitions);
this.inputGates = checkNotNull(inputGates);
this.requiredJarFiles = checkNotNull(requiredJarFiles);
+ this.requiredClasspaths = checkNotNull(requiredClasspaths);
this.targetSlotNumber = targetSlotNumber;
this.operatorState = operatorState;
}
@@ -118,11 +123,12 @@ public final class TaskDeploymentDescriptor implements Serializable {
Configuration taskConfiguration, String invokableClassName,
List<ResultPartitionDeploymentDescriptor> producedPartitions,
List<InputGateDeploymentDescriptor> inputGates,
- List<BlobKey> requiredJarFiles, int targetSlotNumber) {
+ List<BlobKey> requiredJarFiles, List<URL> requiredClasspaths,
+ int targetSlotNumber) {
this(jobID, vertexID, executionId, taskName, indexInSubtaskGroup, numberOfSubtasks,
jobConfiguration, taskConfiguration, invokableClassName, producedPartitions,
- inputGates, requiredJarFiles, targetSlotNumber, null);
+ inputGates, requiredJarFiles, requiredClasspaths, targetSlotNumber, null);
}
/**
@@ -208,6 +214,10 @@ public final class TaskDeploymentDescriptor implements Serializable {
return requiredJarFiles;
}
+ public List<URL> getRequiredClasspaths() {
+ return requiredClasspaths;
+ }
+
@Override
public String toString() {
return String.format("TaskDeploymentDescriptor [job id: %s, job vertex id: %s, " +
http://git-wip-us.apache.org/repos/asf/flink/blob/0ee0c1f5/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
index 88be5e1..97ec93a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
@@ -81,26 +81,31 @@ public final class BlobLibraryCacheManager extends TimerTask implements LibraryC
// --------------------------------------------------------------------------------------------
@Override
- public void registerJob(JobID id, Collection<BlobKey> requiredJarFiles) throws IOException {
- registerTask(id, JOB_ATTEMPT_ID, requiredJarFiles);
+ public void registerJob(JobID id, Collection<BlobKey> requiredJarFiles, Collection<URL> requiredClasspaths)
+ throws IOException {
+ registerTask(id, JOB_ATTEMPT_ID, requiredJarFiles, requiredClasspaths);
}
@Override
- public void registerTask(JobID jobId, ExecutionAttemptID task, Collection<BlobKey> requiredJarFiles) throws IOException {
+ public void registerTask(JobID jobId, ExecutionAttemptID task, Collection<BlobKey> requiredJarFiles,
+ Collection<URL> requiredClasspaths) throws IOException {
Preconditions.checkNotNull(jobId, "The JobId must not be null.");
Preconditions.checkNotNull(task, "The task execution id must not be null.");
-
+
if (requiredJarFiles == null) {
requiredJarFiles = Collections.emptySet();
}
-
+ if (requiredClasspaths == null) {
+ requiredClasspaths = Collections.emptySet();
+ }
+
synchronized (lockObject) {
LibraryCacheEntry entry = cacheEntries.get(jobId);
-
+
if (entry == null) {
// create a new entry in the library cache
BlobKey[] keys = requiredJarFiles.toArray(new BlobKey[requiredJarFiles.size()]);
- URL[] urls = new URL[keys.length];
+ URL[] urls = new URL[keys.length + requiredClasspaths.size()];
int count = 0;
try {
@@ -124,7 +129,13 @@ public final class BlobLibraryCacheManager extends TimerTask implements LibraryC
ExceptionUtils.tryRethrowIOException(t);
throw new IOException("Library cache could not register the user code libraries.", t);
}
-
+
+ // add classpaths
+ for (URL url : requiredClasspaths) {
+ urls[count] = url;
+ count++;
+ }
+
URLClassLoader classLoader = new FlinkUserCodeClassLoader(urls);
cacheEntries.put(jobId, new LibraryCacheEntry(requiredJarFiles, classLoader, task));
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0ee0c1f5/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FallbackLibraryCacheManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FallbackLibraryCacheManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FallbackLibraryCacheManager.java
index 66bda45..1ef6e31 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FallbackLibraryCacheManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FallbackLibraryCacheManager.java
@@ -26,6 +26,7 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
+import java.net.URL;
import java.util.Collection;
public class FallbackLibraryCacheManager implements LibraryCacheManager {
@@ -43,12 +44,13 @@ public class FallbackLibraryCacheManager implements LibraryCacheManager {
}
@Override
- public void registerJob(JobID id, Collection<BlobKey> requiredJarFiles) {
+ public void registerJob(JobID id, Collection<BlobKey> requiredJarFiles, Collection<URL> requiredClasspaths) {
LOG.warn("FallbackLibraryCacheManager cannot download files associated with blob keys.");
}
@Override
- public void registerTask(JobID id, ExecutionAttemptID execution, Collection<BlobKey> requiredJarFiles) {
+ public void registerTask(JobID id, ExecutionAttemptID execution, Collection<BlobKey> requiredJarFiles,
+ Collection<URL> requiredClasspaths) {
LOG.warn("FallbackLibraryCacheManager cannot download files associated with blob keys.");
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0ee0c1f5/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheManager.java
index 52a8048..03c8f27 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheManager.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.JobID;
import java.io.File;
import java.io.IOException;
+import java.net.URL;
import java.util.Collection;
public interface LibraryCacheManager {
@@ -45,22 +46,26 @@ public interface LibraryCacheManager {
File getFile(BlobKey blobKey) throws IOException;
/**
- * Registers a job with its required jar files. The jar files are identified by their blob keys.
+ * Registers a job with its required jar files and classpaths. The jar files are identified by their blob keys.
*
* @param id job ID
* @param requiredJarFiles collection of blob keys identifying the required jar files
+ * @param requiredClasspaths collection of classpaths that are added to the user code class loader
* @throws IOException
*/
- void registerJob(JobID id, Collection<BlobKey> requiredJarFiles) throws IOException;
+ void registerJob(JobID id, Collection<BlobKey> requiredJarFiles, Collection<URL> requiredClasspaths)
+ throws IOException;
/**
- * Registers a job task execution with its required jar files. The jar files are identified by their blob keys.
+ * Registers a job task execution with its required jar files and classpaths. The jar files are identified by their blob keys.
*
* @param id job ID
* @param requiredJarFiles collection of blob keys identifying the required jar files
+ * @param requiredClasspaths collection of classpaths that are added to the user code class loader
* @throws IOException
*/
- void registerTask(JobID id, ExecutionAttemptID execution, Collection<BlobKey> requiredJarFiles) throws IOException;
+ void registerTask(JobID id, ExecutionAttemptID execution, Collection<BlobKey> requiredJarFiles,
+ Collection<URL> requiredClasspaths) throws IOException;
/**
* Unregisters a job from the library cache manager.
http://git-wip-us.apache.org/repos/asf/flink/blob/0ee0c1f5/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 889ce43..ef00484 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -56,6 +56,7 @@ import scala.concurrent.duration.FiniteDuration;
import java.io.IOException;
import java.io.Serializable;
+import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -142,6 +143,10 @@ public class ExecutionGraph implements Serializable {
* inside the BlobService and are referenced via the BLOB keys. */
private final List<BlobKey> requiredJarFiles;
+ /** A list of all classpaths required during the job execution. Classpaths have to be
+ * accessible on all nodes in the cluster. */
+ private final List<URL> requiredClasspaths;
+
/** Listeners that receive messages when the entire job switches it status (such as from
* RUNNING to FINISHED) */
private final List<ActorGateway> jobStatusListenerActors;
@@ -239,6 +244,7 @@ public class ExecutionGraph implements Serializable {
jobConfig,
timeout,
new ArrayList<BlobKey>(),
+ new ArrayList<URL>(),
ExecutionGraph.class.getClassLoader()
);
}
@@ -250,6 +256,7 @@ public class ExecutionGraph implements Serializable {
Configuration jobConfig,
FiniteDuration timeout,
List<BlobKey> requiredJarFiles,
+ List<URL> requiredClasspaths,
ClassLoader userClassLoader) {
if (executionContext == null || jobId == null || jobName == null || jobConfig == null || userClassLoader == null) {
@@ -275,6 +282,7 @@ public class ExecutionGraph implements Serializable {
this.stateTimestamps[JobStatus.CREATED.ordinal()] = System.currentTimeMillis();
this.requiredJarFiles = requiredJarFiles;
+ this.requiredClasspaths = requiredClasspaths;
this.timeout = timeout;
}
@@ -427,8 +435,15 @@ public class ExecutionGraph implements Serializable {
return this.requiredJarFiles;
}
- // --------------------------------------------------------------------------------------------
+ /**
+ * Returns a list of classpaths referring to the directories/JAR files required to run this job.
+ * @return list of classpaths referring to the directories/JAR files required to run this job
+ */
+ public List<URL> getRequiredClasspaths() {
+ return this.requiredClasspaths;
+ }
+ // --------------------------------------------------------------------------------------------
public void setJsonPlan(String jsonPlan) {
this.jsonPlan = jsonPlan;
http://git-wip-us.apache.org/repos/asf/flink/blob/0ee0c1f5/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 0d039cc..6a63528 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -49,6 +49,7 @@ import org.slf4j.Logger;
import scala.concurrent.duration.FiniteDuration;
import java.io.Serializable;
+import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
@@ -644,11 +645,13 @@ public class ExecutionVertex implements Serializable {
}
List<BlobKey> jarFiles = getExecutionGraph().getRequiredJarFiles();
+ List<URL> classpaths = getExecutionGraph().getRequiredClasspaths();
return new TaskDeploymentDescriptor(getJobId(), getJobvertexId(), executionId, getTaskName(),
subTaskIndex, getTotalNumberOfParallelSubtasks(), getExecutionGraph().getJobConfiguration(),
jobVertex.getJobVertex().getConfiguration(), jobVertex.getJobVertex().getInvokableClassName(),
- producedPartitions, consumedPartitions, jarFiles, targetSlot.getRoot().getSlotNumber(), operatorState);
+ producedPartitions, consumedPartitions, jarFiles, classpaths, targetSlot.getRoot().getSlotNumber(),
+ operatorState);
}
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/0ee0c1f5/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
index 4014a76..6b36e2d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.jobgraph;
import java.io.IOException;
import java.io.Serializable;
import java.net.InetSocketAddress;
+import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
@@ -95,6 +96,9 @@ public class JobGraph implements Serializable {
/** The settings for asynchronous snapshots */
private JobSnapshottingSettings snapshotSettings;
+ /** List of classpaths required to run this job. */
+ private List<URL> classpaths = Collections.<URL>emptyList();
+
// --------------------------------------------------------------------------------------------
/**
@@ -348,7 +352,20 @@ public class JobGraph implements Serializable {
public JobVertex findVertexByID(JobVertexID id) {
return this.taskVertices.get(id);
}
-
+
+ /**
+ * Sets the classpaths required to run the job on a task manager.
+ *
+ * @param paths paths of the directories/JAR files required to run the job on a task manager
+ */
+ public void setClasspaths(List<URL> paths) {
+ classpaths = paths;
+ }
+
+ public List<URL> getClasspaths() {
+ return classpaths;
+ }
+
// --------------------------------------------------------------------------------------------
public List<JobVertex> getVerticesSortedTopologicallyFromSources() throws InvalidProgramException {
http://git-wip-us.apache.org/repos/asf/flink/blob/0ee0c1f5/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 434c5d1..269222f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -59,6 +59,7 @@ import org.slf4j.LoggerFactory;
import scala.concurrent.duration.FiniteDuration;
import java.io.IOException;
+import java.net.URL;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -140,6 +141,9 @@ public class Task implements Runnable {
/** The jar files used by this task */
private final List<BlobKey> requiredJarFiles;
+ /** The classpaths used by this task */
+ private final List<URL> requiredClasspaths;
+
/** The name of the class that holds the invokable code */
private final String nameOfInvokableClass;
@@ -245,6 +249,7 @@ public class Task implements Runnable {
this.jobConfiguration = checkNotNull(tdd.getJobConfiguration());
this.taskConfiguration = checkNotNull(tdd.getTaskConfiguration());
this.requiredJarFiles = checkNotNull(tdd.getRequiredJarFiles());
+ this.requiredClasspaths = checkNotNull(tdd.getRequiredClasspaths());
this.nameOfInvokableClass = checkNotNull(tdd.getInvokableClassName());
this.operatorState = tdd.getOperatorState();
@@ -702,7 +707,7 @@ public class Task implements Runnable {
long startDownloadTime = System.currentTimeMillis();
// triggers the download of all missing jar files from the job manager
- libraryCache.registerTask(jobId, executionId, requiredJarFiles);
+ libraryCache.registerTask(jobId, executionId, requiredJarFiles, requiredClasspaths);
LOG.debug("Register task {} at library cache manager took {} milliseconds",
executionId, System.currentTimeMillis() - startDownloadTime);
http://git-wip-us.apache.org/repos/asf/flink/blob/0ee0c1f5/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index bbb382e..7bb7783 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -618,7 +618,8 @@ class JobManager(
// because this makes sure that the uploaded jar files are removed in case of
// unsuccessful
try {
- libraryCacheManager.registerJob(jobGraph.getJobID, jobGraph.getUserJarBlobKeys)
+ libraryCacheManager.registerJob(jobGraph.getJobID, jobGraph.getUserJarBlobKeys,
+ jobGraph.getClasspaths)
}
catch {
case t: Throwable =>
@@ -657,6 +658,7 @@ class JobManager(
jobGraph.getJobConfiguration,
timeout,
jobGraph.getUserJarBlobKeys,
+ jobGraph.getClasspaths,
userCodeLoader)
val jobInfo = JobInfo(
client,
http://git-wip-us.apache.org/repos/asf/flink/blob/0ee0c1f5/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
index 58755f3..2dd1caf 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.fail;
+import java.net.URL;
import java.util.ArrayList;
import java.util.List;
@@ -51,10 +52,11 @@ public class TaskDeploymentDescriptorTest {
final List<ResultPartitionDeploymentDescriptor> producedResults = new ArrayList<ResultPartitionDeploymentDescriptor>(0);
final List<InputGateDeploymentDescriptor> inputGates = new ArrayList<InputGateDeploymentDescriptor>(0);
final List<BlobKey> requiredJars = new ArrayList<BlobKey>(0);
+ final List<URL> requiredClasspaths = new ArrayList<URL>(0);
final TaskDeploymentDescriptor orig = new TaskDeploymentDescriptor(jobID, vertexID, execId, taskName,
indexInSubtaskGroup, currentNumberOfSubtasks, jobConfiguration, taskConfiguration,
- invokableClass.getName(), producedResults, inputGates, requiredJars, 47);
+ invokableClass.getName(), producedResults, inputGates, requiredJars, requiredClasspaths, 47);
final TaskDeploymentDescriptor copy = CommonTestUtils.createCopySerializable(orig);
@@ -73,6 +75,7 @@ public class TaskDeploymentDescriptorTest {
assertEquals(orig.getInputGates(), copy.getInputGates());
assertEquals(orig.getRequiredJarFiles(), copy.getRequiredJarFiles());
+ assertEquals(orig.getRequiredClasspaths(), copy.getRequiredClasspaths());
}
catch (Exception e) {
e.printStackTrace();
http://git-wip-us.apache.org/repos/asf/flink/blob/0ee0c1f5/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
index 4e814f5..5a5ef57 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
@@ -36,6 +36,7 @@ import static org.junit.Assume.assumeTrue;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.net.URL;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -68,7 +69,7 @@ public class BlobLibraryCacheManagerTest {
long cleanupInterval = 1000l;
libraryCacheManager = new BlobLibraryCacheManager(server, cleanupInterval);
- libraryCacheManager.registerJob(jid, keys);
+ libraryCacheManager.registerJob(jid, keys, Collections.<URL>emptyList());
List<File> files = new ArrayList<File>();
@@ -170,7 +171,7 @@ public class BlobLibraryCacheManagerTest {
ExecutionAttemptID executionId = new ExecutionAttemptID();
Collection<BlobKey> keys = Collections.singleton(dataKey1);
- libCache.registerTask(jid, executionId, keys);
+ libCache.registerTask(jid, executionId, keys, Collections.<URL>emptyList());
assertEquals(1, libCache.getNumberOfReferenceHolders(jid));
assertEquals(1, libCache.getNumberOfCachedLibraries());
assertNotNull(libCache.getClassLoader(jid));
@@ -201,7 +202,8 @@ public class BlobLibraryCacheManagerTest {
// since we cannot download this library any more, this call should fail
try {
- libCache.registerTask(new JobID(), new ExecutionAttemptID(), Collections.singleton(dataKey2));
+ libCache.registerTask(new JobID(), new ExecutionAttemptID(), Collections.singleton(dataKey2),
+ Collections.<URL>emptyList());
fail("This should fail with an IOException");
}
catch (IOException e) {
http://git-wip-us.apache.org/repos/asf/flink/blob/0ee0c1f5/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
index c0fe750..fff6e70 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
@@ -47,6 +47,7 @@ import org.junit.Test;
import scala.concurrent.duration.FiniteDuration;
import java.io.Serializable;
+import java.net.URL;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
@@ -153,6 +154,7 @@ public class TaskAsyncCallTest {
Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
Collections.<InputGateDeploymentDescriptor>emptyList(),
Collections.<BlobKey>emptyList(),
+ Collections.<URL>emptyList(),
0);
ActorGateway taskManagerGateway = DummyActorGateway.INSTANCE;