You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2021/07/03 05:22:20 UTC
[zeppelin] branch master updated: [ZEPPELIN-5428] Unify flink
configuration between different execution modes (#4155)
This is an automated email from the ASF dual-hosted git repository.
zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push:
new e34cf0f [ZEPPELIN-5428] Unify flink configuration between different execution modes (#4155)
e34cf0f is described below
commit e34cf0f6062643cb848c4da03aaa98635b261e77
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Sat Jul 3 13:22:07 2021 +0800
[ZEPPELIN-5428] Unify flink configuration between different execution modes (#4155)
* [ZEPPELIN-5428] Unify flink configuration between different execution modes
---
docs/interpreter/flink.md | 18 +++++------
.../src/main/resources/interpreter-setting.json | 18 +++++------
.../zeppelin/flink/FlinkScalaInterpreter.scala | 16 ++++++----
.../src/test/resources/log4j.properties | 2 +-
.../src/test/resources/log4j2.properties | 4 +--
.../launcher/FlinkInterpreterLauncher.java | 36 ++++++++++++++++++++++
6 files changed, 67 insertions(+), 27 deletions(-)
diff --git a/docs/interpreter/flink.md b/docs/interpreter/flink.md
index 40d8d6f..27989b0 100644
--- a/docs/interpreter/flink.md
+++ b/docs/interpreter/flink.md
@@ -106,17 +106,17 @@ You can also add and set other flink properties which are not listed in the tabl
<td>Port of running JobManager. Only used for remote mode</td>
</tr>
<tr>
- <td>flink.jm.memory</td>
- <td>1024</td>
- <td>Total number of memory(mb) of JobManager</td>
+ <td>jobmanager.memory.process.size</td>
+ <td>1024m</td>
+ <td>Total process memory of the JobManager, e.g. 1024m. This is an official <a href="https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/">Flink property</a>.</td>
</tr>
<tr>
- <td>flink.tm.memory</td>
- <td>1024</td>
- <td>Total number of memory(mb) of TaskManager</td>
+ <td>taskmanager.memory.process.size</td>
+ <td>1024m</td>
+ <td>Total process memory of each TaskManager, e.g. 1024m. This is an official <a href="https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/">Flink property</a>.</td>
</tr>
<tr>
- <td>flink.tm.slot</td>
+ <td>taskmanager.numberOfTaskSlots</td>
<td>1</td>
<td>Number of slot per TaskManager</td>
</tr>
@@ -126,12 +126,12 @@ You can also add and set other flink properties which are not listed in the tabl
<td>Total number of TaskManagers in local mode</td>
</tr>
<tr>
- <td>flink.yarn.appName</td>
+ <td>yarn.application.name</td>
<td>Zeppelin Flink Session</td>
<td>Yarn app name</td>
</tr>
<tr>
- <td>flink.yarn.queue</td>
+ <td>yarn.application.queue</td>
<td>default</td>
<td>queue name of yarn app</td>
</tr>
diff --git a/flink/flink-scala-parent/src/main/resources/interpreter-setting.json b/flink/flink-scala-parent/src/main/resources/interpreter-setting.json
index 0a87ddf..e5dd7a8 100644
--- a/flink/flink-scala-parent/src/main/resources/interpreter-setting.json
+++ b/flink/flink-scala-parent/src/main/resources/interpreter-setting.json
@@ -47,21 +47,21 @@
"description": "Port of running JobManager. Only used for remote mode",
"type": "number"
},
- "flink.jm.memory": {
+ "jobmanager.memory.process.size": {
"envName": null,
"propertyName": null,
- "defaultValue": "1024",
- "description": "Memory for JobManager (mb)",
+ "defaultValue": "1024m",
+ "description": "Memory for JobManager, e.g. 1024m",
"type": "number"
},
- "flink.tm.memory": {
+ "taskmanager.memory.process.size": {
"envName": null,
"propertyName": null,
- "defaultValue": "1024",
- "description": "Memory for TaskManager (mb)",
+ "defaultValue": "1024m",
+ "description": "Memory for TaskManager, e.g. 1024m",
"type": "number"
},
- "flink.tm.slot": {
+ "taskmanager.numberOfTaskSlots": {
"envName": null,
"propertyName": null,
"defaultValue": "1",
@@ -75,14 +75,14 @@
"description": "Number of TaskManager in local mode",
"type": "number"
},
- "flink.yarn.appName": {
+ "yarn.application.name": {
"envName": null,
"propertyName": null,
"defaultValue": "Zeppelin Flink Session",
"description": "Yarn app name",
"type": "string"
},
- "flink.yarn.queue": {
+ "yarn.application.queue": {
"envName": null,
"propertyName": null,
"defaultValue": "default",
diff --git a/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala b/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
index 9a5645c..96061a4 100644
--- a/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
+++ b/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
@@ -203,28 +203,32 @@ abstract class FlinkScalaInterpreter(val properties: Properties,
this.configuration = GlobalConfiguration.loadConfiguration(flinkConfDir)
var config = Config(executionMode = mode)
- val jmMemory = properties.getProperty("flink.jm.memory", "1024")
+ val jmMemory = properties.getProperty("jobmanager.memory.process.size",
+ properties.getProperty("flink.jm.memory", "1024"))
config = config.copy(yarnConfig =
Some(ensureYarnConfig(config)
.copy(jobManagerMemory = Some(jmMemory))))
- val tmMemory = properties.getProperty("flink.tm.memory", "1024")
+ val tmMemory = properties.getProperty("taskmanager.memory.process.size",
+ properties.getProperty("flink.tm.memory", "1024"))
config = config.copy(yarnConfig =
Some(ensureYarnConfig(config)
.copy(taskManagerMemory = Some(tmMemory))))
- val appName = properties.getProperty("flink.yarn.appName", "Flink Yarn App Name")
+ val appName = properties.getProperty("yarn.application.name",
+ properties.getProperty("flink.yarn.appName", "Flink Yarn App Name"))
config = config.copy(yarnConfig =
Some(ensureYarnConfig(config)
.copy(name = Some(appName))))
- val slotNum = Integer.parseInt(properties.getProperty("flink.tm.slot", "1"))
+ val slotNum = Integer.parseInt(properties.getProperty("taskmanager.numberOfTaskSlots",
+ properties.getProperty("flink.tm.slot", "1")))
config = config.copy(yarnConfig =
Some(ensureYarnConfig(config)
.copy(slots = Some(slotNum))))
- this.configuration.setInteger("taskmanager.numberOfTaskSlots", slotNum)
- val queue = (properties.getProperty("flink.yarn.queue", "default"))
+ val queue = properties.getProperty("yarn.application.queue",
+ properties.getProperty("flink.yarn.queue", "default"))
config = config.copy(yarnConfig =
Some(ensureYarnConfig(config)
.copy(queue = Some(queue))))
diff --git a/flink/flink-scala-parent/src/test/resources/log4j.properties b/flink/flink-scala-parent/src/test/resources/log4j.properties
index ff1b634..69dd677 100644
--- a/flink/flink-scala-parent/src/test/resources/log4j.properties
+++ b/flink/flink-scala-parent/src/test/resources/log4j.properties
@@ -22,7 +22,7 @@ log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%5p [%d] ({%t} %F[%M]:%L) - %m%n
log4j.logger.org.apache.hive=WARN
-log4j.logger.org.apache.flink=INFO
+log4j.logger.org.apache.flink=WARN
log4j.logger.org.apache.zeppelin.flink=INFO
log4j.logger.org.apache.zeppelin.python=WARN
log4j.logger.org.apache.flink.streaming.api.operators.collect=ERROR
diff --git a/flink/flink-scala-parent/src/test/resources/log4j2.properties b/flink/flink-scala-parent/src/test/resources/log4j2.properties
index d003c42..965a410 100644
--- a/flink/flink-scala-parent/src/test/resources/log4j2.properties
+++ b/flink/flink-scala-parent/src/test/resources/log4j2.properties
@@ -21,8 +21,8 @@ rootLogger.level = INFO
#rootLogger.appenderRef.file.ref = MainAppender
# Uncomment this if you want to _only_ change Flink's logging
-#logger.flink.name = org.apache.flink
-#logger.flink.level = INFO
+logger.flink.name = org.apache.flink
+logger.flink.level = WARN
# The following lines keep the log level of common libraries/connectors on
# log level INFO. The root logger does not override this. You have to manually
diff --git a/zeppelin-plugins/launcher/flink/src/main/java/org/apache/zeppelin/interpreter/launcher/FlinkInterpreterLauncher.java b/zeppelin-plugins/launcher/flink/src/main/java/org/apache/zeppelin/interpreter/launcher/FlinkInterpreterLauncher.java
index 21379f6..76017b2 100644
--- a/zeppelin-plugins/launcher/flink/src/main/java/org/apache/zeppelin/interpreter/launcher/FlinkInterpreterLauncher.java
+++ b/zeppelin-plugins/launcher/flink/src/main/java/org/apache/zeppelin/interpreter/launcher/FlinkInterpreterLauncher.java
@@ -30,6 +30,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
import java.util.stream.Collectors;
public class FlinkInterpreterLauncher extends StandardInterpreterLauncher {
@@ -54,6 +55,8 @@ public class FlinkInterpreterLauncher extends StandardInterpreterLauncher {
envs.put("FLINK_LIB_DIR", flinkHome + "/lib");
envs.put("FLINK_PLUGINS_DIR", flinkHome + "/plugins");
+ normalizeConfiguration(context);
+
// yarn application mode specific logic
if ("yarn-application".equalsIgnoreCase(
context.getProperties().getProperty("flink.execution.mode"))) {
@@ -66,6 +69,39 @@ public class FlinkInterpreterLauncher extends StandardInterpreterLauncher {
return envs;
}
+   /**
+    * Maps the legacy Zeppelin-specific flink.* interpreter properties onto the official Flink
+    * configuration keys, so that the same keys work for every execution mode.
+    *
+    * @param context launch context whose interpreter properties are updated in place
+    */
+   private void normalizeConfiguration(InterpreterLaunchContext context) {
+     Properties props = context.getProperties();
+     // Legacy keys whose values are bare megabyte counts (a unit must be added).
+     String[][] memoryMappings = {
+       {"flink.jm.memory", "jobmanager.memory.process.size"},
+       {"flink.tm.memory", "taskmanager.memory.process.size"}
+     };
+     for (String[] mapping : memoryMappings) {
+       setNewProperty(props, mapping[0], mapping[1], true);
+     }
+     // Legacy keys whose values are carried over unchanged.
+     String[][] plainMappings = {
+       {"flink.tm.slot", "taskmanager.numberOfTaskSlots"},
+       {"flink.yarn.appName", "yarn.application.name"},
+       {"flink.yarn.queue", "yarn.application.queue"}
+     };
+     for (String[] mapping : plainMappings) {
+       setNewProperty(props, mapping[0], mapping[1], false);
+     }
+   }
+
+   /**
+    * Copies a legacy Zeppelin-specific property to its official Flink counterpart.
+    *
+    * <p>Nothing is copied in two cases:
+    * <ul>
+    *   <li>the legacy key is unset or blank;</li>
+    *   <li>the new key is already present, because an explicitly set official key takes
+    *       precedence over the legacy one.</li>
+    * </ul>
+    *
+    * <p>flink.jm.memory and flink.tm.memory only accept an integer number of megabytes
+    * (e.g. 1024). jobmanager.memory.process.size and taskmanager.memory.process.size need an
+    * explicit unit (e.g. 1024mb). For memory properties, a bare integer therefore gets the "mb"
+    * suffix. A value that already carries a unit is copied unchanged rather than being turned
+    * into something invalid such as "1024mmb".
+    *
+    * @param properties interpreter properties, modified in place
+    * @param oldKey legacy property name to read
+    * @param newKey official Flink property name to set
+    * @param isMemoryProperty whether the legacy value is a megabyte count that needs a unit
+    */
+   private void setNewProperty(Properties properties,
+                               String oldKey,
+                               String newKey,
+                               boolean isMemoryProperty) {
+     String value = properties.getProperty(oldKey);
+     if (StringUtils.isBlank(value) || properties.containsKey(newKey)) {
+       return;
+     }
+     String trimmed = value.trim();
+     if (isMemoryProperty && trimmed.matches("\\d+")) {
+       // Bare megabyte count from the legacy key; Flink's memory options require a unit.
+       properties.setProperty(newKey, trimmed + "mb");
+     } else {
+       properties.setProperty(newKey, value);
+     }
+   }
+
private String chooseFlinkAppJar(String flinkHome) throws IOException {
File flinkLibFolder = new File(flinkHome, "lib");
List<File> flinkDistFiles =