You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mx...@apache.org on 2016/06/21 12:19:27 UTC
flink git commit: [FLINK-4079] do not load Yarn properties file for per-job Yarn clusters
Repository: flink
Updated Branches:
refs/heads/release-1.0 e7d6952f1 -> 3163638a6
[FLINK-4079] do not load Yarn properties file for per-job Yarn clusters
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3163638a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3163638a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3163638a
Branch: refs/heads/release-1.0
Commit: 3163638a61f49dd2df55467615096918bec3a890
Parents: e7d6952
Author: Maximilian Michels <mx...@apache.org>
Authored: Tue Jun 21 11:03:25 2016 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Tue Jun 21 14:20:20 2016 +0200
----------------------------------------------------------------------
.../org/apache/flink/client/CliFrontend.java | 126 ++++++++++---------
1 file changed, 67 insertions(+), 59 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/3163638a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index 508b2b8..5ac8c5d 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -173,65 +173,6 @@ public class CliFrontend {
GlobalConfiguration.loadConfiguration(configDirectory.getAbsolutePath());
this.config = GlobalConfiguration.getConfiguration();
- // load the YARN properties
- String defaultPropertiesFileLocation = System.getProperty("java.io.tmpdir");
- String currentUser = System.getProperty("user.name");
- String propertiesFileLocation = config.getString(ConfigConstants.YARN_PROPERTIES_FILE_LOCATION, defaultPropertiesFileLocation);
-
- File propertiesFile = new File(propertiesFileLocation, CliFrontend.YARN_PROPERTIES_FILE + currentUser);
- if (propertiesFile.exists()) {
-
- logAndSysout("Found YARN properties file " + propertiesFile.getAbsolutePath());
-
- Properties yarnProperties = new Properties();
- try {
- try (InputStream is = new FileInputStream(propertiesFile)) {
- yarnProperties.load(is);
- }
- }
- catch (IOException e) {
- throw new Exception("Cannot read the YARN properties file", e);
- }
-
- // configure the default parallelism from YARN
- String propParallelism = yarnProperties.getProperty(YARN_PROPERTIES_PARALLELISM);
- if (propParallelism != null) { // maybe the property is not set
- try {
- int parallelism = Integer.parseInt(propParallelism);
- this.config.setInteger(ConfigConstants.DEFAULT_PARALLELISM_KEY, parallelism);
-
- logAndSysout("YARN properties set default parallelism to " + parallelism);
- }
- catch (NumberFormatException e) {
- throw new Exception("Error while parsing the YARN properties: " +
- "Property " + YARN_PROPERTIES_PARALLELISM + " is not an integer.");
- }
- }
-
- // get the JobManager address from the YARN properties
- String address = yarnProperties.getProperty(YARN_PROPERTIES_JOBMANAGER_KEY);
- InetSocketAddress jobManagerAddress;
- if (address != null) {
- try {
- jobManagerAddress = parseHostPortAddress(address);
- // store address in config from where it is retrieved by the retrieval service
- writeJobManagerAddressToConfig(jobManagerAddress);
- }
- catch (Exception e) {
- throw new Exception("YARN properties contain an invalid entry for JobManager address.", e);
- }
-
- logAndSysout("Using JobManager address from YARN properties " + jobManagerAddress);
- }
-
- // handle the YARN client's dynamic properties
- String dynamicPropertiesEncoded = yarnProperties.getProperty(YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING);
- Map<String, String> dynamicProperties = getDynamicProperties(dynamicPropertiesEncoded);
- for (Map.Entry<String, String> dynamicProperty : dynamicProperties.entrySet()) {
- this.config.setString(dynamicProperty.getKey(), dynamicProperty.getValue());
- }
- }
-
try {
FileSystem.setDefaultScheme(config);
} catch (IOException e) {
@@ -944,6 +885,13 @@ public class CliFrontend {
* @param options Command line options
*/
protected void updateConfig(CommandLineOptions options) {
+
+ try {
+ loadYarnProperties();
+ } catch (Exception e) {
+ LOG.error("Couldn't load Yarn properties file", e);
+ }
+
if(options.getJobManagerAddress() != null){
InetSocketAddress jobManagerAddress = parseHostPortAddress(options.getJobManagerAddress());
writeJobManagerAddressToConfig(jobManagerAddress);
@@ -1082,6 +1030,13 @@ public class CliFrontend {
}
}
else {
+
+ try {
+ loadYarnProperties();
+ } catch (Exception e) {
+ LOG.error("Couldn't load Yarn properties file", e);
+ }
+
if(options.getJobManagerAddress() != null) {
jobManagerAddress = parseHostPortAddress(options.getJobManagerAddress());
writeJobManagerAddressToConfig(jobManagerAddress);
@@ -1323,4 +1278,57 @@ public class CliFrontend {
return Collections.emptyMap();
}
}
+
+ private void loadYarnProperties() throws Exception {
+
+ // load the YARN properties
+ String defaultPropertiesFileLocation = System.getProperty("java.io.tmpdir");
+ String currentUser = System.getProperty("user.name");
+ String propertiesFileLocation = config.getString(ConfigConstants.YARN_PROPERTIES_FILE_LOCATION, defaultPropertiesFileLocation);
+
+ File propertiesFile = new File(propertiesFileLocation, CliFrontend.YARN_PROPERTIES_FILE + currentUser);
+ if (propertiesFile.exists()) {
+
+ logAndSysout("Found YARN properties file " + propertiesFile.getAbsolutePath());
+
+ Properties yarnProperties = new Properties();
+ try (InputStream is = new FileInputStream(propertiesFile)) {
+ yarnProperties.load(is);
+ }
+
+ // configure the default parallelism from YARN
+ String propParallelism = yarnProperties.getProperty(YARN_PROPERTIES_PARALLELISM);
+ if (propParallelism != null) { // maybe the property is not set
+ try {
+ int parallelism = Integer.parseInt(propParallelism);
+ this.config.setInteger(ConfigConstants.DEFAULT_PARALLELISM_KEY, parallelism);
+
+ logAndSysout("YARN properties set default parallelism to " + parallelism);
+ }
+ catch (NumberFormatException e) {
+ throw new IOException("Error while parsing the YARN properties: " +
+ "Property " + YARN_PROPERTIES_PARALLELISM + " is not an integer.");
+ }
+ }
+
+ // get the JobManager address from the YARN properties
+ String address = yarnProperties.getProperty(YARN_PROPERTIES_JOBMANAGER_KEY);
+ InetSocketAddress jobManagerAddress;
+ if (address != null) {
+ jobManagerAddress = parseHostPortAddress(address);
+ // store address in config from where it is retrieved by the retrieval service
+ writeJobManagerAddressToConfig(jobManagerAddress);
+
+ logAndSysout("Using JobManager address from YARN properties " + jobManagerAddress);
+ }
+
+ // handle the YARN client's dynamic properties
+ String dynamicPropertiesEncoded = yarnProperties.getProperty(YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING);
+ Map<String, String> dynamicProperties = getDynamicProperties(dynamicPropertiesEncoded);
+ for (Map.Entry<String, String> dynamicProperty : dynamicProperties.entrySet()) {
+ this.config.setString(dynamicProperty.getKey(), dynamicProperty.getValue());
+ }
+ }
+
+ }
}