You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/03/07 21:47:18 UTC

flink git commit: [FLINK-5916] [yarn] make env.java.opts.jobmanager and env.java.opts.taskmanager working in YARN mode

Repository: flink
Updated Branches:
  refs/heads/master 53fb8f3b5 -> e9a5c8629


[FLINK-5916] [yarn] make env.java.opts.jobmanager and env.java.opts.taskmanager working in YARN mode

minor change and add test case

This closes #3415.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e9a5c862
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e9a5c862
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e9a5c862

Branch: refs/heads/master
Commit: e9a5c8629408d0b5e7fa89a072f16f6788f961a2
Parents: 53fb8f3
Author: WangTaoTheTonic <wa...@huawei.com>
Authored: Sat Feb 25 15:35:19 2017 +0800
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Mar 7 22:46:50 2017 +0100

----------------------------------------------------------------------
 docs/setup/config.md                            |  4 ++--
 .../apache/flink/configuration/CoreOptions.java |  8 +++++++
 .../clusterframework/BootstrapTools.java        |  3 +++
 .../clusterframework/BootstrapToolsTest.java    | 25 ++++++++++++++++++--
 .../yarn/AbstractYarnClusterDescriptor.java     |  7 ++++--
 .../flink/yarn/YarnClusterDescriptorTest.java   | 25 ++++++++++++++++++--
 6 files changed, 64 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e9a5c862/docs/setup/config.md
----------------------------------------------------------------------
diff --git a/docs/setup/config.md b/docs/setup/config.md
index b21c647..b297ae6 100644
--- a/docs/setup/config.md
+++ b/docs/setup/config.md
@@ -44,9 +44,9 @@ The configuration files for the TaskManagers can be different, Flink does not as
 
 - `env.java.opts`: Set custom JVM options. This value is respected by Flink's start scripts, both JobManager and TaskManager, and Flink's YARN client. This can be used to set different garbage collectors or to include remote debuggers into the JVMs running Flink's services. Use `env.java.opts.jobmanager` and `env.java.opts.taskmanager` for JobManager or TaskManager-specific options, respectively.
 
-- `env.java.opts.jobmanager`: JobManager-specific JVM options. These are used in addition to the regular `env.java.opts`. This configuration option is ignored by the YARN client.
+- `env.java.opts.jobmanager`: JobManager-specific JVM options. These are used in addition to the regular `env.java.opts`.
 
