You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2021/07/03 05:22:20 UTC

[zeppelin] branch master updated: [ZEPPELIN-5428] Unify flink configuration between different execution modes (#4155)

This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new e34cf0f  [ZEPPELIN-5428] Unify flink configuration between different execution modes (#4155)
e34cf0f is described below

commit e34cf0f6062643cb848c4da03aaa98635b261e77
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Sat Jul 3 13:22:07 2021 +0800

    [ZEPPELIN-5428] Unify flink configuration between different execution modes (#4155)
    
    * [ZEPPELIN-5428] Unify flink configuration between different execution modes
---
 docs/interpreter/flink.md                          | 18 +++++------
 .../src/main/resources/interpreter-setting.json    | 18 +++++------
 .../zeppelin/flink/FlinkScalaInterpreter.scala     | 16 ++++++----
 .../src/test/resources/log4j.properties            |  2 +-
 .../src/test/resources/log4j2.properties           |  4 +--
 .../launcher/FlinkInterpreterLauncher.java         | 36 ++++++++++++++++++++++
 6 files changed, 67 insertions(+), 27 deletions(-)

diff --git a/docs/interpreter/flink.md b/docs/interpreter/flink.md
index 40d8d6f..27989b0 100644
--- a/docs/interpreter/flink.md
+++ b/docs/interpreter/flink.md
@@ -106,17 +106,17 @@ You can also add and set other flink properties which are not listed in the tabl
     <td>Port of running JobManager. Only used for remote mode</td>
   </tr>
   <tr>
-    <td>flink.jm.memory</td>
-    <td>1024</td>
-    <td>Total number of memory(mb) of JobManager</td>
+    <td>jobmanager.memory.process.size</td>
+    <td>1024m</td>
+    <td>Total process memory of the JobManager, e.g. 1024m. This is an official [Flink property](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/)</td>
   </tr>
   <tr>
-    <td>flink.tm.memory</td>
-    <td>1024</td>
-    <td>Total number of memory(mb) of TaskManager</td>
+    <td>taskmanager.memory.process.size</td>
+    <td>1024m</td>
+    <td>Total process memory of each TaskManager, e.g. 1024m. This is an official [Flink property](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/)</td>
   </tr>
   <tr>
-    <td>flink.tm.slot</td>
+    <td>taskmanager.numberOfTaskSlots</td>
     <td>1</td>
     <td>Number of slot per TaskManager</td>
   </tr>
@@ -126,12 +126,12 @@ You can also add and set other flink properties which are not listed in the tabl
     <td>Total number of TaskManagers in local mode</td>
   </tr>
   <tr>
-    <td>flink.yarn.appName</td>
+    <td>yarn.application.name</td>
     <td>Zeppelin Flink Session</td>
     <td>Yarn app name</td>
   </tr>
   <tr>
-    <td>flink.yarn.queue</td>
+    <td>yarn.application.queue</td>
     <td>default</td>
     <td>queue name of yarn app</td>
   </tr>
diff --git a/flink/flink-scala-parent/src/main/resources/interpreter-setting.json b/flink/flink-scala-parent/src/main/resources/interpreter-setting.json
index 0a87ddf..e5dd7a8 100644
--- a/flink/flink-scala-parent/src/main/resources/interpreter-setting.json
+++ b/flink/flink-scala-parent/src/main/resources/interpreter-setting.json
@@ -47,21 +47,21 @@
         "description": "Port of running JobManager. Only used for remote mode",
         "type": "number"
       },
-      "flink.jm.memory": {
+      "jobmanager.memory.process.size": {
         "envName": null,
         "propertyName": null,
-        "defaultValue": "1024",
-        "description": "Memory for JobManager (mb)",
+        "defaultValue": "1024m",
+        "description": "Memory for JobManager, e.g. 1024m",
         "type": "number"
       },
-      "flink.tm.memory": {
+      "taskmanager.memory.process.size": {
         "envName": null,
         "propertyName": null,
-        "defaultValue": "1024",
-        "description": "Memory for TaskManager (mb)",
+        "defaultValue": "1024m",
+        "description": "Memory for TaskManager, e.g. 1024m",
         "type": "number"
       },
-      "flink.tm.slot": {
+      "taskmanager.numberOfTaskSlots": {
         "envName": null,
         "propertyName": null,
         "defaultValue": "1",
@@ -75,14 +75,14 @@
         "description": "Number of TaskManager in local mode",
         "type": "number"
       },
-      "flink.yarn.appName": {
+      "yarn.application.name": {
         "envName": null,
         "propertyName": null,
         "defaultValue": "Zeppelin Flink Session",
         "description": "Yarn app name",
         "type": "string"
       },
-      "flink.yarn.queue": {
+      "yarn.application.queue": {
         "envName": null,
         "propertyName": null,
         "defaultValue": "default",
diff --git a/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala b/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
index 9a5645c..96061a4 100644
--- a/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
+++ b/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
@@ -203,28 +203,32 @@ abstract class FlinkScalaInterpreter(val properties: Properties,
 
     this.configuration = GlobalConfiguration.loadConfiguration(flinkConfDir)
     var config = Config(executionMode = mode)
-    val jmMemory = properties.getProperty("flink.jm.memory", "1024")
+    val jmMemory = properties.getProperty("jobmanager.memory.process.size",
+      properties.getProperty("flink.jm.memory", "1024"))
     config = config.copy(yarnConfig =
       Some(ensureYarnConfig(config)
         .copy(jobManagerMemory = Some(jmMemory))))
 
-    val tmMemory = properties.getProperty("flink.tm.memory", "1024")
+    val tmMemory = properties.getProperty("taskmanager.memory.process.size",
+      properties.getProperty("flink.tm.memory", "1024"))
     config = config.copy(yarnConfig =
       Some(ensureYarnConfig(config)
         .copy(taskManagerMemory = Some(tmMemory))))
 
-    val appName = properties.getProperty("flink.yarn.appName", "Flink Yarn App Name")
+    val appName = properties.getProperty("yarn.application.name",
+      properties.getProperty("flink.yarn.appName", "Flink Yarn App Name"))
     config = config.copy(yarnConfig =
       Some(ensureYarnConfig(config)
         .copy(name = Some(appName))))
 
-    val slotNum = Integer.parseInt(properties.getProperty("flink.tm.slot", "1"))
+    val slotNum =  Integer.parseInt(properties.getProperty("taskmanager.numberOfTaskSlots",
+     properties.getProperty("flink.tm.slot", "1")))
     config = config.copy(yarnConfig =
       Some(ensureYarnConfig(config)
         .copy(slots = Some(slotNum))))
-    this.configuration.setInteger("taskmanager.numberOfTaskSlots", slotNum)
 
-    val queue = (properties.getProperty("flink.yarn.queue", "default"))
+    val queue = properties.getProperty("yarn.application.queue",
+      properties.getProperty("flink.yarn.queue", "default"))
     config = config.copy(yarnConfig =
       Some(ensureYarnConfig(config)
         .copy(queue = Some(queue))))
diff --git a/flink/flink-scala-parent/src/test/resources/log4j.properties b/flink/flink-scala-parent/src/test/resources/log4j.properties
index ff1b634..69dd677 100644
--- a/flink/flink-scala-parent/src/test/resources/log4j.properties
+++ b/flink/flink-scala-parent/src/test/resources/log4j.properties
@@ -22,7 +22,7 @@ log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
 log4j.appender.stdout.layout.ConversionPattern=%5p [%d] ({%t} %F[%M]:%L) - %m%n
 
 log4j.logger.org.apache.hive=WARN
-log4j.logger.org.apache.flink=INFO
+log4j.logger.org.apache.flink=WARN
 log4j.logger.org.apache.zeppelin.flink=INFO
 log4j.logger.org.apache.zeppelin.python=WARN
 log4j.logger.org.apache.flink.streaming.api.operators.collect=ERROR
diff --git a/flink/flink-scala-parent/src/test/resources/log4j2.properties b/flink/flink-scala-parent/src/test/resources/log4j2.properties
index d003c42..965a410 100644
--- a/flink/flink-scala-parent/src/test/resources/log4j2.properties
+++ b/flink/flink-scala-parent/src/test/resources/log4j2.properties
@@ -21,8 +21,8 @@ rootLogger.level = INFO
 #rootLogger.appenderRef.file.ref = MainAppender
 
 # Uncomment this if you want to _only_ change Flink's logging
-#logger.flink.name = org.apache.flink
-#logger.flink.level = INFO
+logger.flink.name = org.apache.flink
+logger.flink.level = WARN
 
 # The following lines keep the log level of common libraries/connectors on
 # log level INFO. The root logger does not override this. You have to manually
diff --git a/zeppelin-plugins/launcher/flink/src/main/java/org/apache/zeppelin/interpreter/launcher/FlinkInterpreterLauncher.java b/zeppelin-plugins/launcher/flink/src/main/java/org/apache/zeppelin/interpreter/launcher/FlinkInterpreterLauncher.java
index 21379f6..76017b2 100644
--- a/zeppelin-plugins/launcher/flink/src/main/java/org/apache/zeppelin/interpreter/launcher/FlinkInterpreterLauncher.java
+++ b/zeppelin-plugins/launcher/flink/src/main/java/org/apache/zeppelin/interpreter/launcher/FlinkInterpreterLauncher.java
@@ -30,6 +30,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.stream.Collectors;
 
 public class FlinkInterpreterLauncher extends StandardInterpreterLauncher {
@@ -54,6 +55,8 @@ public class FlinkInterpreterLauncher extends StandardInterpreterLauncher {
     envs.put("FLINK_LIB_DIR", flinkHome + "/lib");
     envs.put("FLINK_PLUGINS_DIR", flinkHome + "/plugins");
 
+    normalizeConfiguration(context);
+
     // yarn application mode specific logic
     if ("yarn-application".equalsIgnoreCase(
             context.getProperties().getProperty("flink.execution.mode"))) {
@@ -66,6 +69,39 @@ public class FlinkInterpreterLauncher extends StandardInterpreterLauncher {
     return envs;
   }
 
+  // Map legacy Zeppelin property names onto the official Flink keys shared by all modes.
+  private void normalizeConfiguration(InterpreterLaunchContext context) {
+    Properties props = context.getProperties();
+    setNewProperty(props, "flink.yarn.queue", "yarn.application.queue", false);
+    setNewProperty(props, "flink.yarn.appName", "yarn.application.name", false);
+    setNewProperty(props, "flink.tm.slot", "taskmanager.numberOfTaskSlots", false);
+    setNewProperty(props, "flink.tm.memory", "taskmanager.memory.process.size", true);
+    setNewProperty(props, "flink.jm.memory", "jobmanager.memory.process.size", true);
+  }
+
+  /**
+   * Copies legacy property oldKey to its official Flink name newKey unless newKey is set.
+   * flink.jm.memory/flink.tm.memory are bare integers in mb (e.g. 1024), while
+   * jobmanager/taskmanager.memory.process.size need an explicit unit (e.g. 1024m).
+   * NOTE(review): if newKey is pre-filled with its interpreter-setting.json default,
+   * the legacy value is ignored here -- confirm how upgraded settings are merged.
+   * @param properties interpreter properties, updated in place
+   * @param isMemoryProperty whether a bare integer value needs the "mb" unit appended
+   */
+  private void setNewProperty(Properties properties,
+                              String oldKey,
+                              String newKey,
+                              boolean isMemoryProperty) {
+    String value = StringUtils.trim(properties.getProperty(oldKey));
+    // Never override a new-style property that is already configured.
+    if (StringUtils.isBlank(value) || properties.containsKey(newKey)) {
+      return;
+    }
+    // Only bare integers are mb values; a value that already carries a unit is kept as is.
+    boolean needsUnit = isMemoryProperty && StringUtils.isNumeric(value);
+    properties.put(newKey, needsUnit ? value + "mb" : value);
+  }
+
   private String chooseFlinkAppJar(String flinkHome) throws IOException {
     File flinkLibFolder = new File(flinkHome, "lib");
     List<File> flinkDistFiles =