You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mx...@apache.org on 2016/06/21 12:19:27 UTC

flink git commit: [FLINK-4079] do not load Yarn properties file for per-job Yarn clusters

Repository: flink
Updated Branches:
  refs/heads/release-1.0 e7d6952f1 -> 3163638a6


[FLINK-4079] do not load Yarn properties file for per-job Yarn clusters


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3163638a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3163638a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3163638a

Branch: refs/heads/release-1.0
Commit: 3163638a61f49dd2df55467615096918bec3a890
Parents: e7d6952
Author: Maximilian Michels <mx...@apache.org>
Authored: Tue Jun 21 11:03:25 2016 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Tue Jun 21 14:20:20 2016 +0200

----------------------------------------------------------------------
 .../org/apache/flink/client/CliFrontend.java    | 126 ++++++++++---------
 1 file changed, 67 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3163638a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index 508b2b8..5ac8c5d 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -173,65 +173,6 @@ public class CliFrontend {
 		GlobalConfiguration.loadConfiguration(configDirectory.getAbsolutePath());
 		this.config = GlobalConfiguration.getConfiguration();
 
-		// load the YARN properties
-		String defaultPropertiesFileLocation = System.getProperty("java.io.tmpdir");
-		String currentUser = System.getProperty("user.name");
-		String propertiesFileLocation = config.getString(ConfigConstants.YARN_PROPERTIES_FILE_LOCATION, defaultPropertiesFileLocation);
-
-		File propertiesFile = new File(propertiesFileLocation, CliFrontend.YARN_PROPERTIES_FILE + currentUser);
-		if (propertiesFile.exists()) {
-
-			logAndSysout("Found YARN properties file " + propertiesFile.getAbsolutePath());
-
-			Properties yarnProperties = new Properties();
-			try {
-				try (InputStream is = new FileInputStream(propertiesFile)) {
-					yarnProperties.load(is);
-				}
-			}
-			catch (IOException e) {
-				throw new Exception("Cannot read the YARN properties file", e);
-			}
-
-			// configure the default parallelism from YARN
-			String propParallelism = yarnProperties.getProperty(YARN_PROPERTIES_PARALLELISM);
-			if (propParallelism != null) { // maybe the property is not set
-				try {
-					int parallelism = Integer.parseInt(propParallelism);
-					this.config.setInteger(ConfigConstants.DEFAULT_PARALLELISM_KEY, parallelism);
-
-					logAndSysout("YARN properties set default parallelism to " + parallelism);
-				}
-				catch (NumberFormatException e) {
-					throw new Exception("Error while parsing the YARN properties: " +
-							"Property " + YARN_PROPERTIES_PARALLELISM + " is not an integer.");
-				}
-			}
-
-			// get the JobManager address from the YARN properties
-			String address = yarnProperties.getProperty(YARN_PROPERTIES_JOBMANAGER_KEY);
-			InetSocketAddress jobManagerAddress;
-			if (address != null) {
-				try {
-					jobManagerAddress = parseHostPortAddress(address);
-					// store address in config from where it is retrieved by the retrieval service
-					writeJobManagerAddressToConfig(jobManagerAddress);
-				}
-				catch (Exception e) {
-					throw new Exception("YARN properties contain an invalid entry for JobManager address.", e);
-				}
-
-				logAndSysout("Using JobManager address from YARN properties " + jobManagerAddress);
-			}
-
-			// handle the YARN client's dynamic properties
-			String dynamicPropertiesEncoded = yarnProperties.getProperty(YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING);
-			Map<String, String> dynamicProperties = getDynamicProperties(dynamicPropertiesEncoded);
-			for (Map.Entry<String, String> dynamicProperty : dynamicProperties.entrySet()) {
-				this.config.setString(dynamicProperty.getKey(), dynamicProperty.getValue());
-			}
-		}
-
 		try {
 			FileSystem.setDefaultScheme(config);
 		} catch (IOException e) {
@@ -944,6 +885,13 @@ public class CliFrontend {
 	 * @param options Command line options
 	 */
 	protected void updateConfig(CommandLineOptions options) {
+
+		try {
+			loadYarnProperties();
+		} catch (Exception e) {
+			LOG.error("Couldn't load Yarn properties file", e);
+		}
+
 		if(options.getJobManagerAddress() != null){
 			InetSocketAddress jobManagerAddress = parseHostPortAddress(options.getJobManagerAddress());
 			writeJobManagerAddressToConfig(jobManagerAddress);
@@ -1082,6 +1030,13 @@ public class CliFrontend {
 			}
 		}
 		else {
+
+			try {
+				loadYarnProperties();
+			} catch (Exception e) {
+				LOG.error("Couldn't load Yarn properties file", e);
+			}
+
 			if(options.getJobManagerAddress() != null) {
 				jobManagerAddress = parseHostPortAddress(options.getJobManagerAddress());
 				writeJobManagerAddressToConfig(jobManagerAddress);
@@ -1323,4 +1278,57 @@ public class CliFrontend {
 			return Collections.emptyMap();
 		}
 	}
+
+	private void loadYarnProperties() throws Exception {
+
+		// load the YARN properties
+		String defaultPropertiesFileLocation = System.getProperty("java.io.tmpdir");
+		String currentUser = System.getProperty("user.name");
+		String propertiesFileLocation = config.getString(ConfigConstants.YARN_PROPERTIES_FILE_LOCATION, defaultPropertiesFileLocation);
+
+		File propertiesFile = new File(propertiesFileLocation, CliFrontend.YARN_PROPERTIES_FILE + currentUser);
+		if (propertiesFile.exists()) {
+
+			logAndSysout("Found YARN properties file " + propertiesFile.getAbsolutePath());
+
+			Properties yarnProperties = new Properties();
+			try (InputStream is = new FileInputStream(propertiesFile)) {
+				yarnProperties.load(is);
+			}
+
+			// configure the default parallelism from YARN
+			String propParallelism = yarnProperties.getProperty(YARN_PROPERTIES_PARALLELISM);
+			if (propParallelism != null) { // maybe the property is not set
+				try {
+					int parallelism = Integer.parseInt(propParallelism);
+					this.config.setInteger(ConfigConstants.DEFAULT_PARALLELISM_KEY, parallelism);
+
+					logAndSysout("YARN properties set default parallelism to " + parallelism);
+				}
+				catch (NumberFormatException e) {
+					throw new IOException("Error while parsing the YARN properties: " +
+						"Property " + YARN_PROPERTIES_PARALLELISM + " is not an integer.");
+				}
+			}
+
+			// get the JobManager address from the YARN properties
+			String address = yarnProperties.getProperty(YARN_PROPERTIES_JOBMANAGER_KEY);
+			InetSocketAddress jobManagerAddress;
+			if (address != null) {
+				jobManagerAddress = parseHostPortAddress(address);
+				// store address in config from where it is retrieved by the retrieval service
+				writeJobManagerAddressToConfig(jobManagerAddress);
+
+				logAndSysout("Using JobManager address from YARN properties " + jobManagerAddress);
+			}
+
+			// handle the YARN client's dynamic properties
+			String dynamicPropertiesEncoded = yarnProperties.getProperty(YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING);
+			Map<String, String> dynamicProperties = getDynamicProperties(dynamicPropertiesEncoded);
+			for (Map.Entry<String, String> dynamicProperty : dynamicProperties.entrySet()) {
+				this.config.setString(dynamicProperty.getKey(), dynamicProperty.getValue());
+			}
+		}
+
+	}
 }