You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2016/12/13 10:26:39 UTC

flink git commit: [FLINK-5071][yarn] adjust vcore validation check

Repository: flink
Updated Branches:
  refs/heads/release-1.1 f3d0cc3c1 -> c0d5f1dc9


[FLINK-5071][yarn] adjust vcore validation check

The check didn't take the virtual core settings configured in the Flink
configuration into account.

- improve error reporting
- add test case

This closes #2839.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c0d5f1dc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c0d5f1dc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c0d5f1dc

Branch: refs/heads/release-1.1
Commit: c0d5f1dc992b540a1d92e2656439fa25b830f6f3
Parents: f3d0cc3
Author: Maximilian Michels <mx...@apache.org>
Authored: Mon Nov 21 12:46:51 2016 +0100
Committer: Gyula Fora <gy...@apache.org>
Committed: Mon Dec 12 14:46:02 2016 +0100

----------------------------------------------------------------------
 .../yarn/AbstractYarnClusterDescriptor.java     | 13 +++++++---
 .../flink/yarn/YarnClusterDescriptorTest.java   | 25 +++++++++++++++++++-
 2 files changed, 34 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c0d5f1dc/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index ab1fbc1..44417fb 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -262,12 +262,19 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 			throw new YarnDeploymentException("Flink configuration object has not been set");
 		}
 
+		// Check if we don't exceed YARN's maximum virtual cores.
+		// The number of cores can be configured in the config.
+		// If not configured, it is set to the number of task slots
 		int numYarnVcores = conf.getInt(YarnConfiguration.NM_VCORES, YarnConfiguration.DEFAULT_NM_VCORES);
+		int configuredVcores = flinkConfiguration.getInteger(ConfigConstants.YARN_VCORES, slots);
 		// don't configure more than the maximum configured number of vcores
-		if (slots > numYarnVcores) {
+		if (configuredVcores > numYarnVcores) {
 			throw new IllegalConfigurationException(
-				String.format("The number of task slots per node was configured with %d" +
-					" but Yarn only has %d virtual cores available.", slots, numYarnVcores));
+				String.format("The number of virtual cores per node were configured with %d" +
+						" but Yarn only has %d virtual cores available. Please note that the number" +
+						" of virtual cores is set to the number of task slots by default unless configured" +
+						" in the Flink config with '%s.'",
+					configuredVcores, numYarnVcores, ConfigConstants.YARN_VCORES));
 		}
 
 		// check if required Hadoop environment variables are set. If not, warn user

http://git-wip-us.apache.org/repos/asf/flink/blob/c0d5f1dc/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
index 17ce7be..9838e6d 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.yarn;
 
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.hadoop.fs.Path;
@@ -47,7 +48,6 @@ public class YarnClusterDescriptorTest {
 	@Test
 	public void testFailIfTaskSlotsHigherThanMaxVcores() {
 
-
 		YarnClusterDescriptor clusterDescriptor = new YarnClusterDescriptor();
 
 		clusterDescriptor.setLocalJarPath(new Path(flinkJar.getPath()));
@@ -65,5 +65,28 @@ public class YarnClusterDescriptorTest {
 		}
 	}
 
+	@Test
+	public void testConfigOverwrite() {
+
+		YarnClusterDescriptor clusterDescriptor = new YarnClusterDescriptor();
+
+		Configuration configuration = new Configuration();
+		// overwrite vcores in config
+		configuration.setInteger(ConfigConstants.YARN_VCORES, Integer.MAX_VALUE);
+
+		clusterDescriptor.setLocalJarPath(new Path(flinkJar.getPath()));
+		clusterDescriptor.setFlinkConfiguration(configuration);
+		clusterDescriptor.setConfigurationDirectory(temporaryFolder.getRoot().getAbsolutePath());
+		clusterDescriptor.setConfigurationFilePath(new Path(flinkConf.getPath()));
+
+		// configure slots
+		clusterDescriptor.setTaskManagerSlots(1);
 
+		try {
+			clusterDescriptor.deploy();
+		} catch (Exception e) {
+			e.printStackTrace();
+			Assert.assertTrue(e.getCause() instanceof IllegalConfigurationException);
+		}
+	}
 }