You are viewing a plain-text version of this content. The canonical version is the original message in the commits@flink.apache.org mailing-list archive.
Posted to commits@flink.apache.org by dw...@apache.org on 2018/07/18 06:36:13 UTC
flink git commit: [FLINK-9777] YARN: JM and TM Memory must be specified with Units
Repository: flink
Updated Branches:
refs/heads/master 056486a1b -> 4095a310a
[FLINK-9777] YARN: JM and TM Memory must be specified with Units
This closes #6297
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4095a310
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4095a310
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4095a310
Branch: refs/heads/master
Commit: 4095a310a39e2cbf79228e2a84eac19a61f4c9d6
Parents: 056486a
Author: yanghua <ya...@gmail.com>
Authored: Tue Jul 10 23:16:12 2018 +0800
Committer: Dawid Wysakowicz <dw...@apache.org>
Committed: Wed Jul 18 08:33:44 2018 +0200
----------------------------------------------------------------------
docs/dev/scala_shell.md | 4 +-
docs/ops/cli.md | 8 +-
docs/ops/deployment/yarn_setup.md | 8 +-
.../client/deployment/ClusterSpecification.java | 7 +-
.../flink/configuration/ConfigurationUtils.java | 38 ++++++++
.../apache/flink/yarn/YarnResourceManager.java | 4 +-
.../flink/yarn/cli/FlinkYarnSessionCli.java | 21 +++--
.../flink/yarn/FlinkYarnSessionCliTest.java | 99 ++++++++++++++++++++
8 files changed, 167 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/4095a310/docs/dev/scala_shell.md
----------------------------------------------------------------------
diff --git a/docs/dev/scala_shell.md b/docs/dev/scala_shell.md
index b8d2b2c..d236430 100644
--- a/docs/dev/scala_shell.md
+++ b/docs/dev/scala_shell.md
@@ -175,7 +175,7 @@ Starts Flink scala shell connecting to a yarn cluster
-n arg | --container arg
Number of YARN container to allocate (= Number of TaskManagers)
-jm arg | --jobManagerMemory arg
- Memory for JobManager container [in MB]
+ Memory for JobManager container with optional unit (default: MB)
-nm <value> | --name <value>
Set a custom name for the application on YARN
-qu <arg> | --queue <arg>
@@ -183,7 +183,7 @@ Starts Flink scala shell connecting to a yarn cluster
-s <arg> | --slots <arg>
Number of slots per TaskManager
-tm <arg> | --taskManagerMemory <arg>
- Memory per TaskManager container [in MB]
+ Memory per TaskManager container with optional unit (default: MB)
-a <path/to/jar> | --addclasspath <path/to/jar>
Specifies additional jars to be used in Flink
--configDir <value>
http://git-wip-us.apache.org/repos/asf/flink/blob/4095a310/docs/ops/cli.md
----------------------------------------------------------------------
diff --git a/docs/ops/cli.md b/docs/ops/cli.md
index 6974a2d..f96ccf5 100644
--- a/docs/ops/cli.md
+++ b/docs/ops/cli.md
@@ -272,8 +272,8 @@ Action "run" compiles and runs a program.
-yh,--yarnhelp Help for the Yarn session CLI.
-yid,--yarnapplicationId <arg> Attach to running YARN session
-yj,--yarnjar <arg> Path to Flink jar file
- -yjm,--yarnjobManagerMemory <arg> Memory for JobManager Container [in
- MB]
+ -yjm,--yarnjobManagerMemory <arg> Memory for JobManager Container
+ with optional unit (default: MB)
-yn,--yarncontainer <arg> Number of YARN container to allocate
(=Number of Task Managers)
-ynm,--yarnname <arg> Set a custom name for the application
@@ -285,8 +285,8 @@ Action "run" compiles and runs a program.
-yst,--yarnstreaming Start Flink in streaming mode
-yt,--yarnship <arg> Ship files in the specified directory
(t for transfer)
- -ytm,--yarntaskManagerMemory <arg> Memory per TaskManager Container [in
- MB]
+ -ytm,--yarntaskManagerMemory <arg> Memory per TaskManager Container
+ with optional unit (default: MB)
-yz,--yarnzookeeperNamespace <arg> Namespace to create the Zookeeper
sub-paths for high availability mode
-ynl,--yarnnodeLabel <arg> Specify YARN node label for
http://git-wip-us.apache.org/repos/asf/flink/blob/4095a310/docs/ops/deployment/yarn_setup.md
----------------------------------------------------------------------
diff --git a/docs/ops/deployment/yarn_setup.md b/docs/ops/deployment/yarn_setup.md
index d2fdad9..9c255d2 100644
--- a/docs/ops/deployment/yarn_setup.md
+++ b/docs/ops/deployment/yarn_setup.md
@@ -38,7 +38,7 @@ Start a YARN session with 4 Task Managers (each with 4 GB of Heapspace):
curl -O <flink_hadoop2_download_url>
tar xvzf flink-{{ site.version }}-bin-hadoop2.tgz
cd flink-{{ site.version }}/
-./bin/yarn-session.sh -n 4 -jm 1024 -tm 4096
+./bin/yarn-session.sh -n 4 -jm 1024m -tm 4096m
{% endhighlight %}
Specify the `-s` flag for the number of processing slots per Task Manager. We recommend to set the number of slots to the number of processors per machine.
@@ -53,7 +53,7 @@ Once the session has been started, you can submit jobs to the cluster using the
curl -O <flink_hadoop2_download_url>
tar xvzf flink-{{ site.version }}-bin-hadoop2.tgz
cd flink-{{ site.version }}/
-./bin/flink run -m yarn-cluster -yn 4 -yjm 1024 -ytm 4096 ./examples/batch/WordCount.jar
+./bin/flink run -m yarn-cluster -yn 4 -yjm 1024m -ytm 4096m ./examples/batch/WordCount.jar
{% endhighlight %}
## Flink YARN Session
@@ -101,12 +101,12 @@ Usage:
Optional
-D <arg> Dynamic properties
-d,--detached Start detached
- -jm,--jobManagerMemory <arg> Memory for JobManager Container [in MB]
+ -jm,--jobManagerMemory <arg> Memory for JobManager Container with optional unit (default: MB)
-nm,--name Set a custom name for the application on YARN
-q,--query Display available YARN resources (memory, cores)
-qu,--queue <arg> Specify YARN queue.
-s,--slots <arg> Number of slots per TaskManager
- -tm,--taskManagerMemory <arg> Memory per TaskManager Container [in MB]
+ -tm,--taskManagerMemory <arg> Memory per TaskManager Container with optional unit (default: MB)
-z,--zookeeperNamespace <arg> Namespace to create the Zookeeper sub-paths for HA mode
{% endhighlight %}
http://git-wip-us.apache.org/repos/asf/flink/blob/4095a310/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterSpecification.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterSpecification.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterSpecification.java
index 90de955..72975d8 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterSpecification.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterSpecification.java
@@ -19,8 +19,7 @@
package org.apache.flink.client.deployment;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.JobManagerOptions;
-import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.configuration.TaskManagerOptions;
/**
@@ -68,8 +67,8 @@ public final class ClusterSpecification {
public static ClusterSpecification fromConfiguration(Configuration configuration) {
int slots = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1);
- int jobManagerMemoryMb = MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY)).getMebiBytes();
- int taskManagerMemoryMb = MemorySize.parse(configuration.getString(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY)).getMebiBytes();
+ int jobManagerMemoryMb = ConfigurationUtils.getJobManagerHeapMemory(configuration).getMebiBytes();
+ int taskManagerMemoryMb = ConfigurationUtils.getTaskManagerHeapMemory(configuration).getMebiBytes();
return new ClusterSpecificationBuilder()
.setMasterMemoryMB(jobManagerMemoryMb)
http://git-wip-us.apache.org/repos/asf/flink/blob/4095a310/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java
index 3d1d830..1b30821 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java
@@ -32,6 +32,44 @@ public class ConfigurationUtils {
private static final String[] EMPTY = new String[0];
/**
+ * Get job manager's heap memory. This method will check the new key
+ * {@link JobManagerOptions#JOB_MANAGER_HEAP_MEMORY} and
+ * the old key {@link JobManagerOptions#JOB_MANAGER_HEAP_MEMORY_MB} for backwards compatibility.
+ *
+ * @param configuration the configuration object
+ * @return the memory size of job manager's heap memory.
+ */
+ public static MemorySize getJobManagerHeapMemory(Configuration configuration) {
+ if (configuration.containsKey(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY.key())) {
+ return MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY));
+ } else if (configuration.containsKey(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY_MB.key())) {
+ return MemorySize.parse(configuration.getInteger(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY_MB) + "m");
+ } else {
+ //use default value
+ return MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY));
+ }
+ }
+
+ /**
+ * Get task manager's heap memory. This method will check the new key
+ * {@link TaskManagerOptions#TASK_MANAGER_HEAP_MEMORY} and
+ * the old key {@link TaskManagerOptions#TASK_MANAGER_HEAP_MEMORY_MB} for backwards compatibility.
+ *
+ * @param configuration the configuration object
+ * @return the memory size of task manager's heap memory.
+ */
+ public static MemorySize getTaskManagerHeapMemory(Configuration configuration) {
+ if (configuration.containsKey(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY.key())) {
+ return MemorySize.parse(configuration.getString(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY));
+ } else if (configuration.containsKey(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY_MB.key())) {
+ return MemorySize.parse(configuration.getInteger(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY_MB) + "m");
+ } else {
+ //use default value
+ return MemorySize.parse(configuration.getString(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY));
+ }
+ }
+
+ /**
* Extracts the task manager directories for temporary files as defined by
* {@link org.apache.flink.configuration.CoreOptions#TMP_DIRS}.
*
http://git-wip-us.apache.org/repos/asf/flink/blob/4095a310/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index 96ec57e..0206ffb 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -20,7 +20,7 @@ package org.apache.flink.yarn;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
@@ -163,7 +163,7 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
this.webInterfaceUrl = webInterfaceUrl;
this.numberOfTaskSlots = flinkConfig.getInteger(TaskManagerOptions.NUM_TASK_SLOTS);
- this.defaultTaskManagerMemoryMB = MemorySize.parse(flinkConfig.getString(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY)).getMebiBytes();
+ this.defaultTaskManagerMemoryMB = ConfigurationUtils.getTaskManagerHeapMemory(flinkConfig).getMebiBytes();
this.defaultCpus = flinkConfig.getInteger(YarnConfigOptions.VCORES, numberOfTaskSlots);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/4095a310/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index 1ad1bcc..c0180a8 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -24,6 +24,7 @@ import org.apache.flink.client.cli.CliFrontend;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.JobManagerOptions;
@@ -190,8 +191,8 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId
queue = new Option(shortPrefix + "qu", longPrefix + "queue", true, "Specify YARN queue.");
shipPath = new Option(shortPrefix + "t", longPrefix + "ship", true, "Ship files in the specified directory (t for transfer)");
flinkJar = new Option(shortPrefix + "j", longPrefix + "jar", true, "Path to Flink jar file");
- jmMemory = new Option(shortPrefix + "jm", longPrefix + "jobManagerMemory", true, "Memory for JobManager Container [in MB]");
- tmMemory = new Option(shortPrefix + "tm", longPrefix + "taskManagerMemory", true, "Memory per TaskManager Container [in MB]");
+ jmMemory = new Option(shortPrefix + "jm", longPrefix + "jobManagerMemory", true, "Memory for JobManager Container with optional unit (default: MB)");
+ tmMemory = new Option(shortPrefix + "tm", longPrefix + "taskManagerMemory", true, "Memory per TaskManager Container with optional unit (default: MB)");
container = new Option(shortPrefix + "n", longPrefix + "container", true, "Number of YARN container to allocate (=Number of Task Managers)");
slots = new Option(shortPrefix + "s", longPrefix + "slots", true, "Number of slots per TaskManager");
dynamicproperties = Option.builder(shortPrefix + "D")
@@ -386,10 +387,10 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId
}
// JobManager Memory
- final int jobManagerMemoryMB = MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY)).getMebiBytes();
+ final int jobManagerMemoryMB = ConfigurationUtils.getJobManagerHeapMemory(configuration).getMebiBytes();
// Task Managers memory
- final int taskManagerMemoryMB = MemorySize.parse(configuration.getString(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY)).getMebiBytes();
+ final int taskManagerMemoryMB = ConfigurationUtils.getTaskManagerHeapMemory(configuration).getMebiBytes();
int slotsPerTaskManager = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS);
@@ -500,11 +501,19 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId
}
if (commandLine.hasOption(jmMemory.getOpt())) {
- effectiveConfiguration.setString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY, commandLine.getOptionValue(jmMemory.getOpt()));
+ String jmMemoryVal = commandLine.getOptionValue(jmMemory.getOpt());
+ if (!MemorySize.MemoryUnit.hasUnit(jmMemoryVal)) {
+ jmMemoryVal += "m";
+ }
+ effectiveConfiguration.setString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY, jmMemoryVal);
}
if (commandLine.hasOption(tmMemory.getOpt())) {
- effectiveConfiguration.setString(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY, commandLine.getOptionValue(tmMemory.getOpt()));
+ String tmMemoryVal = commandLine.getOptionValue(tmMemory.getOpt());
+ if (!MemorySize.MemoryUnit.hasUnit(tmMemoryVal)) {
+ tmMemoryVal += "m";
+ }
+ effectiveConfiguration.setString(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY, tmMemoryVal);
}
if (commandLine.hasOption(slots.getOpt())) {
http://git-wip-us.apache.org/repos/asf/flink/blob/4095a310/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
index 977da45..aa958e2 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
@@ -352,6 +352,105 @@ public class FlinkYarnSessionCliTest extends TestLogger {
assertThat(clusterSpecification.getSlotsPerTaskManager(), is(slotsPerTaskManager));
}
+ /**
+ * Tests the specifying heap memory without unit for job manager and task manager.
+ */
+ @Test
+ public void testHeapMemoryPropertyWithoutUnit() throws Exception {
+ final String[] args = new String[] { "-yn", "2", "-yjm", "1024", "-ytm", "2048" };
+ final FlinkYarnSessionCli flinkYarnSessionCli = new FlinkYarnSessionCli(
+ new Configuration(),
+ tmp.getRoot().getAbsolutePath(),
+ "y",
+ "yarn");
+
+ final CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(args, false);
+
+ final ClusterSpecification clusterSpecification = flinkYarnSessionCli.getClusterSpecification(commandLine);
+
+ assertThat(clusterSpecification.getMasterMemoryMB(), is(1024));
+ assertThat(clusterSpecification.getTaskManagerMemoryMB(), is(2048));
+ }
+
+ /**
+ * Tests the specifying heap memory with unit (MB) for job manager and task manager.
+ */
+ @Test
+ public void testHeapMemoryPropertyWithUnitMB() throws Exception {
+ final String[] args = new String[] { "-yn", "2", "-yjm", "1024m", "-ytm", "2048m" };
+ final FlinkYarnSessionCli flinkYarnSessionCli = new FlinkYarnSessionCli(
+ new Configuration(),
+ tmp.getRoot().getAbsolutePath(),
+ "y",
+ "yarn");
+ final CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(args, false);
+ final ClusterSpecification clusterSpecification = flinkYarnSessionCli.getClusterSpecification(commandLine);
+
+ assertThat(clusterSpecification.getMasterMemoryMB(), is(1024));
+ assertThat(clusterSpecification.getTaskManagerMemoryMB(), is(2048));
+ }
+
+ /**
+ * Tests the specifying heap memory with arbitrary unit for job manager and task manager.
+ */
+ @Test
+ public void testHeapMemoryPropertyWithArbitraryUnit() throws Exception {
+ final String[] args = new String[] { "-yn", "2", "-yjm", "1g", "-ytm", "2g" };
+ final FlinkYarnSessionCli flinkYarnSessionCli = new FlinkYarnSessionCli(
+ new Configuration(),
+ tmp.getRoot().getAbsolutePath(),
+ "y",
+ "yarn");
+ final CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(args, false);
+ final ClusterSpecification clusterSpecification = flinkYarnSessionCli.getClusterSpecification(commandLine);
+
+ assertThat(clusterSpecification.getMasterMemoryMB(), is(1024));
+ assertThat(clusterSpecification.getTaskManagerMemoryMB(), is(2048));
+ }
+
+ /**
+ * Tests the specifying heap memory with old config key for job manager and task manager.
+ */
+ @Test
+ public void testHeapMemoryPropertyWithOldConfigKey() throws Exception {
+ Configuration configuration = new Configuration();
+ configuration.setInteger(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY_MB, 2048);
+ configuration.setInteger(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY_MB, 4096);
+
+ final FlinkYarnSessionCli flinkYarnSessionCli = new FlinkYarnSessionCli(
+ configuration,
+ tmp.getRoot().getAbsolutePath(),
+ "y",
+ "yarn");
+
+ final CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(new String[0], false);
+
+ final ClusterSpecification clusterSpecification = flinkYarnSessionCli.getClusterSpecification(commandLine);
+
+ assertThat(clusterSpecification.getMasterMemoryMB(), is(2048));
+ assertThat(clusterSpecification.getTaskManagerMemoryMB(), is(4096));
+ }
+
+ /**
+ * Tests the specifying heap memory with config default value for job manager and task manager.
+ */
+ @Test
+ public void testHeapMemoryPropertyWithConfigDefaultValue() throws Exception {
+ final FlinkYarnSessionCli flinkYarnSessionCli = new FlinkYarnSessionCli(
+ new Configuration(),
+ tmp.getRoot().getAbsolutePath(),
+ "y",
+ "yarn");
+
+ final CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(new String[0], false);
+
+ final ClusterSpecification clusterSpecification = flinkYarnSessionCli.getClusterSpecification(commandLine);
+
+ assertThat(clusterSpecification.getMasterMemoryMB(), is(1024));
+ assertThat(clusterSpecification.getTaskManagerMemoryMB(), is(1024));
+ }
+
+
///////////
// Utils //
///////////