-- `env.java.opts.taskmanager`: TaskManager-specific JVM options. These are used in addition to the regular `env.java.opts`. This configuration option is ignored by the YARN client.
+- `env.java.opts.taskmanager`: TaskManager-specific JVM options. These are used in addition to the regular `env.java.opts`.
 
 - `jobmanager.rpc.address`: The external address of the JobManager, which is the master/coordinator of the distributed system (DEFAULT: localhost). **Note:** The address (host name or IP) should be accessible by all nodes including the client.
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e9a5c862/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
index 70e5f0b..4e30ceb 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
@@ -30,6 +30,14 @@ public class CoreOptions {
 		.key("env.java.opts")
 		.defaultValue("");
 
+	public static final ConfigOption<String> FLINK_JM_JVM_OPTIONS = ConfigOptions
+		.key("env.java.opts.jobmanager")
+		.defaultValue("");
+
+	public static final ConfigOption<String> FLINK_TM_JVM_OPTIONS = ConfigOptions
+		.key("env.java.opts.taskmanager")
+		.defaultValue("");
+
 	public static final ConfigOption<Integer> DEFAULT_PARALLELISM_KEY = ConfigOptions
 		.key("parallelism.default")
 		.defaultValue(-1);

http://git-wip-us.apache.org/repos/asf/flink/blob/e9a5c862/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
index ebc9af8..e356d2b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
@@ -360,6 +360,9 @@ public class BootstrapTools {
 							"-Xmx" + tmParams.taskManagerHeapSizeMB() + "m " +
 							"-XX:MaxDirectMemorySize=" + tmParams.taskManagerDirectMemoryLimitMB() + "m");
 		String javaOpts = flinkConfig.getString(CoreOptions.FLINK_JVM_OPTIONS);
+		if (flinkConfig.getString(CoreOptions.FLINK_TM_JVM_OPTIONS).length() > 0) {
+			javaOpts += " " + flinkConfig.getString(CoreOptions.FLINK_TM_JVM_OPTIONS);
+		}
 		//applicable only for YarnMiniCluster secure test run
 		//krb5.conf file will be available as local resource in JM/TM container
 		if(hasKrb5) {

http://git-wip-us.apache.org/repos/asf/flink/blob/e9a5c862/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java
index 1d100da..cf38fea 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java
@@ -122,6 +122,7 @@ public class BootstrapToolsTest {
 		final String java = "$JAVA_HOME/bin/java";
 		final String jvmmem = "-Xms768m -Xmx768m -XX:MaxDirectMemorySize=256m";
 		final String jvmOpts = "-Djvm"; // if set
+		final String tmJvmOpts = "-DtmJvm"; // if set
 		final String logfile = "-Dlog.file=./logs/taskmanager.log"; // if set
 		final String logback =
 			"-Dlogback.configurationFile=file:./conf/logback.xml"; // if set
@@ -229,13 +230,33 @@ public class BootstrapToolsTest {
 				.getTaskManagerShellCommand(cfg, containeredParams, "./conf", "./logs",
 					true, true, true, this.getClass()));
 
+		// logback + log4j, with/out krb5, different JVM opts
+		cfg.setString(CoreOptions.FLINK_TM_JVM_OPTIONS, tmJvmOpts);
+		assertEquals(
+			java + " " + jvmmem +
+				" " + jvmOpts + " " + tmJvmOpts +
+				" " + logfile + " " + logback + " " + log4j +
+				" " + mainClass + " " + args + " " + redirects,
+			BootstrapTools
+				.getTaskManagerShellCommand(cfg, containeredParams, "./conf", "./logs",
+					true, true, false, this.getClass()));
+
+		assertEquals(
+			java + " " + jvmmem +
+				" " + jvmOpts + " " + tmJvmOpts + " " + krb5 + // jvmOpts
+				" " + logfile + " " + logback + " " + log4j +
+				" " + mainClass + " " + args + " " + redirects,
+			BootstrapTools
+				.getTaskManagerShellCommand(cfg, containeredParams, "./conf", "./logs",
+					true, true, true, this.getClass()));
+
 		// now try some configurations with different yarn.container-start-command-template
 
 		cfg.setString(ConfigConstants.YARN_CONTAINER_START_COMMAND_TEMPLATE,
 			"%java% 1 %jvmmem% 2 %jvmopts% 3 %logging% 4 %class% 5 %args% 6 %redirects%");
 		assertEquals(
 			java + " 1 " + jvmmem +
-				" 2 " + jvmOpts + " " + krb5 + // jvmOpts
+				" 2 " + jvmOpts + " " + tmJvmOpts + " " + krb5 + // jvmOpts
 				" 3 " + logfile + " " + logback + " " + log4j +
 				" 4 " + mainClass + " 5 " + args + " 6 " + redirects,
 			BootstrapTools
@@ -247,7 +268,7 @@ public class BootstrapToolsTest {
 		assertEquals(
 			java +
 				" " + logfile + " " + logback + " " + log4j +
-				" " + jvmOpts + " " + krb5 + // jvmOpts
+				" " + jvmOpts + " " + tmJvmOpts + " " + krb5 + // jvmOpts
 				" " + jvmmem +
 				" " + mainClass + " " + args + " " + redirects,
 			BootstrapTools

http://git-wip-us.apache.org/repos/asf/flink/blob/e9a5c862/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index edf57b3..483e279 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -721,8 +721,8 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 		paths.add(remotePathConf);
 		classPathBuilder.append("flink-conf.yaml").append(File.pathSeparator);
 
-		// write job graph to tmp file and add it to local resource 
-		// TODO: server use user main method to generate job graph 
+		// write job graph to tmp file and add it to local resource
+		// TODO: server use user main method to generate job graph
 		if (jobGraph != null) {
 			try {
 				File fp = File.createTempFile(appId.toString(), null);
@@ -1234,6 +1234,9 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 
 		// respect custom JVM options in the YAML file
 		String javaOpts = flinkConfiguration.getString(CoreOptions.FLINK_JVM_OPTIONS);
+		if (flinkConfiguration.getString(CoreOptions.FLINK_JM_JVM_OPTIONS).length() > 0) {
+			javaOpts += " " + flinkConfiguration.getString(CoreOptions.FLINK_JM_JVM_OPTIONS);
+		}
 		//applicable only for YarnMiniCluster secure test run
 		//krb5.conf file will be available as local resource in JM/TM container
 		if (hasKrb5) {

http://git-wip-us.apache.org/repos/asf/flink/blob/e9a5c862/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
index ad3ebcd..a7204da 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
@@ -113,6 +113,7 @@ public class YarnClusterDescriptorTest {
 		final String java = "$JAVA_HOME/bin/java";
 		final String jvmmem = "-Xmx424m";
 		final String jvmOpts = "-Djvm"; // if set
+		final String jmJvmOpts = "-DjmJvm"; // if set
 		final String krb5 = "-Djava.security.krb5.conf=krb5.conf";
 		final String logfile =
 			"-Dlog.file=\"" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
@@ -223,13 +224,33 @@ public class YarnClusterDescriptorTest {
 				.setupApplicationMasterContainer(true, true, true)
 				.getCommands().get(0));
 
+		// logback + log4j, with/out krb5, different JVM opts
+		cfg.setString(CoreOptions.FLINK_JM_JVM_OPTIONS, jmJvmOpts);
+		assertEquals(
+			java + " " + jvmmem +
+				" " + jvmOpts + " " + jmJvmOpts +
+				" " + logfile + " " + logback + " " + log4j +
+				" " + mainClass + " "  + args + " "+ redirects,
+			clusterDescriptor
+				.setupApplicationMasterContainer(true, true, false)
+				.getCommands().get(0));
+
+		assertEquals(
+			java + " " + jvmmem +
+				" " + jvmOpts + " " + jmJvmOpts + " " + krb5 +// jvmOpts
+				" " + logfile + " " + logback + " " + log4j +
+				" " + mainClass + " "  + args + " "+ redirects,
+			clusterDescriptor
+				.setupApplicationMasterContainer(true, true, true)
+				.getCommands().get(0));
+
 		// now try some configurations with different yarn.container-start-command-template
 
 		cfg.setString(ConfigConstants.YARN_CONTAINER_START_COMMAND_TEMPLATE,
 			"%java% 1 %jvmmem% 2 %jvmopts% 3 %logging% 4 %class% 5 %args% 6 %redirects%");
 		assertEquals(
 			java + " 1 " + jvmmem +
-				" 2 " + jvmOpts + " " + krb5 + // jvmOpts
+				" 2 " + jvmOpts + " " + jmJvmOpts + " " + krb5 + // jvmOpts
 				" 3 " + logfile + " " + logback + " " + log4j +
 				" 4 " + mainClass + " 5 " + args + " 6 " + redirects,
 			clusterDescriptor
@@ -241,7 +262,7 @@ public class YarnClusterDescriptorTest {
 		assertEquals(
 			java +
 				" " + logfile + " " + logback + " " + log4j +
-				" " + jvmOpts + " " + krb5 + // jvmOpts
+				" " + jvmOpts + " " + jmJvmOpts + " " + krb5 + // jvmOpts
 				" " + jvmmem +
 				" " + mainClass + " " + args + " " + redirects,
 			clusterDescriptor