Posted to dev@zeppelin.apache.org by GitBox <gi...@apache.org> on 2021/06/24 05:14:18 UTC

[GitHub] [zeppelin] zjffdu opened a new pull request #4155: [ZEPPELIN-5428] Unify flink configuration between different execution modes

zjffdu opened a new pull request #4155:
URL: https://github.com/apache/zeppelin/pull/4155


   ### What is this PR for?
   
   This PR unifies the configuration of the Flink interpreter. For historical reasons, some properties are only available in yarn mode and not in yarn-application mode; this PR makes them available in both. The properties handled in this PR are:
   * flink.tm.memory
   * flink.jm.memory
   * flink.yarn.appName
   * flink.yarn.queue
   * flink.tm.slot
   
   ### What type of PR is it?
   [Improvement]
   
   ### Todos
   * [ ] - Task
   
   ### What is the Jira issue?
   * https://issues.apache.org/jira/browse/ZEPPELIN-5423
   
   ### How should this be tested?
   * CI pass
   ### Screenshots (if appropriate)
   
   ### Questions:
   * Do the license files need an update? No
   * Are there breaking changes for older versions? No
   * Does this need documentation? No
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [zeppelin] cuspymd commented on a change in pull request #4155: [ZEPPELIN-5428] Unify flink configuration between different execution modes

Posted by GitBox <gi...@apache.org>.
cuspymd commented on a change in pull request #4155:
URL: https://github.com/apache/zeppelin/pull/4155#discussion_r659466622



##########
File path: zeppelin-plugins/launcher/flink/src/main/java/org/apache/zeppelin/interpreter/launcher/FlinkInterpreterLauncher.java
##########
@@ -66,6 +69,30 @@ public FlinkInterpreterLauncher(ZeppelinConfiguration zConf, RecoveryStorage rec
     return envs;
   }
 
+  // do mapping between configuration of different execution modes.
+  private void normalizeConfiguration(InterpreterLaunchContext context) {
+    Properties intpProperties = context.getProperties();
+    setNewProperty(intpProperties, "flink.jm.memory", "jobmanager.memory.process.size", true);
+    setNewProperty(intpProperties, "flink.tm.memory", "taskmanager.memory.process.size", true);
+    setNewProperty(intpProperties, "flink.tm.slot", "taskmanager.numberOfTaskSlots", false);
+    setNewProperty(intpProperties, "flink.yarn.appName", "yarn.application.name", false);
+    setNewProperty(intpProperties, "flink.yarn.queue", "yarn.application.queue", false);
+  }
+
+  private void setNewProperty(Properties properties,
+                              String oldKey,
+                              String newKey,
+                              boolean isMemoryProperty) {
+    String value = properties.getProperty(oldKey);
+    if (StringUtils.isNotBlank(value) && !properties.containsKey(newKey)) {
+      if (isMemoryProperty) {
+        properties.put(newKey, value + "mb");
+      } else {
+        properties.put(newKey, value);
+      }
+    }

Review comment:
   > The conversion here only happens when there's no new key exists.
   > So the order is newKey > oldKey
   
   I understand. But if the user sets `taskmanager.memory.process.size` by following the manual (flink.md lines 114~116), the value will most likely not have "mb" attached.
   
   In that case, shouldn't we also check the value set for newKey and append "mb" to it?
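
For illustration only, the check asked about above could look like the following minimal sketch. The class `MemoryUnitSketch` and the helper `withDefaultUnit` are hypothetical names, not part of the Zeppelin code base, and the diff under review does not do this. The sketch assumes that any value ending in a digit has no unit and should get "mb".

```java
import java.util.Properties;

/** Hypothetical sketch; not part of the Zeppelin code base. */
final class MemoryUnitSketch {

  // Appends "mb" when the value ends in a digit (assumed to mean "no unit given").
  // Values that already end in a unit, and null or blank values, are returned as-is
  // (non-blank values are trimmed first).
  static String withDefaultUnit(String value) {
    if (value == null || value.trim().isEmpty()) {
      return value;
    }
    String trimmed = value.trim();
    char last = trimmed.charAt(trimmed.length() - 1);
    return Character.isDigit(last) ? trimmed + "mb" : trimmed;
  }

  public static void main(String[] args) {
    String key = "taskmanager.memory.process.size";
    Properties props = new Properties();
    // The user set the Flink key directly, without a unit.
    props.setProperty(key, "1024");
    props.setProperty(key, withDefaultUnit(props.getProperty(key)));
    System.out.println(props.getProperty(key)); // prints "1024mb"
  }
}
```

A value such as `1024m` or `2g` passes through unchanged, so a user who already supplied a unit is not affected.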







[GitHub] [zeppelin] zjffdu commented on a change in pull request #4155: [ZEPPELIN-5428] Unify flink configuration between different execution modes

Posted by GitBox <gi...@apache.org>.
zjffdu commented on a change in pull request #4155:
URL: https://github.com/apache/zeppelin/pull/4155#discussion_r661152444



##########
File path: docs/interpreter/flink.md
##########
@@ -106,17 +106,17 @@ You can also add and set other flink properties which are not listed in the tabl
     <td>Port of running JobManager. Only used for remote mode</td>
   </tr>
   <tr>
-    <td>flink.jm.memory</td>
+    <td>jobmanager.memory.process.size</td>
     <td>1024</td>

Review comment:
       Fixed 







[GitHub] [zeppelin] cuspymd commented on a change in pull request #4155: [ZEPPELIN-5428] Unify flink configuration between different execution modes

Posted by GitBox <gi...@apache.org>.
cuspymd commented on a change in pull request #4155:
URL: https://github.com/apache/zeppelin/pull/4155#discussion_r657808375



##########
File path: zeppelin-plugins/launcher/flink/src/main/java/org/apache/zeppelin/interpreter/launcher/FlinkInterpreterLauncher.java
##########
@@ -66,6 +69,30 @@ public FlinkInterpreterLauncher(ZeppelinConfiguration zConf, RecoveryStorage rec
     return envs;
   }
 
+  // do mapping between configuration of different execution modes.
+  private void normalizeConfiguration(InterpreterLaunchContext context) {
+    Properties intpProperties = context.getProperties();
+    setNewProperty(intpProperties, "flink.jm.memory", "jobmanager.memory.process.size", true);
+    setNewProperty(intpProperties, "flink.tm.memory", "taskmanager.memory.process.size", true);
+    setNewProperty(intpProperties, "flink.tm.slot", "taskmanager.numberOfTaskSlots", false);
+    setNewProperty(intpProperties, "flink.yarn.appName", "yarn.application.name", false);
+    setNewProperty(intpProperties, "flink.yarn.queue", "yarn.application.queue", false);
+  }
+
+  private void setNewProperty(Properties properties,
+                              String oldKey,
+                              String newKey,
+                              boolean isMemoryProperty) {
+    String value = properties.getProperty(oldKey);
+    if (StringUtils.isNotBlank(value) && !properties.containsKey(newKey)) {
+      if (isMemoryProperty) {
+        properties.put(newKey, value + "mb");
+      } else {
+        properties.put(newKey, value);
+      }
+    }

Review comment:
       If `newKey` already has a value, isn't it necessary to append "mb" to that value as well?










[GitHub] [zeppelin] zjffdu commented on pull request #4155: [ZEPPELIN-5428] Unify flink configuration between different execution modes

Posted by GitBox <gi...@apache.org>.
zjffdu commented on pull request #4155:
URL: https://github.com/apache/zeppelin/pull/4155#issuecomment-872693666


   Will merge if there are no more comments.





[GitHub] [zeppelin] cuspymd commented on a change in pull request #4155: [ZEPPELIN-5428] Unify flink configuration between different execution modes

Posted by GitBox <gi...@apache.org>.
cuspymd commented on a change in pull request #4155:
URL: https://github.com/apache/zeppelin/pull/4155#discussion_r661116737



##########
File path: docs/interpreter/flink.md
##########
@@ -106,17 +106,17 @@ You can also add and set other flink properties which are not listed in the tabl
     <td>Port of running JobManager. Only used for remote mode</td>
   </tr>
   <tr>
-    <td>flink.jm.memory</td>
+    <td>jobmanager.memory.process.size</td>
     <td>1024</td>

Review comment:
       Shouldn't it be fixed here as well?







[GitHub] [zeppelin] zjffdu commented on a change in pull request #4155: [ZEPPELIN-5428] Unify flink configuration between different execution modes

Posted by GitBox <gi...@apache.org>.
zjffdu commented on a change in pull request #4155:
URL: https://github.com/apache/zeppelin/pull/4155#discussion_r659483224



##########
File path: zeppelin-plugins/launcher/flink/src/main/java/org/apache/zeppelin/interpreter/launcher/FlinkInterpreterLauncher.java
##########
@@ -66,6 +69,30 @@ public FlinkInterpreterLauncher(ZeppelinConfiguration zConf, RecoveryStorage rec
     return envs;
   }
 
+  // do mapping between configuration of different execution modes.
+  private void normalizeConfiguration(InterpreterLaunchContext context) {
+    Properties intpProperties = context.getProperties();
+    setNewProperty(intpProperties, "flink.jm.memory", "jobmanager.memory.process.size", true);
+    setNewProperty(intpProperties, "flink.tm.memory", "taskmanager.memory.process.size", true);
+    setNewProperty(intpProperties, "flink.tm.slot", "taskmanager.numberOfTaskSlots", false);
+    setNewProperty(intpProperties, "flink.yarn.appName", "yarn.application.name", false);
+    setNewProperty(intpProperties, "flink.yarn.queue", "yarn.application.queue", false);
+  }
+
+  private void setNewProperty(Properties properties,
+                              String oldKey,
+                              String newKey,
+                              boolean isMemoryProperty) {
+    String value = properties.getProperty(oldKey);
+    if (StringUtils.isNotBlank(value) && !properties.containsKey(newKey)) {
+      if (isMemoryProperty) {
+        properties.put(newKey, value + "mb");
+      } else {
+        properties.put(newKey, value);
+      }
+    }

Review comment:
       Oops, you are right. I have updated the doc. It is an official [Flink property](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/), so the user should not add a unit in this property.







[GitHub] [zeppelin] zjffdu commented on a change in pull request #4155: [ZEPPELIN-5428] Unify flink configuration between different execution modes

Posted by GitBox <gi...@apache.org>.
zjffdu commented on a change in pull request #4155:
URL: https://github.com/apache/zeppelin/pull/4155#discussion_r659424322



##########
File path: zeppelin-plugins/launcher/flink/src/main/java/org/apache/zeppelin/interpreter/launcher/FlinkInterpreterLauncher.java
##########
@@ -66,6 +69,30 @@ public FlinkInterpreterLauncher(ZeppelinConfiguration zConf, RecoveryStorage rec
     return envs;
   }
 
+  // do mapping between configuration of different execution modes.
+  private void normalizeConfiguration(InterpreterLaunchContext context) {
+    Properties intpProperties = context.getProperties();
+    setNewProperty(intpProperties, "flink.jm.memory", "jobmanager.memory.process.size", true);
+    setNewProperty(intpProperties, "flink.tm.memory", "taskmanager.memory.process.size", true);
+    setNewProperty(intpProperties, "flink.tm.slot", "taskmanager.numberOfTaskSlots", false);
+    setNewProperty(intpProperties, "flink.yarn.appName", "yarn.application.name", false);
+    setNewProperty(intpProperties, "flink.yarn.queue", "yarn.application.queue", false);
+  }
+
+  private void setNewProperty(Properties properties,
+                              String oldKey,
+                              String newKey,
+                              boolean isMemoryProperty) {
+    String value = properties.getProperty(oldKey);
+    if (StringUtils.isNotBlank(value) && !properties.containsKey(newKey)) {
+      if (isMemoryProperty) {
+        properties.put(newKey, value + "mb");
+      } else {
+        properties.put(newKey, value);
+      }
+    }

Review comment:
       The conversion here only happens when the new key does not exist yet.
   So the precedence is newKey > oldKey.
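
The precedence described above can be checked with a small standalone sketch. It copies the logic of `setNewProperty` from the diff, but replaces `StringUtils.isNotBlank` with a plain-Java check so that it runs without Apache Commons Lang. The class name `PrecedenceSketch` and the sample values are assumptions made for the example.

```java
import java.util.Properties;

/** Standalone sketch of the oldKey -> newKey mapping; sample values are made up. */
final class PrecedenceSketch {

  // Same logic as setNewProperty in the diff: copy oldKey's value to newKey
  // only when oldKey is non-blank and newKey is not already present.
  static void setNewProperty(Properties properties,
                             String oldKey,
                             String newKey,
                             boolean isMemoryProperty) {
    String value = properties.getProperty(oldKey);
    boolean notBlank = value != null && !value.trim().isEmpty();
    if (notBlank && !properties.containsKey(newKey)) {
      properties.put(newKey, isMemoryProperty ? value + "mb" : value);
    }
  }

  public static void main(String[] args) {
    String oldKey = "flink.tm.memory";
    String newKey = "taskmanager.memory.process.size";

    // Case 1: only the old key is set, so it is converted and gets "mb".
    Properties onlyOld = new Properties();
    onlyOld.setProperty(oldKey, "2048");
    setNewProperty(onlyOld, oldKey, newKey, true);
    System.out.println(onlyOld.getProperty(newKey)); // prints "2048mb"

    // Case 2: both keys are set, so the new key wins and is left untouched.
    Properties both = new Properties();
    both.setProperty(oldKey, "2048");
    both.setProperty(newKey, "4g");
    setNewProperty(both, oldKey, newKey, true);
    System.out.println(both.getProperty(newKey)); // prints "4g"
  }
}
```

In the second case the value of the new key is passed through exactly as the user wrote it, which is why the unit question only arises for values set on the new key directly.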







[GitHub] [zeppelin] zjffdu merged pull request #4155: [ZEPPELIN-5428] Unify flink configuration between different execution modes

Posted by GitBox <gi...@apache.org>.
zjffdu merged pull request #4155:
URL: https://github.com/apache/zeppelin/pull/4155


   

