You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2016/12/13 10:26:39 UTC
flink git commit: [FLINK-5071][yarn] adjust vcore validation check
Repository: flink
Updated Branches:
refs/heads/release-1.1 f3d0cc3c1 -> c0d5f1dc9
[FLINK-5071][yarn] adjust vcore validation check
The check didn't take the virtual core settings configured in the Flink
configuration into account.
- improve error reporting
- add test case
This closes #2839.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c0d5f1dc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c0d5f1dc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c0d5f1dc
Branch: refs/heads/release-1.1
Commit: c0d5f1dc992b540a1d92e2656439fa25b830f6f3
Parents: f3d0cc3
Author: Maximilian Michels <mx...@apache.org>
Authored: Mon Nov 21 12:46:51 2016 +0100
Committer: Gyula Fora <gy...@apache.org>
Committed: Mon Dec 12 14:46:02 2016 +0100
----------------------------------------------------------------------
.../yarn/AbstractYarnClusterDescriptor.java | 13 +++++++---
.../flink/yarn/YarnClusterDescriptorTest.java | 25 +++++++++++++++++++-
2 files changed, 34 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/c0d5f1dc/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index ab1fbc1..44417fb 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -262,12 +262,19 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
throw new YarnDeploymentException("Flink configuration object has not been set");
}
+ // Check if we don't exceed YARN's maximum virtual cores.
+ // The number of cores can be configured in the config.
+ // If not configured, it is set to the number of task slots
int numYarnVcores = conf.getInt(YarnConfiguration.NM_VCORES, YarnConfiguration.DEFAULT_NM_VCORES);
+ int configuredVcores = flinkConfiguration.getInteger(ConfigConstants.YARN_VCORES, slots);
// don't configure more than the maximum configured number of vcores
- if (slots > numYarnVcores) {
+ if (configuredVcores > numYarnVcores) {
throw new IllegalConfigurationException(
- String.format("The number of task slots per node was configured with %d" +
- " but Yarn only has %d virtual cores available.", slots, numYarnVcores));
+ String.format("The number of virtual cores per node were configured with %d" +
+ " but Yarn only has %d virtual cores available. Please note that the number" +
+ " of virtual cores is set to the number of task slots by default unless configured" +
+ " in the Flink config with '%s.'",
+ configuredVcores, numYarnVcores, ConfigConstants.YARN_VCORES));
}
// check if required Hadoop environment variables are set. If not, warn user
http://git-wip-us.apache.org/repos/asf/flink/blob/c0d5f1dc/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
index 17ce7be..9838e6d 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
@@ -17,6 +17,7 @@
*/
package org.apache.flink.yarn;
+import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.hadoop.fs.Path;
@@ -47,7 +48,6 @@ public class YarnClusterDescriptorTest {
@Test
public void testFailIfTaskSlotsHigherThanMaxVcores() {
-
YarnClusterDescriptor clusterDescriptor = new YarnClusterDescriptor();
clusterDescriptor.setLocalJarPath(new Path(flinkJar.getPath()));
@@ -65,5 +65,28 @@ public class YarnClusterDescriptorTest {
}
}
+ @Test
+ public void testConfigOverwrite() {
+
+ YarnClusterDescriptor clusterDescriptor = new YarnClusterDescriptor();
+
+ Configuration configuration = new Configuration();
+ // overwrite vcores in config
+ configuration.setInteger(ConfigConstants.YARN_VCORES, Integer.MAX_VALUE);
+
+ clusterDescriptor.setLocalJarPath(new Path(flinkJar.getPath()));
+ clusterDescriptor.setFlinkConfiguration(configuration);
+ clusterDescriptor.setConfigurationDirectory(temporaryFolder.getRoot().getAbsolutePath());
+ clusterDescriptor.setConfigurationFilePath(new Path(flinkConf.getPath()));
+
+ // configure slots
+ clusterDescriptor.setTaskManagerSlots(1);
+ try {
+ clusterDescriptor.deploy();
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.assertTrue(e.getCause() instanceof IllegalConfigurationException);
+ }
+ }
}