You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2016/11/25 15:16:20 UTC
flink git commit: Revert "[FLINK-4913][yarn] include user jars in system class loader"
Repository: flink
Updated Branches:
refs/heads/release-1.1 b9e6dcc3c -> 3b5d3c6f3
Revert "[FLINK-4913][yarn] include user jars in system class loader"
This reverts commit ea41b9c56fdc0af3c97d6dd48d04218db6176ec8.
This closes #2795
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3b5d3c6f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3b5d3c6f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3b5d3c6f
Branch: refs/heads/release-1.1
Commit: 3b5d3c6f359dbbdfebcf0b7c034264a3ed9ad12c
Parents: b9e6dcc
Author: Ufuk Celebi <uc...@apache.org>
Authored: Sat Nov 12 20:49:17 2016 +0100
Committer: Robert Metzger <rm...@apache.org>
Committed: Fri Nov 25 16:14:52 2016 +0100
----------------------------------------------------------------------
.../org/apache/flink/client/CliFrontend.java | 14 ++----
.../flink/client/cli/CustomCommandLine.java | 14 +-----
.../org/apache/flink/client/cli/DefaultCLI.java | 5 +-
.../flink/client/program/ClusterClient.java | 40 +++++-----------
.../flink/client/program/PackagedProgram.java | 46 ++++++-------------
.../client/program/StandaloneClusterClient.java | 6 ---
.../org/apache/flink/api/scala/FlinkShell.scala | 7 +--
...CliFrontendYarnAddressConfigurationTest.java | 5 +-
.../org/apache/flink/yarn/YarnTestBase.java | 5 +-
.../yarn/AbstractYarnClusterDescriptor.java | 48 +-------------------
.../apache/flink/yarn/YarnClusterClient.java | 6 ---
.../flink/yarn/cli/FlinkYarnSessionCli.java | 10 +---
12 files changed, 40 insertions(+), 166 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/3b5d3c6f/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index 3a322dc..69963fe 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -234,7 +234,7 @@ public class CliFrontend {
ClusterClient client = null;
try {
- client = createClient(options, program);
+ client = createClient(options, program.getMainClassName());
client.setPrintStatusDuringExecution(options.getStdoutLogging());
client.setDetached(options.getDetachedMode());
LOG.debug("Client slots is set to {}", client.getMaxSlots());
@@ -871,12 +871,12 @@ public class CliFrontend {
/**
* Creates a {@link ClusterClient} object from the given command line options and other parameters.
* @param options Command line options
- * @param program The program for which to create the client.
+ * @param programName Program name
* @throws Exception
*/
protected ClusterClient createClient(
CommandLineOptions options,
- PackagedProgram program) throws Exception {
+ String programName) throws Exception {
// Get the custom command-line (e.g. Standalone/Yarn/Mesos)
CustomCommandLine<?> activeCommandLine = getActiveCustomCommandLine(options.getCommandLine());
@@ -887,12 +887,8 @@ public class CliFrontend {
logAndSysout("Cluster configuration: " + client.getClusterIdentifier());
} catch (UnsupportedOperationException e) {
try {
- String applicationName = "Flink Application: " + program.getMainClassName();
- client = activeCommandLine.createCluster(
- applicationName,
- options.getCommandLine(),
- config,
- program.getAllLibraries());
+ String applicationName = "Flink Application: " + programName;
+ client = activeCommandLine.createCluster(applicationName, options.getCommandLine(), config);
logAndSysout("Cluster started: " + client.getClusterIdentifier());
} catch (UnsupportedOperationException e2) {
throw new IllegalConfigurationException(
http://git-wip-us.apache.org/repos/asf/flink/blob/3b5d3c6f/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java
index c58c74c..aecdc7c 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java
@@ -22,9 +22,6 @@ import org.apache.commons.cli.Options;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.Configuration;
-import java.net.URL;
-import java.util.List;
-
/**
* Custom command-line interface to load hooks for the command-line interface.
@@ -64,22 +61,15 @@ public interface CustomCommandLine<ClusterType extends ClusterClient> {
* @return Client if a cluster could be retrieved
* @throws UnsupportedOperationException if the operation is not supported
*/
- ClusterType retrieveCluster(
- CommandLine commandLine,
- Configuration config) throws UnsupportedOperationException;
+ ClusterType retrieveCluster(CommandLine commandLine, Configuration config) throws UnsupportedOperationException;
/**
* Creates the client for the cluster
* @param applicationName The application name to use
* @param commandLine The command-line options parsed by the CliFrontend
* @param config The Flink config to use
- * @param userJarFiles User jar files to include in the classpath of the cluster.
* @return The client to communicate with the cluster which the CustomCommandLine brought up.
* @throws UnsupportedOperationException if the operation is not supported
*/
- ClusterType createCluster(
- String applicationName,
- CommandLine commandLine,
- Configuration config,
- List<URL> userJarFiles) throws UnsupportedOperationException;
+ ClusterType createCluster(String applicationName, CommandLine commandLine, Configuration config) throws UnsupportedOperationException;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3b5d3c6f/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java b/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
index 598c612..5f83c3d 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
@@ -26,8 +26,6 @@ import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import java.net.InetSocketAddress;
-import java.net.URL;
-import java.util.List;
import static org.apache.flink.client.CliFrontend.setJobManagerAddressInConfig;
@@ -77,8 +75,7 @@ public class DefaultCLI implements CustomCommandLine<StandaloneClusterClient> {
public StandaloneClusterClient createCluster(
String applicationName,
CommandLine commandLine,
- Configuration config,
- List<URL> userJarFiles) throws UnsupportedOperationException {
+ Configuration config) throws UnsupportedOperationException {
StandaloneClusterDescriptor descriptor = new StandaloneClusterDescriptor(config);
return descriptor.deploy();
http://git-wip-us.apache.org/repos/asf/flink/blob/3b5d3c6f/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
index 5e88af6..2d743fa 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
@@ -307,27 +307,11 @@ public abstract class ClusterClient {
{
Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader());
if (prog.isUsingProgramEntryPoint()) {
-
- final JobWithJars jobWithJars;
- if (hasUserJarsInClassPath(prog.getAllLibraries())) {
- jobWithJars = prog.getPlanWithoutJars();
- } else {
- jobWithJars = prog.getPlanWithJars();
- }
-
- return run(jobWithJars, parallelism, prog.getSavepointSettings());
+ return run(prog.getPlanWithJars(), parallelism, prog.getSavepointSettings());
}
else if (prog.isUsingInteractiveMode()) {
LOG.info("Starting program in interactive mode");
-
- final List<URL> libraries;
- if (hasUserJarsInClassPath(prog.getAllLibraries())) {
- libraries = Collections.emptyList();
- } else {
- libraries = prog.getAllLibraries();
- }
-
- ContextEnvironmentFactory factory = new ContextEnvironmentFactory(this, libraries,
+ ContextEnvironmentFactory factory = new ContextEnvironmentFactory(this, prog.getAllLibraries(),
prog.getClasspaths(), prog.getUserCodeClassLoader(), parallelism, isDetached(),
prog.getSavepointSettings());
ContextEnvironment.setAsContext(factory);
@@ -365,7 +349,7 @@ public abstract class ClusterClient {
* Runs a program on the Flink cluster to which this client is connected. The call blocks until the
* execution is complete, and returns afterwards.
*
- * @param jobWithJars The program to be executed.
+ * @param program The program to be executed.
* @param parallelism The default parallelism to use when running the program. The default parallelism is used
* when the program does not set a parallelism by itself.
*
@@ -375,15 +359,15 @@ public abstract class ClusterClient {
* i.e. the job-manager is unreachable, or due to the fact that the
* parallel execution failed.
*/
- public JobSubmissionResult run(JobWithJars jobWithJars, int parallelism, SavepointRestoreSettings savepointSettings)
+ public JobSubmissionResult run(JobWithJars program, int parallelism, SavepointRestoreSettings savepointSettings)
throws CompilerException, ProgramInvocationException {
- ClassLoader classLoader = jobWithJars.getUserCodeClassLoader();
+ ClassLoader classLoader = program.getUserCodeClassLoader();
if (classLoader == null) {
throw new IllegalArgumentException("The given JobWithJars does not provide a usercode class loader.");
}
- OptimizedPlan optPlan = getOptimizedPlan(compiler, jobWithJars, parallelism);
- return run(optPlan, jobWithJars.getJarFiles(), jobWithJars.getClasspaths(), classLoader, savepointSettings);
+ OptimizedPlan optPlan = getOptimizedPlan(compiler, program, parallelism);
+ return run(optPlan, program.getJarFiles(), program.getClasspaths(), classLoader, savepointSettings);
}
public JobSubmissionResult run(
@@ -614,6 +598,10 @@ public abstract class ClusterClient {
return getOptimizedPlan(compiler, prog.getPlan(), parallelism);
}
+ public JobGraph getJobGraph(PackagedProgram prog, FlinkPlan optPlan) throws ProgramInvocationException {
+ return getJobGraph(optPlan, prog.getAllLibraries(), prog.getClasspaths(), null);
+ }
+
public JobGraph getJobGraph(PackagedProgram prog, FlinkPlan optPlan, SavepointRestoreSettings savepointSettings) throws ProgramInvocationException {
return getJobGraph(optPlan, prog.getAllLibraries(), prog.getClasspaths(), savepointSettings);
}
@@ -740,12 +728,6 @@ public abstract class ClusterClient {
public abstract int getMaxSlots();
/**
- * Returns true if the client already has the user jar and providing it again would
- * result in duplicate uploading of the jar.
- */
- public abstract boolean hasUserJarsInClassPath(List<URL> userJarFiles);
-
- /**
* Calls the subclasses' submitJob method. It may decide to simply call one of the run methods or it may perform
* some custom job submission logic.
* @param jobGraph The JobGraph to be submitted
http://git-wip-us.apache.org/repos/asf/flink/blob/3b5d3c6f/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
index 8931a3e..daa5737 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
@@ -283,38 +283,23 @@ public class PackagedProgram {
}
/**
- * Returns the plan without the required jars when the files are already provided by the cluster.
- *
- * @return The plan without attached jar files.
- * @throws ProgramInvocationException
- */
- public JobWithJars getPlanWithoutJars() throws ProgramInvocationException {
- if (isUsingProgramEntryPoint()) {
- return new JobWithJars(getPlan(), Collections.<URL>emptyList(), classpaths, userCodeClassLoader);
- } else {
- throw new ProgramInvocationException("Cannot create a " + JobWithJars.class.getSimpleName() +
- " for a program that is using the interactive mode.");
- }
- }
-
- /**
* Returns the plan with all required jars.
- *
+ *
* @return The plan with attached jar files.
- * @throws ProgramInvocationException
+ * @throws ProgramInvocationException
*/
public JobWithJars getPlanWithJars() throws ProgramInvocationException {
if (isUsingProgramEntryPoint()) {
return new JobWithJars(getPlan(), getAllLibraries(), classpaths, userCodeClassLoader);
} else {
- throw new ProgramInvocationException("Cannot create a " + JobWithJars.class.getSimpleName() +
+ throw new ProgramInvocationException("Cannot create a " + JobWithJars.class.getSimpleName() +
" for a program that is using the interactive mode.");
}
}
/**
* Returns the analyzed plan without any optimizations.
- *
+ *
* @return
* the analyzed plan without any optimizations.
* @throws ProgramInvocationException Thrown if an error occurred in the
@@ -324,7 +309,7 @@ public class PackagedProgram {
public String getPreviewPlan() throws ProgramInvocationException {
Thread.currentThread().setContextClassLoader(this.getUserCodeClassLoader());
List<DataSinkNode> previewPlan;
-
+
if (isUsingProgramEntryPoint()) {
previewPlan = Optimizer.createPreOptimizedPlan(getPlan());
}
@@ -351,7 +336,7 @@ public class PackagedProgram {
finally {
env.unsetAsContext();
}
-
+
if (env.previewPlan != null) {
previewPlan = env.previewPlan;
} else {
@@ -375,7 +360,7 @@ public class PackagedProgram {
/**
* Returns the description provided by the Program class. This
* may contain a description of the plan itself and its arguments.
- *
+ *
* @return The description of the PactProgram's input parameters.
* @throws ProgramInvocationException
* This invocation is thrown if the Program can't be properly loaded. Causes
@@ -383,7 +368,7 @@ public class PackagedProgram {
*/
public String getDescription() throws ProgramInvocationException {
if (ProgramDescription.class.isAssignableFrom(this.mainClass)) {
-
+
ProgramDescription descr;
if (this.program != null) {
descr = (ProgramDescription) this.program;
@@ -395,22 +380,22 @@ public class PackagedProgram {
return null;
}
}
-
+
try {
return descr.getDescription();
}
catch (Throwable t) {
- throw new ProgramInvocationException("Error while getting the program description" +
+ throw new ProgramInvocationException("Error while getting the program description" +
(t.getMessage() == null ? "." : ": " + t.getMessage()), t);
}
-
+
} else {
return null;
}
}
-
+
/**
- *
+ *
* This method assumes that the context environment is prepared, or the execution
* will be a local execution by default.
*/
@@ -433,16 +418,13 @@ public class PackagedProgram {
/**
* Gets the {@link java.lang.ClassLoader} that must be used to load user code classes.
- *
+ *
* @return The user code ClassLoader.
*/
public ClassLoader getUserCodeClassLoader() {
return this.userCodeClassLoader;
}
- /**
- * Returns all provided libraries needed to run the program.
- */
public List<URL> getAllLibraries() {
List<URL> libs = new ArrayList<URL>(this.extractedTempLibraries.size() + 1);
http://git-wip-us.apache.org/repos/asf/flink/blob/3b5d3c6f/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
index 296ddc9..3343b69 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
@@ -28,7 +28,6 @@ import scala.concurrent.Await;
import scala.concurrent.Future;
import java.io.IOException;
-import java.net.URL;
import java.util.Collections;
import java.util.List;
@@ -88,11 +87,6 @@ public class StandaloneClusterClient extends ClusterClient {
}
@Override
- public boolean hasUserJarsInClassPath(List<URL> userJarFiles) {
- return false;
- }
-
- @Override
protected JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader)
throws ProgramInvocationException {
if (isDetached()) {
http://git-wip-us.apache.org/repos/asf/flink/blob/3b5d3c6f/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
----------------------------------------------------------------------
diff --git a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
index f00013e..fb70280 100644
--- a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
+++ b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
@@ -19,7 +19,6 @@
package org.apache.flink.api.scala
import java.io._
-import java.util.Collections
import org.apache.commons.cli.CommandLine
import org.apache.flink.client.cli.CliFrontendParser
@@ -253,11 +252,7 @@ object FlinkShell {
val config = frontend.getConfiguration
val customCLI = frontend.getActiveCustomCommandLine(options.getCommandLine)
- val cluster = customCLI.createCluster(
- "Flink Scala Shell",
- options.getCommandLine,
- config,
- Collections.emptyList())
+ val cluster = customCLI.createCluster("Flink Scala Shell", options.getCommandLine, config)
val address = cluster.getJobManagerAddress.getAddress.getHostAddress
val port = cluster.getJobManagerAddress.getPort
http://git-wip-us.apache.org/repos/asf/flink/blob/3b5d3c6f/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
index 77d3149..8ba786f 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
@@ -25,7 +25,6 @@ import org.apache.flink.client.cli.CommandLineOptions;
import org.apache.flink.client.cli.CustomCommandLine;
import org.apache.flink.client.cli.RunOptions;
import org.apache.flink.client.program.ClusterClient;
-import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
@@ -332,8 +331,8 @@ public class CliFrontendYarnAddressConfigurationTest {
@Override
// make method public
- public ClusterClient createClient(CommandLineOptions options, PackagedProgram program) throws Exception {
- return super.createClient(options, program);
+ public ClusterClient createClient(CommandLineOptions options, String programName) throws Exception {
+ return super.createClient(options, programName);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/3b5d3c6f/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
index 78e16ed..7e612c4 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
@@ -23,7 +23,6 @@ import org.apache.commons.io.FileUtils;
import org.apache.flink.client.CliFrontend;
import org.apache.flink.client.cli.CommandLineOptions;
import org.apache.flink.client.program.ClusterClient;
-import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
@@ -668,9 +667,9 @@ public abstract class YarnTestBase extends TestLogger {
public TestingCLI() throws Exception {}
@Override
- protected ClusterClient createClient(CommandLineOptions options, PackagedProgram program) throws Exception {
+ protected ClusterClient createClient(CommandLineOptions options, String programName) throws Exception {
// mock the returned ClusterClient to disable shutdown and verify shutdown behavior later on
- originalClusterClient = super.createClient(options, program);
+ originalClusterClient = super.createClient(options, programName);
spiedClusterClient = Mockito.spy(originalClusterClient);
Mockito.doNothing().when(spiedClusterClient).shutdown();
return spiedClusterClient;
http://git-wip-us.apache.org/repos/asf/flink/blob/3b5d3c6f/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index 000b2c1..ab1fbc1 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -61,8 +61,6 @@ import java.io.PrintStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.security.PrivilegedExceptionAction;
-import java.net.URISyntaxException;
-import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -132,10 +130,6 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
private String zookeeperNamespace;
- /** Optional Jar file to include in the system class loader of all application nodes
- * (for per-job submission) */
- private Set<File> userJarFiles;
-
public AbstractYarnClusterDescriptor() {
// for unit tests only
if(System.getenv("IN_TESTS") != null) {
@@ -246,41 +240,6 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
this.dynamicPropertiesEncoded = dynamicPropertiesEncoded;
}
- /**
- * Returns true if the descriptor has the job jars to include in the classpath.
- */
- public boolean hasUserJarFiles(List<URL> requiredJarFiles) {
- if (userJarFiles == null || userJarFiles.size() != requiredJarFiles.size()) {
- return false;
- }
- try {
- for(URL jarFile : requiredJarFiles) {
- if (!userJarFiles.contains(new File(jarFile.toURI()))) {
- return false;
- }
- }
- } catch (URISyntaxException e) {
- return false;
- }
- return true;
- }
-
- /**
- * Sets the user jar which is included in the system classloader of all nodes.
- */
- public void setProvidedUserJarFiles(List<URL> userJarFiles) {
- Set<File> localUserJarFiles = new HashSet<>(userJarFiles.size());
- for (URL jarFile : userJarFiles) {
- try {
- localUserJarFiles.add(new File(jarFile.toURI()));
- } catch (URISyntaxException e) {
- throw new IllegalArgumentException("Couldn't add local user jar: " + jarFile
- + " Currently only file:/// URLs are supported.");
- }
- }
- this.userJarFiles = localUserJarFiles;
- }
-
public String getDynamicPropertiesEncoded() {
return this.dynamicPropertiesEncoded;
}
@@ -596,11 +555,6 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
final ContainerLaunchContext amContainer = setupApplicationMasterContainer(hasLogback, hasLog4j);
- // add the user jar to the classpath of the to-be-created cluster
- if (userJarFiles != null) {
- effectiveShipFiles.addAll(userJarFiles);
- }
-
// Set-up ApplicationSubmissionContext for the application
ApplicationSubmissionContext appContext = yarnApplication.getApplicationSubmissionContext();
@@ -755,7 +709,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
try {
report = yarnClient.getApplicationReport(appId);
} catch (IOException e) {
- throw new YarnDeploymentException("Failed to deploy the cluster.", e);
+ throw new YarnDeploymentException("Failed to deploy the cluster: " + e.getMessage());
}
YarnApplicationState appState = report.getYarnApplicationState();
switch(appState) {
http://git-wip-us.apache.org/repos/asf/flink/blob/3b5d3c6f/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
index cd447d7..79501b1 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
@@ -55,7 +55,6 @@ import scala.concurrent.duration.FiniteDuration;
import java.io.File;
import java.io.IOException;
-import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -195,11 +194,6 @@ public class YarnClusterClient extends ClusterClient {
}
@Override
- public boolean hasUserJarsInClassPath(List<URL> userJarFiles) {
- return clusterDescriptor.hasUserJarFiles(userJarFiles);
- }
-
- @Override
protected JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {
if (isDetached()) {
if (newlyCreatedCluster) {
http://git-wip-us.apache.org/repos/asf/flink/blob/3b5d3c6f/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index 4823d35..28d8fb8 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -29,7 +29,6 @@ import org.apache.flink.client.cli.CustomCommandLine;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
-import org.apache.flink.util.Preconditions;
import org.apache.flink.yarn.AbstractYarnClusterDescriptor;
import org.apache.flink.yarn.YarnClusterDescriptor;
import org.apache.flink.yarn.YarnClusterClient;
@@ -49,7 +48,6 @@ import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.UnsupportedEncodingException;
-import java.net.URL;
import java.net.URLDecoder;
import java.nio.charset.Charset;
import java.util.ArrayList;
@@ -517,16 +515,10 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
}
@Override
- public YarnClusterClient createCluster(
- String applicationName,
- CommandLine cmdLine,
- Configuration config,
- List<URL> userJarFiles) {
- Preconditions.checkNotNull(userJarFiles, "User jar files should not be null.");
+ public YarnClusterClient createCluster(String applicationName, CommandLine cmdLine, Configuration config) {
AbstractYarnClusterDescriptor yarnClusterDescriptor = createDescriptor(applicationName, cmdLine);
yarnClusterDescriptor.setFlinkConfiguration(config);
- yarnClusterDescriptor.setProvidedUserJarFiles(userJarFiles);
try {
return yarnClusterDescriptor.deploy();