You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/07/28 10:17:38 UTC
flink git commit: [FLINK-7125] [yarn] Remove Configuration loading from AbstractYarnClusterDescriptor
Repository: flink
Updated Branches:
refs/heads/master fa16e9a81 -> e97514090
[FLINK-7125] [yarn] Remove Configuration loading from AbstractYarnClusterDescriptor
Instead, the AbstractYarnClusterDescriptor is passed a Configuration instance, which
is sent to the started application master.
Pass in configuration directory manually
Remove configurationDirectory resolution from AbstractYarnClusterDescriptor
Address PR comments
This closes #4280.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e9751409
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e9751409
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e9751409
Branch: refs/heads/master
Commit: e9751409033870658a12a8edb4cf11a84fd04e3d
Parents: fa16e9a
Author: Till Rohrmann <tr...@apache.org>
Authored: Fri Jul 7 13:40:29 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri Jul 28 12:16:58 2017 +0200
----------------------------------------------------------------------
.../org/apache/flink/client/CliFrontend.java | 17 ++++-
.../flink/client/cli/CustomCommandLine.java | 19 +++---
.../org/apache/flink/client/cli/DefaultCLI.java | 6 +-
.../org/apache/flink/api/scala/FlinkShell.scala | 6 +-
...CliFrontendYarnAddressConfigurationTest.java | 10 ++-
.../flink/yarn/FlinkYarnSessionCliTest.java | 48 ++++++++-----
.../yarn/TestingYarnClusterDescriptor.java | 4 +-
.../flink/yarn/YARNHighAvailabilityITCase.java | 11 ++-
.../flink/yarn/YARNSessionFIFOITCase.java | 9 ++-
.../flink/yarn/YarnClusterDescriptorTest.java | 15 ++--
.../org/apache/flink/yarn/YarnTestBase.java | 35 ++++++++--
.../yarn/AbstractYarnClusterDescriptor.java | 72 ++++++++------------
.../flink/yarn/YarnClusterDescriptor.java | 5 ++
.../flink/yarn/YarnClusterDescriptorV2.java | 5 ++
.../org/apache/flink/yarn/cli/FlinkYarnCLI.java | 15 ++--
.../flink/yarn/cli/FlinkYarnSessionCli.java | 51 +++++++++-----
.../flink/yarn/YarnClusterDescriptorTest.java | 32 ++++-----
17 files changed, 221 insertions(+), 139 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/e9751409/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index 4c7f6a8..2e140df 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -152,6 +152,8 @@ public class CliFrontend {
private final Configuration config;
+ private final String configurationDirectory;
+
private final FiniteDuration clientTimeout;
private final int defaultParallelism;
@@ -165,6 +167,7 @@ public class CliFrontend {
}
public CliFrontend(String configDir) throws Exception {
+ configurationDirectory = Preconditions.checkNotNull(configDir);
// configure the config directory
File configDirectory = new File(configDir);
@@ -204,6 +207,15 @@ public class CliFrontend {
return copiedConfiguration;
}
+ /**
+ * Returns the configuration directory for the CLI frontend.
+ *
+ * @return Configuration directory
+ */
+ public String getConfigurationDirectory() {
+ return configurationDirectory;
+ }
+
// --------------------------------------------------------------------------------------------
// Execute Actions
// --------------------------------------------------------------------------------------------
@@ -906,7 +918,7 @@ public class CliFrontend {
protected ClusterClient retrieveClient(CommandLineOptions options) {
CustomCommandLine customCLI = getActiveCustomCommandLine(options.getCommandLine());
try {
- ClusterClient client = customCLI.retrieveCluster(options.getCommandLine(), config);
+ ClusterClient client = customCLI.retrieveCluster(options.getCommandLine(), config, configurationDirectory);
logAndSysout("Using address " + client.getJobManagerAddress() + " to connect to JobManager.");
return client;
} catch (Exception e) {
@@ -943,7 +955,7 @@ public class CliFrontend {
ClusterClient client;
try {
- client = activeCommandLine.retrieveCluster(options.getCommandLine(), config);
+ client = activeCommandLine.retrieveCluster(options.getCommandLine(), config, configurationDirectory);
logAndSysout("Cluster configuration: " + client.getClusterIdentifier());
} catch (UnsupportedOperationException e) {
try {
@@ -952,6 +964,7 @@ public class CliFrontend {
applicationName,
options.getCommandLine(),
config,
+ configurationDirectory,
program.getAllLibraries());
logAndSysout("Cluster started: " + client.getClusterIdentifier());
} catch (UnsupportedOperationException e2) {
http://git-wip-us.apache.org/repos/asf/flink/blob/e9751409/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java
index 9ddaf9e..517fc71 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java
@@ -62,25 +62,28 @@ public interface CustomCommandLine<ClusterType extends ClusterClient> {
* Retrieves a client for a running cluster.
* @param commandLine The command-line parameters from the CliFrontend
* @param config The Flink config
+ * @param configurationDirectory Directory for configuration files
* @return Client if a cluster could be retrieved
* @throws UnsupportedOperationException if the operation is not supported
*/
ClusterType retrieveCluster(
- CommandLine commandLine,
- Configuration config) throws UnsupportedOperationException;
+ CommandLine commandLine,
+ Configuration config,
+ String configurationDirectory) throws UnsupportedOperationException;
/**
* Creates the client for the cluster.
* @param applicationName The application name to use
* @param commandLine The command-line options parsed by the CliFrontend
* @param config The Flink config to use
- * @param userJarFiles User jar files to include in the classpath of the cluster.
- * @return The client to communicate with the cluster which the CustomCommandLine brought up.
+ * @param configurationDirectory Directory for configuration files
+ *@param userJarFiles User jar files to include in the classpath of the cluster. @return The client to communicate with the cluster which the CustomCommandLine brought up.
* @throws Exception if the cluster could not be created
*/
ClusterType createCluster(
- String applicationName,
- CommandLine commandLine,
- Configuration config,
- List<URL> userJarFiles) throws Exception;
+ String applicationName,
+ CommandLine commandLine,
+ Configuration config,
+ String configurationDirectory,
+ List<URL> userJarFiles) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e9751409/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java b/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
index ec55c91..a5d8a30 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
@@ -59,7 +59,10 @@ public class DefaultCLI implements CustomCommandLine<StandaloneClusterClient> {
}
@Override
- public StandaloneClusterClient retrieveCluster(CommandLine commandLine, Configuration config) {
+ public StandaloneClusterClient retrieveCluster(
+ CommandLine commandLine,
+ Configuration config,
+ String configurationDirectory) {
if (commandLine.hasOption(CliFrontendParser.ADDRESS_OPTION.getOpt())) {
String addressWithPort = commandLine.getOptionValue(CliFrontendParser.ADDRESS_OPTION.getOpt());
@@ -81,6 +84,7 @@ public class DefaultCLI implements CustomCommandLine<StandaloneClusterClient> {
String applicationName,
CommandLine commandLine,
Configuration config,
+ String configurationDirectory,
List<URL> userJarFiles) throws UnsupportedOperationException {
StandaloneClusterDescriptor descriptor = new StandaloneClusterDescriptor(config);
http://git-wip-us.apache.org/repos/asf/flink/blob/e9751409/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
----------------------------------------------------------------------
diff --git a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
index a5c5860..60eaccc 100644
--- a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
+++ b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
@@ -257,6 +257,7 @@ object FlinkShell {
"Flink Scala Shell",
options.getCommandLine,
config,
+ frontend.getConfigurationDirectory,
Collections.emptyList())
val address = cluster.getJobManagerAddress.getAddress.getHostAddress
@@ -277,7 +278,10 @@ object FlinkShell {
val config = frontend.getConfiguration
val customCLI = frontend.getActiveCustomCommandLine(options.getCommandLine)
- val cluster = customCLI.retrieveCluster(options.getCommandLine, config)
+ val cluster = customCLI.retrieveCluster(
+ options.getCommandLine,
+ config,
+ frontend.getConfigurationDirectory)
if (cluster == null) {
throw new RuntimeException("Yarn Cluster could not be retrieved.")
http://git-wip-us.apache.org/repos/asf/flink/blob/e9751409/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
index 6811511..0d57e20 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
@@ -362,8 +362,10 @@ public class CliFrontendYarnAddressConfigurationTest extends TestLogger {
@Override
// override cluster descriptor to replace the YarnClient
- protected AbstractYarnClusterDescriptor getClusterDescriptor() {
- return new TestingYarnClusterDescriptor();
+ protected AbstractYarnClusterDescriptor getClusterDescriptor(
+ Configuration configuration,
+ String configurationDirecotry) {
+ return new TestingYarnClusterDescriptor(configuration, configurationDirecotry);
}
/**
@@ -371,6 +373,10 @@ public class CliFrontendYarnAddressConfigurationTest extends TestLogger {
*/
private class TestingYarnClusterDescriptor extends YarnClusterDescriptor {
+ public TestingYarnClusterDescriptor(Configuration flinkConfiguration, String configurationDirectory) {
+ super(flinkConfiguration, configurationDirectory);
+ }
+
@Override
protected YarnClient getYarnClient() {
return new TestYarnClient();
http://git-wip-us.apache.org/repos/asf/flink/blob/e9751409/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
index f6c8bbb..c7c25ff 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
@@ -24,7 +24,6 @@ import org.apache.flink.client.cli.RunOptions;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.util.TestLogger;
import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
@@ -43,7 +42,6 @@ import org.mockito.Mockito;
import java.io.File;
import java.net.InetSocketAddress;
-import java.util.HashMap;
import java.util.Map;
/**
@@ -57,8 +55,6 @@ public class FlinkYarnSessionCliTest extends TestLogger {
@Test
public void testDynamicProperties() throws Exception {
- Map<String, String> map = new HashMap<String, String>(System.getenv());
- TestBaseUtils.setEnv(map);
FlinkYarnSessionCli cli = new FlinkYarnSessionCli(
"",
"",
@@ -71,7 +67,11 @@ public class FlinkYarnSessionCliTest extends TestLogger {
CommandLine cmd = parser.parse(options, new String[]{"run", "-j", "fake.jar", "-n", "15",
"-D", "akka.ask.timeout=5 min", "-D", "env.java.opts=-DappName=foobar"});
- AbstractYarnClusterDescriptor flinkYarnDescriptor = cli.createDescriptor(null, cmd);
+ AbstractYarnClusterDescriptor flinkYarnDescriptor = cli.createDescriptor(
+ new Configuration(),
+ tmp.getRoot().getAbsolutePath(),
+ null,
+ cmd);
Assert.assertNotNull(flinkYarnDescriptor);
@@ -84,10 +84,7 @@ public class FlinkYarnSessionCliTest extends TestLogger {
@Test
public void testNotEnoughTaskSlots() throws Exception {
-
- File confFile = tmp.newFile("flink-conf.yaml");
File jarFile = tmp.newFile("test.jar");
- new CliFrontend(tmp.getRoot().getAbsolutePath());
String[] params =
new String[] {"-yn", "2", "-ys", "3", "-p", "7", jarFile.getAbsolutePath()};
@@ -108,7 +105,9 @@ public class FlinkYarnSessionCliTest extends TestLogger {
File confFile = tmp.newFile("flink-conf.yaml");
File jarFile = tmp.newFile("test.jar");
- new CliFrontend(tmp.getRoot().getAbsolutePath());
+ CliFrontend cliFrontend = new CliFrontend(tmp.getRoot().getAbsolutePath());
+
+ final Configuration config = cliFrontend.getConfiguration();
String[] params =
new String[] {"-yn", "2", "-ys", "3", jarFile.getAbsolutePath()};
@@ -117,14 +116,22 @@ public class FlinkYarnSessionCliTest extends TestLogger {
FlinkYarnSessionCli yarnCLI = new TestCLI("y", "yarn");
- AbstractYarnClusterDescriptor descriptor = yarnCLI.createDescriptor("", runOptions.getCommandLine());
- final ClusterSpecification clusterSpecification = yarnCLI.createClusterSpecification(new Configuration(), runOptions.getCommandLine());
+ final Configuration configuration = new Configuration();
+
+ AbstractYarnClusterDescriptor descriptor = yarnCLI.createDescriptor(
+ configuration,
+ tmp.getRoot().getAbsolutePath(),
+ "",
+ runOptions.getCommandLine());
+
+ final ClusterSpecification clusterSpecification = yarnCLI.createClusterSpecification(
+ configuration,
+ runOptions.getCommandLine());
// each task manager has 3 slots but the parallelism is 7. Thus the slots should be increased.
Assert.assertEquals(3, clusterSpecification.getSlotsPerTaskManager());
Assert.assertEquals(2, clusterSpecification.getNumberTaskManagers());
- Configuration config = new Configuration();
CliFrontend.setJobManagerAddressInConfig(config, new InetSocketAddress("localhost", 9000));
ClusterClient client = new TestingYarnClusterClient(
descriptor,
@@ -139,7 +146,8 @@ public class FlinkYarnSessionCliTest extends TestLogger {
File confFile = tmp.newFile("flink-conf.yaml");
File jarFile = tmp.newFile("test.jar");
- new CliFrontend(tmp.getRoot().getAbsolutePath());
+ CliFrontend cliFrontend = new CliFrontend(tmp.getRoot().getAbsolutePath());
+ final Configuration configuration = cliFrontend.getConfiguration();
String zkNamespaceCliInput = "flink_test_namespace";
@@ -149,7 +157,11 @@ public class FlinkYarnSessionCliTest extends TestLogger {
RunOptions runOptions = CliFrontendParser.parseRunCommand(params);
FlinkYarnSessionCli yarnCLI = new TestCLI("y", "yarn");
- AbstractYarnClusterDescriptor descriptor = yarnCLI.createDescriptor("", runOptions.getCommandLine());
+ AbstractYarnClusterDescriptor descriptor = yarnCLI.createDescriptor(
+ configuration,
+ tmp.getRoot().getAbsolutePath(),
+ "",
+ runOptions.getCommandLine());
Assert.assertEquals(zkNamespaceCliInput, descriptor.getZookeeperNamespace());
}
@@ -161,6 +173,10 @@ public class FlinkYarnSessionCliTest extends TestLogger {
}
private static class JarAgnosticClusterDescriptor extends YarnClusterDescriptor {
+ public JarAgnosticClusterDescriptor(Configuration flinkConfiguration, String configurationDirectory) {
+ super(flinkConfiguration, configurationDirectory);
+ }
+
@Override
public void setLocalJarPath(Path localJarPath) {
// add nothing
@@ -168,8 +184,8 @@ public class FlinkYarnSessionCliTest extends TestLogger {
}
@Override
- protected AbstractYarnClusterDescriptor getClusterDescriptor() {
- return new JarAgnosticClusterDescriptor();
+ protected AbstractYarnClusterDescriptor getClusterDescriptor(Configuration configuration, String configurationDirectory) {
+ return new JarAgnosticClusterDescriptor(configuration, configurationDirectory);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e9751409/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java
index cde13ff..b2fe133 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java
@@ -18,6 +18,7 @@
package org.apache.flink.yarn;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.util.Preconditions;
@@ -33,7 +34,8 @@ import java.util.List;
*/
public class TestingYarnClusterDescriptor extends AbstractYarnClusterDescriptor {
- public TestingYarnClusterDescriptor() {
+ public TestingYarnClusterDescriptor(Configuration configuration, String configurationDirectory) {
+ super(configuration, configurationDirectory);
List<File> filesToShip = new ArrayList<>();
File testingJar = YarnTestBase.findFile("..", new TestJarFinder("flink-yarn-tests"));
http://git-wip-us.apache.org/repos/asf/flink/blob/e9751409/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
index 308cdc1..897dc8c 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
@@ -21,6 +21,7 @@ package org.apache.flink.yarn;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.HighAvailabilityOptions;
@@ -104,29 +105,25 @@ public class YARNHighAvailabilityITCase extends YarnTestBase {
@Test
public void testMultipleAMKill() throws Exception {
final int numberKillingAttempts = numberApplicationAttempts - 1;
-
- TestingYarnClusterDescriptor flinkYarnClient = new TestingYarnClusterDescriptor();
+ String confDirPath = System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR);
+ final Configuration configuration = GlobalConfiguration.loadConfiguration();
+ TestingYarnClusterDescriptor flinkYarnClient = new TestingYarnClusterDescriptor(configuration, confDirPath);
Assert.assertNotNull("unable to get yarn client", flinkYarnClient);
flinkYarnClient.setLocalJarPath(new Path(flinkUberjar.getAbsolutePath()));
flinkYarnClient.addShipFiles(Arrays.asList(flinkLibFolder.listFiles()));
- String confDirPath = System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR);
- flinkYarnClient.setConfigurationDirectory(confDirPath);
-
String fsStateHandlePath = temp.getRoot().getPath();
// load the configuration
File configDirectory = new File(confDirPath);
GlobalConfiguration.loadConfiguration(configDirectory.getAbsolutePath());
- flinkYarnClient.setFlinkConfiguration(GlobalConfiguration.loadConfiguration());
flinkYarnClient.setDynamicPropertiesEncoded("recovery.mode=zookeeper@@recovery.zookeeper.quorum=" +
zkServer.getConnectString() + "@@yarn.application-attempts=" + numberApplicationAttempts +
"@@" + CoreOptions.STATE_BACKEND + "=FILESYSTEM" +
"@@" + FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY + "=" + fsStateHandlePath + "/checkpoints" +
"@@" + HighAvailabilityOptions.HA_STORAGE_PATH.key() + "=" + fsStateHandlePath + "/recovery");
- flinkYarnClient.setConfigurationFilePath(new Path(confDirPath + File.separator + "flink-conf.yaml"));
ClusterClient yarnCluster = null;
http://git-wip-us.apache.org/repos/asf/flink/blob/e9751409/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
index 2bbaadf..dd56f2f 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
@@ -21,6 +21,7 @@ package org.apache.flink.yarn;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
@@ -224,14 +225,12 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
final int waitTime = 15;
LOG.info("Starting testJavaAPI()");
- AbstractYarnClusterDescriptor flinkYarnClient = new YarnClusterDescriptor();
+ String confDirPath = System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR);
+ Configuration configuration = GlobalConfiguration.loadConfiguration();
+ AbstractYarnClusterDescriptor flinkYarnClient = new YarnClusterDescriptor(configuration, confDirPath);
Assert.assertNotNull("unable to get yarn client", flinkYarnClient);
flinkYarnClient.setLocalJarPath(new Path(flinkUberjar.getAbsolutePath()));
flinkYarnClient.addShipFiles(Arrays.asList(flinkLibFolder.listFiles()));
- String confDirPath = System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR);
- flinkYarnClient.setConfigurationDirectory(confDirPath);
- flinkYarnClient.setFlinkConfiguration(GlobalConfiguration.loadConfiguration());
- flinkYarnClient.setConfigurationFilePath(new Path(confDirPath + File.separator + "flink-conf.yaml"));
final ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder()
.setMasterMemoryMB(768)
http://git-wip-us.apache.org/repos/asf/flink/blob/e9751409/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
index c40a300..f3e48c5 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.yarn;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.util.TestLogger;
import org.apache.hadoop.fs.Path;
import org.junit.Assert;
@@ -39,7 +40,7 @@ import java.util.Set;
/**
* Tests for the YarnClusterDescriptor.
*/
-public class YarnClusterDescriptorTest {
+public class YarnClusterDescriptorTest extends TestLogger {
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
@@ -49,13 +50,9 @@ public class YarnClusterDescriptorTest {
*/
@Test
public void testExplicitLibShipping() throws Exception {
- AbstractYarnClusterDescriptor descriptor = new YarnClusterDescriptor();
+ AbstractYarnClusterDescriptor descriptor = new YarnClusterDescriptor(new Configuration(), temporaryFolder.getRoot().getAbsolutePath());
descriptor.setLocalJarPath(new Path("/path/to/flink.jar"));
- descriptor.setConfigurationDirectory(temporaryFolder.getRoot().getAbsolutePath());
- descriptor.setConfigurationFilePath(new Path(temporaryFolder.getRoot().getPath()));
- descriptor.setFlinkConfiguration(new Configuration());
-
File libFile = temporaryFolder.newFile("libFile.jar");
File libFolder = temporaryFolder.newFolder().getAbsoluteFile();
@@ -86,11 +83,7 @@ public class YarnClusterDescriptorTest {
*/
@Test
public void testEnvironmentLibShipping() throws Exception {
- AbstractYarnClusterDescriptor descriptor = new YarnClusterDescriptor();
-
- descriptor.setConfigurationDirectory(temporaryFolder.getRoot().getAbsolutePath());
- descriptor.setConfigurationFilePath(new Path(temporaryFolder.getRoot().getPath()));
- descriptor.setFlinkConfiguration(new Configuration());
+ AbstractYarnClusterDescriptor descriptor = new YarnClusterDescriptor(new Configuration(), temporaryFolder.getRoot().getAbsolutePath());
File libFolder = temporaryFolder.newFolder().getAbsoluteFile();
File libFile = new File(libFolder, "libFile.jar");
http://git-wip-us.apache.org/repos/asf/flink/blob/e9751409/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
index e701104..62ff955 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
@@ -26,6 +26,7 @@ import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
@@ -136,8 +137,6 @@ public abstract class YarnTestBase extends TestLogger {
*/
protected static File tempConfPathForSecureRun = null;
- protected static org.apache.flink.configuration.Configuration flinkConfiguration = new org.apache.flink.configuration.Configuration();
-
static {
YARN_CONFIGURATION = new YarnConfiguration();
YARN_CONFIGURATION.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512);
@@ -185,6 +184,8 @@ public abstract class YarnTestBase extends TestLogger {
}
private YarnClient yarnClient = null;
+ protected org.apache.flink.configuration.Configuration flinkConfiguration;
+
@Before
public void checkClusterEmpty() throws IOException, YarnException {
if (yarnClient == null) {
@@ -202,6 +203,8 @@ public abstract class YarnTestBase extends TestLogger {
"App " + app.getApplicationId() + " is in state " + app.getYarnApplicationState());
}
}
+
+ flinkConfiguration = new org.apache.flink.configuration.Configuration();
}
/**
@@ -498,7 +501,12 @@ public abstract class YarnTestBase extends TestLogger {
final int startTimeoutSeconds = 60;
- Runner runner = new Runner(args, type, 0);
+ Runner runner = new Runner(
+ args,
+ flinkConfiguration,
+ CliFrontend.getConfigurationDirectoryFromEnv(),
+ type,
+ 0);
runner.setName("Frontend (CLI/YARN Client) runner thread (startWithArgs()).");
runner.start();
@@ -551,7 +559,12 @@ public abstract class YarnTestBase extends TestLogger {
final int startTimeoutSeconds = 180;
final long deadline = System.currentTimeMillis() + (startTimeoutSeconds * 1000);
- Runner runner = new Runner(args, type, expectedReturnValue);
+ Runner runner = new Runner(
+ args,
+ flinkConfiguration,
+ CliFrontend.getConfigurationDirectoryFromEnv(),
+ type,
+ expectedReturnValue);
runner.start();
boolean expectedStringSeen = false;
@@ -632,14 +645,24 @@ public abstract class YarnTestBase extends TestLogger {
*/
protected static class Runner extends Thread {
private final String[] args;
+ private final org.apache.flink.configuration.Configuration configuration;
+ private final String configurationDirectory;
private final int expectedReturnValue;
private RunTypes type;
private FlinkYarnSessionCli yCli;
private Throwable runnerError;
- public Runner(String[] args, RunTypes type, int expectedReturnValue) {
+ public Runner(
+ String[] args,
+ org.apache.flink.configuration.Configuration configuration,
+ String configurationDirectory,
+ RunTypes type,
+ int expectedReturnValue) {
+
this.args = args;
+ this.configuration = Preconditions.checkNotNull(configuration);
+ this.configurationDirectory = Preconditions.checkNotNull(configurationDirectory);
this.type = type;
this.expectedReturnValue = expectedReturnValue;
}
@@ -654,7 +677,7 @@ public abstract class YarnTestBase extends TestLogger {
"",
"",
false);
- returnValue = yCli.run(args);
+ returnValue = yCli.run(args, configuration, configurationDirectory);
break;
case CLI_FRONTEND:
TestingCLI cli;
http://git-wip-us.apache.org/repos/asf/flink/blob/e9751409/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index 51b7600..908e91e 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -18,12 +18,10 @@
package org.apache.flink.yarn;
-import org.apache.flink.client.CliFrontend;
import org.apache.flink.client.deployment.ClusterDescriptor;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.CoreOptions;
-import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.JobManagerOptions;
@@ -119,8 +117,6 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
private String configurationDirectory;
- private Path flinkConfigurationPath;
-
private Path flinkJarPath;
private String dynamicPropertiesEncoded;
@@ -128,7 +124,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
/** Lazily initialized list of files to ship. */
protected List<File> shipFiles = new LinkedList<>();
- private org.apache.flink.configuration.Configuration flinkConfiguration;
+ private final org.apache.flink.configuration.Configuration flinkConfiguration;
private boolean detached;
@@ -142,7 +138,9 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
private YarnConfigOptions.UserJarInclusion userJarInclusion;
- public AbstractYarnClusterDescriptor() {
+ public AbstractYarnClusterDescriptor(
+ org.apache.flink.configuration.Configuration flinkConfiguration,
+ String configurationDirectory) {
// for unit tests only
if (System.getenv("IN_TESTS") != null) {
try {
@@ -152,21 +150,10 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
}
}
- // tries to load the config through the environment, if it fails it can still be set through the setters
- try {
- this.configurationDirectory = CliFrontend.getConfigurationDirectoryFromEnv();
- this.flinkConfiguration = GlobalConfiguration.loadConfiguration(configurationDirectory);
-
- File confFile = new File(configurationDirectory + File.separator + GlobalConfiguration.FLINK_CONF_FILENAME);
- if (!confFile.exists()) {
- throw new RuntimeException("Unable to locate configuration file in " + confFile);
- }
- flinkConfigurationPath = new Path(confFile.getAbsolutePath());
+ this.flinkConfiguration = Preconditions.checkNotNull(flinkConfiguration);
+ userJarInclusion = getUserJarInclusionMode(flinkConfiguration);
- userJarInclusion = getUserJarInclusionMode(flinkConfiguration);
- } catch (Exception e) {
- LOG.debug("Config couldn't be loaded from environment variable.", e);
- }
+ this.configurationDirectory = Preconditions.checkNotNull(configurationDirectory);
}
/**
@@ -174,12 +161,6 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
*/
protected abstract Class<?> getApplicationMasterClass();
- public void setFlinkConfiguration(org.apache.flink.configuration.Configuration conf) {
- this.flinkConfiguration = conf;
-
- userJarInclusion = getUserJarInclusionMode(flinkConfiguration);
- }
-
public org.apache.flink.configuration.Configuration getFlinkConfiguration() {
return flinkConfiguration;
}
@@ -195,14 +176,6 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
this.flinkJarPath = localJarPath;
}
- public void setConfigurationFilePath(Path confPath) {
- flinkConfigurationPath = confPath;
- }
-
- public void setConfigurationDirectory(String configurationDirectory) {
- this.configurationDirectory = configurationDirectory;
- }
-
public void addShipFiles(List<File> shipFiles) {
for (File shipFile: shipFiles) {
// remove uberjar from ship list (by default everything in the lib/ folder is added to
@@ -268,9 +241,6 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
if (this.configurationDirectory == null) {
throw new YarnDeploymentException("Configuration directory not set");
}
- if (this.flinkConfigurationPath == null) {
- throw new YarnDeploymentException("Configuration path not set");
- }
if (this.flinkConfiguration == null) {
throw new YarnDeploymentException("Flink configuration object has not been set");
}
@@ -734,12 +704,30 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
// Setup jar for ApplicationMaster
LocalResource appMasterJar = Records.newRecord(LocalResource.class);
- LocalResource flinkConf = Records.newRecord(LocalResource.class);
- Path remotePathJar =
- Utils.setupLocalResource(fs, appId.toString(), flinkJarPath, appMasterJar, fs.getHomeDirectory());
- Path remotePathConf =
- Utils.setupLocalResource(fs, appId.toString(), flinkConfigurationPath, flinkConf, fs.getHomeDirectory());
+ Path remotePathJar = Utils.setupLocalResource(
+ fs,
+ appId.toString(),
+ flinkJarPath,
+ appMasterJar,
+ fs.getHomeDirectory());
+
localResources.put("flink.jar", appMasterJar);
+
+ // Upload the flink configuration
+ LocalResource flinkConf = Records.newRecord(LocalResource.class);
+
+ // write out configuration file
+ File tmpConfigurationFile = File.createTempFile(appId + "-flink-conf.yaml", null);
+ tmpConfigurationFile.deleteOnExit();
+ BootstrapTools.writeConfiguration(flinkConfiguration, tmpConfigurationFile);
+
+ Path remotePathConf = Utils.setupLocalResource(
+ fs,
+ appId.toString(),
+ new Path(tmpConfigurationFile.getAbsolutePath()),
+ flinkConf,
+ fs.getHomeDirectory());
+
localResources.put("flink-conf.yaml", flinkConf);
paths.add(remotePathJar);
http://git-wip-us.apache.org/repos/asf/flink/blob/e9751409/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
index dae0f64..b518f9e 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
@@ -18,6 +18,7 @@
package org.apache.flink.yarn;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.JobGraph;
/**
@@ -25,6 +26,10 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
*/
public class YarnClusterDescriptor extends AbstractYarnClusterDescriptor {
+ public YarnClusterDescriptor(Configuration flinkConfiguration, String configurationDirectory) {
+ super(flinkConfiguration, configurationDirectory);
+ }
+
@Override
protected Class<?> getApplicationMasterClass() {
return YarnApplicationMasterRunner.class;
http://git-wip-us.apache.org/repos/asf/flink/blob/e9751409/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptorV2.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptorV2.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptorV2.java
index 9807273..6a43374 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptorV2.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptorV2.java
@@ -18,6 +18,7 @@
package org.apache.flink.yarn;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.JobGraph;
/**
@@ -28,6 +29,10 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
*/
public class YarnClusterDescriptorV2 extends AbstractYarnClusterDescriptor {
+ public YarnClusterDescriptorV2(Configuration flinkConfiguration, String configurationDirectory) {
+ super(flinkConfiguration, configurationDirectory);
+ }
+
@Override
protected Class<?> getApplicationMasterClass() {
return YarnFlinkApplicationMasterRunner.class;
http://git-wip-us.apache.org/repos/asf/flink/blob/e9751409/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnCLI.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnCLI.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnCLI.java
index af49fa8..62ba207 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnCLI.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnCLI.java
@@ -18,6 +18,7 @@
package org.apache.flink.yarn.cli;
+import org.apache.flink.client.CliFrontend;
import org.apache.flink.client.cli.CliFrontendParser;
import org.apache.flink.client.cli.CustomCommandLine;
import org.apache.flink.configuration.Configuration;
@@ -96,9 +97,12 @@ public class FlinkYarnCLI implements CustomCommandLine<YarnClusterClientV2> {
allOptions.addOption(zookeeperNamespace);
}
- public YarnClusterDescriptorV2 createDescriptor(String defaultApplicationName, CommandLine cmd) {
+ public YarnClusterDescriptorV2 createDescriptor(
+ Configuration configuration,
+ String defaultApplicationName,
+ CommandLine cmd) {
- YarnClusterDescriptorV2 yarnClusterDescriptor = new YarnClusterDescriptorV2();
+ YarnClusterDescriptorV2 yarnClusterDescriptor = new YarnClusterDescriptorV2(configuration, CliFrontend.getConfigurationDirectoryFromEnv());
// Jar Path
Path localJarPath;
@@ -209,7 +213,8 @@ public class FlinkYarnCLI implements CustomCommandLine<YarnClusterClientV2> {
@Override
public YarnClusterClientV2 retrieveCluster(
CommandLine cmdLine,
- Configuration config) throws UnsupportedOperationException {
+ Configuration config,
+ String configurationDirectory) throws UnsupportedOperationException {
throw new UnsupportedOperationException("Not support retrieveCluster since Flip-6.");
}
@@ -219,11 +224,11 @@ public class FlinkYarnCLI implements CustomCommandLine<YarnClusterClientV2> {
String applicationName,
CommandLine cmdLine,
Configuration config,
+ String configurationDirectory,
List<URL> userJarFiles) throws Exception {
Preconditions.checkNotNull(userJarFiles, "User jar files should not be null.");
- YarnClusterDescriptorV2 yarnClusterDescriptor = createDescriptor(applicationName, cmdLine);
- yarnClusterDescriptor.setFlinkConfiguration(config);
+ YarnClusterDescriptorV2 yarnClusterDescriptor = createDescriptor(config, applicationName, cmdLine);
yarnClusterDescriptor.setProvidedUserJarFiles(userJarFiles);
return new YarnClusterClientV2(
http://git-wip-us.apache.org/repos/asf/flink/blob/e9751409/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index a887e54..4a214e2 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -18,6 +18,7 @@
package org.apache.flink.yarn.cli;
+import org.apache.flink.client.CliFrontend;
import org.apache.flink.client.cli.CliFrontendParser;
import org.apache.flink.client.cli.CustomCommandLine;
import org.apache.flink.client.deployment.ClusterSpecification;
@@ -257,9 +258,15 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
return applicationID;
}
- public AbstractYarnClusterDescriptor createDescriptor(String defaultApplicationName, CommandLine cmd) {
+ public AbstractYarnClusterDescriptor createDescriptor(
+ Configuration configuration,
+ String configurationDirectory,
+ String defaultApplicationName,
+ CommandLine cmd) {
- AbstractYarnClusterDescriptor yarnClusterDescriptor = getClusterDescriptor();
+ AbstractYarnClusterDescriptor yarnClusterDescriptor = getClusterDescriptor(
+ configuration,
+ configurationDirectory);
// Jar Path
Path localJarPath;
@@ -488,13 +495,16 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
}
public static void main(final String[] args) throws Exception {
- Configuration flinkConfiguration = GlobalConfiguration.loadConfiguration();
final FlinkYarnSessionCli cli = new FlinkYarnSessionCli("", ""); // no prefix for the YARN session
+
+ final String configurationDirectory = CliFrontend.getConfigurationDirectoryFromEnv();
+
+ final Configuration flinkConfiguration = GlobalConfiguration.loadConfiguration();
SecurityUtils.install(new SecurityUtils.SecurityConfiguration(flinkConfiguration));
int retCode = SecurityUtils.getInstalledContext().runSecured(new Callable<Integer>() {
@Override
public Integer call() {
- return cli.run(args);
+ return cli.run(args, flinkConfiguration, configurationDirectory);
}
});
System.exit(retCode);
@@ -528,7 +538,8 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
@Override
public YarnClusterClient retrieveCluster(
CommandLine cmdLine,
- Configuration config) throws UnsupportedOperationException {
+ Configuration config,
+ String configurationDirectory) throws UnsupportedOperationException {
// first check for an application id, then try to load from yarn properties
String applicationID = cmdLine.hasOption(applicationId.getOpt()) ?
@@ -541,8 +552,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
: config.getString(HighAvailabilityOptions.HA_CLUSTER_ID, applicationID);
config.setString(HighAvailabilityOptions.HA_CLUSTER_ID, zkNamespace);
- AbstractYarnClusterDescriptor yarnDescriptor = getClusterDescriptor();
- yarnDescriptor.setFlinkConfiguration(config);
+ AbstractYarnClusterDescriptor yarnDescriptor = getClusterDescriptor(config, configurationDirectory);
return yarnDescriptor.retrieve(applicationID);
} else {
throw new UnsupportedOperationException("Could not resume a Yarn cluster.");
@@ -554,12 +564,18 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
String applicationName,
CommandLine cmdLine,
Configuration config,
+ String configurationDirectory,
List<URL> userJarFiles) {
Preconditions.checkNotNull(userJarFiles, "User jar files should not be null.");
- AbstractYarnClusterDescriptor yarnClusterDescriptor = createDescriptor(applicationName, cmdLine);
- ClusterSpecification clusterSpecification = createClusterSpecification(config, cmdLine);
- yarnClusterDescriptor.setFlinkConfiguration(config);
+ AbstractYarnClusterDescriptor yarnClusterDescriptor = createDescriptor(
+ config,
+ configurationDirectory,
+ applicationName,
+ cmdLine);
+
+ final ClusterSpecification clusterSpecification = createClusterSpecification(config, cmdLine);
+
yarnClusterDescriptor.setProvidedUserJarFiles(userJarFiles);
try {
@@ -570,7 +586,10 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
}
- public int run(String[] args) {
+ public int run(
+ String[] args,
+ Configuration configuration,
+ String configurationDirectory) {
//
// Command Line Options
//
@@ -590,7 +609,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
// Query cluster for metrics
if (cmd.hasOption(query.getOpt())) {
- AbstractYarnClusterDescriptor yarnDescriptor = getClusterDescriptor();
+ AbstractYarnClusterDescriptor yarnDescriptor = getClusterDescriptor(configuration, configurationDirectory);
String description;
try {
description = yarnDescriptor.getClusterDescription();
@@ -603,7 +622,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
return 0;
} else if (cmd.hasOption(applicationId.getOpt())) {
- AbstractYarnClusterDescriptor yarnDescriptor = getClusterDescriptor();
+ AbstractYarnClusterDescriptor yarnDescriptor = getClusterDescriptor(configuration, configurationDirectory);
//configure ZK namespace depending on the value passed
String zkNamespace = cmd.hasOption(zookeeperNamespace.getOpt()) ?
@@ -631,7 +650,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
AbstractYarnClusterDescriptor yarnDescriptor;
try {
- yarnDescriptor = createDescriptor(null, cmd);
+ yarnDescriptor = createDescriptor(configuration, configurationDirectory, null, cmd);
} catch (Exception e) {
System.err.println("Error while starting the YARN Client: " + e.getMessage());
e.printStackTrace(System.err);
@@ -745,7 +764,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
return new File(propertiesFileLocation, YARN_PROPERTIES_FILE + currentUser);
}
- protected AbstractYarnClusterDescriptor getClusterDescriptor() {
- return new YarnClusterDescriptor();
+ protected AbstractYarnClusterDescriptor getClusterDescriptor(Configuration configuration, String configurationDirectory) {
+ return new YarnClusterDescriptor(configuration, configurationDirectory);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e9751409/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
index a862b50..86122d6 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
@@ -48,24 +48,21 @@ public class YarnClusterDescriptorTest extends TestLogger {
public TemporaryFolder temporaryFolder = new TemporaryFolder();
private File flinkJar;
- private File flinkConf;
@Before
public void beforeTest() throws IOException {
temporaryFolder.create();
flinkJar = temporaryFolder.newFile("flink.jar");
- flinkConf = temporaryFolder.newFile("flink-conf.yaml");
}
@Test
public void testFailIfTaskSlotsHigherThanMaxVcores() {
- YarnClusterDescriptor clusterDescriptor = new YarnClusterDescriptor();
+ YarnClusterDescriptor clusterDescriptor = new YarnClusterDescriptor(
+ new Configuration(),
+ temporaryFolder.getRoot().getAbsolutePath());
clusterDescriptor.setLocalJarPath(new Path(flinkJar.getPath()));
- clusterDescriptor.setFlinkConfiguration(new Configuration());
- clusterDescriptor.setConfigurationDirectory(temporaryFolder.getRoot().getAbsolutePath());
- clusterDescriptor.setConfigurationFilePath(new Path(flinkConf.getPath()));
ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder()
.setMasterMemoryMB(-1)
@@ -88,17 +85,15 @@ public class YarnClusterDescriptorTest extends TestLogger {
@Test
public void testConfigOverwrite() {
-
- YarnClusterDescriptor clusterDescriptor = new YarnClusterDescriptor();
-
Configuration configuration = new Configuration();
// overwrite vcores in config
configuration.setInteger(ConfigConstants.YARN_VCORES, Integer.MAX_VALUE);
+ YarnClusterDescriptor clusterDescriptor = new YarnClusterDescriptor(
+ configuration,
+ temporaryFolder.getRoot().getAbsolutePath());
+
clusterDescriptor.setLocalJarPath(new Path(flinkJar.getPath()));
- clusterDescriptor.setFlinkConfiguration(configuration);
- clusterDescriptor.setConfigurationDirectory(temporaryFolder.getRoot().getAbsolutePath());
- clusterDescriptor.setConfigurationFilePath(new Path(flinkConf.getPath()));
// configure slots
ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder()
@@ -122,9 +117,10 @@ public class YarnClusterDescriptorTest extends TestLogger {
@Test
public void testSetupApplicationMasterContainer() {
- YarnClusterDescriptor clusterDescriptor = new YarnClusterDescriptor();
- final Configuration cfg = new Configuration();
- clusterDescriptor.setFlinkConfiguration(cfg);
+ Configuration cfg = new Configuration();
+ YarnClusterDescriptor clusterDescriptor = new YarnClusterDescriptor(
+ cfg,
+ temporaryFolder.getRoot().getAbsolutePath());
final String java = "$JAVA_HOME/bin/java";
final String jvmmem = "-Xmx424m";
@@ -254,6 +250,8 @@ public class YarnClusterDescriptorTest extends TestLogger {
.getCommands().get(0));
// logback + log4j, with/out krb5, different JVM opts
+ // IMPORTANT: Be aware that we are using side effects here to modify the created YarnClusterDescriptor,
+ // because we have a reference to the ClusterDescriptor's configuration which we modify continuously
cfg.setString(CoreOptions.FLINK_JVM_OPTIONS, jvmOpts);
assertEquals(
java + " " + jvmmem +
@@ -282,6 +280,7 @@ public class YarnClusterDescriptorTest extends TestLogger {
.getCommands().get(0));
// logback + log4j, with/out krb5, different JVM opts
+ // IMPORTANT: Be aware that we are using side effects here to modify the created YarnClusterDescriptor
cfg.setString(CoreOptions.FLINK_JM_JVM_OPTIONS, jmJvmOpts);
assertEquals(
java + " " + jvmmem +
@@ -310,7 +309,7 @@ public class YarnClusterDescriptorTest extends TestLogger {
.getCommands().get(0));
// now try some configurations with different yarn.container-start-command-template
-
+ // IMPORTANT: Be aware that we are using side effects here to modify the created YarnClusterDescriptor
cfg.setString(ConfigConstants.YARN_CONTAINER_START_COMMAND_TEMPLATE,
"%java% 1 %jvmmem% 2 %jvmopts% 3 %logging% 4 %class% 5 %args% 6 %redirects%");
assertEquals(
@@ -328,6 +327,7 @@ public class YarnClusterDescriptorTest extends TestLogger {
cfg.setString(ConfigConstants.YARN_CONTAINER_START_COMMAND_TEMPLATE,
"%java% %logging% %jvmopts% %jvmmem% %class% %args% %redirects%");
+ // IMPORTANT: Be aware that we are using side effects here to modify the created YarnClusterDescriptor
assertEquals(
java +
" " + logfile + " " + logback + " " + log4j +