You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2021/07/06 02:09:25 UTC

[zeppelin] branch master updated: [ZEPPELIN-5367] Run flink job as the login user in yarn mode

This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new 16e920f  [ZEPPELIN-5367] Run flink job as the login user in yarn mode
16e920f is described below

commit 16e920fadcc74949b1c759c1da5458547d301c00
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Thu Jul 1 14:58:46 2021 +0800

    [ZEPPELIN-5367] Run flink job as the login user in yarn mode
    
    ### What is this PR for?
    
    Introduce a new Flink configuration `zeppelin.flink.run.asLoginUser` to control whether to run the Flink job as the login user in yarn mode.
    
    ### What type of PR is it?
    [Feature]
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-5367
    
    ### How should this be tested?
    * CI pass
    
    ### Screenshots (if appropriate)
    ![image](https://user-images.githubusercontent.com/164491/124083533-8ceddd00-da80-11eb-849e-96e93ec9a12f.png)
    
    ### Questions:
    * Do the license files need an update? No
    * Are there breaking changes for older versions? No
    * Does this need documentation? No
    
    Author: Jeff Zhang <zj...@apache.org>
    
    Closes #4158 from zjffdu/ZEPPELIN-5367 and squashes the following commits:
    
    86e29dc3f6 [Jeff Zhang] [ZEPPELIN-5367] Run flink job as the login user in yarn mode
---
 docs/interpreter/flink.md                                |  5 +++++
 .../src/main/resources/interpreter-setting.json          |  7 +++++++
 .../zeppelin/integration/FlinkIntegrationTest.java       |  4 ++++
 .../interpreter/launcher/FlinkInterpreterLauncher.java   | 16 ++++++++++++++--
 4 files changed, 30 insertions(+), 2 deletions(-)

diff --git a/docs/interpreter/flink.md b/docs/interpreter/flink.md
index 27989b0..343a701 100644
--- a/docs/interpreter/flink.md
+++ b/docs/interpreter/flink.md
@@ -146,6 +146,11 @@ You can also add and set other flink properties which are not listed in the tabl
     <td>Set this value only when your yarn address is mapped to some other address, e.g. some cloud vender will map `http://resource-manager:8088` to `https://xxx-yarn.yy.cn/gateway/kkk/yarn`</td>
   </tr>
   <tr>
+    <td>zeppelin.flink.run.asLoginUser</td>
+    <td>true</td>
+    <td>Whether to run the flink job as the zeppelin login user. It only applies when running the flink job in a hadoop yarn cluster and shiro is enabled</td>
+  </tr> 
+  <tr>
     <td>flink.udf.jars</td>
     <td></td>
     <td>Flink udf jars (comma separated), zeppelin will register udf in this jar automatically for user. These udf jars could be either local files or hdfs files if you have hadoop installed. The udf name is the class name.</td>
diff --git a/flink/flink-scala-parent/src/main/resources/interpreter-setting.json b/flink/flink-scala-parent/src/main/resources/interpreter-setting.json
index e5dd7a8..bd91f80 100644
--- a/flink/flink-scala-parent/src/main/resources/interpreter-setting.json
+++ b/flink/flink-scala-parent/src/main/resources/interpreter-setting.json
@@ -103,6 +103,13 @@
         "description": "Set this value only when your yarn address is mapped to some other address, e.g. some cloud vender will map `http://resource-manager:8088` to `https://xxx-yarn.yy.cn/gateway/kkk/yarn`",
         "type": "string"
       },
+      "zeppelin.flink.run.asLoginUser": {
+        "envName": null,
+        "propertyName": null,
+        "defaultValue": true,
+        "description": "Whether run flink job as the zeppelin login user, it is only applied when running flink job in hadoop yarn cluster and shiro is enabled",
+        "type": "checkbox"
+      },
       "flink.udf.jars": {
         "envName": null,
         "propertyName": null,
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest.java
index 17a4c8c..86daefc 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest.java
@@ -154,6 +154,8 @@ public abstract class FlinkIntegrationTest {
     flinkInterpreterSetting.setProperty("PATH", hadoopHome + "/bin:" + System.getenv("PATH"));
     flinkInterpreterSetting.setProperty("ZEPPELIN_CONF_DIR", zeppelin.getZeppelinConfDir().getAbsolutePath());
     flinkInterpreterSetting.setProperty("flink.execution.mode", "YARN");
+    flinkInterpreterSetting.setProperty("zeppelin.flink.run.asLoginUser", "false");
+
     testInterpreterBasics();
 
     // 1 yarn application launched
@@ -176,6 +178,8 @@ public abstract class FlinkIntegrationTest {
     flinkInterpreterSetting.setProperty("PATH", hadoopHome + "/bin:" + System.getenv("PATH"));
     flinkInterpreterSetting.setProperty("ZEPPELIN_CONF_DIR", zeppelin.getZeppelinConfDir().getAbsolutePath());
     flinkInterpreterSetting.setProperty("flink.execution.mode", "yarn-application");
+    flinkInterpreterSetting.setProperty("zeppelin.flink.run.asLoginUser", "false");
+
     testInterpreterBasics();
 
     // 1 yarn application launched
diff --git a/zeppelin-plugins/launcher/flink/src/main/java/org/apache/zeppelin/interpreter/launcher/FlinkInterpreterLauncher.java b/zeppelin-plugins/launcher/flink/src/main/java/org/apache/zeppelin/interpreter/launcher/FlinkInterpreterLauncher.java
index 76017b2..995cdb6 100644
--- a/zeppelin-plugins/launcher/flink/src/main/java/org/apache/zeppelin/interpreter/launcher/FlinkInterpreterLauncher.java
+++ b/zeppelin-plugins/launcher/flink/src/main/java/org/apache/zeppelin/interpreter/launcher/FlinkInterpreterLauncher.java
@@ -57,15 +57,27 @@ public class FlinkInterpreterLauncher extends StandardInterpreterLauncher {
 
     normalizeConfiguration(context);
 
+    String flinkExecutionMode = context.getProperties().getProperty("flink.execution.mode");
     // yarn application mode specific logic
-    if ("yarn-application".equalsIgnoreCase(
-            context.getProperties().getProperty("flink.execution.mode"))) {
+    if ("yarn-application".equalsIgnoreCase(flinkExecutionMode)) {
       updateEnvsForYarnApplicationMode(envs, context);
     }
 
     String flinkAppJar = chooseFlinkAppJar(flinkHome);
     LOGGER.info("Choose FLINK_APP_JAR: {}", flinkAppJar);
     envs.put("FLINK_APP_JAR", flinkAppJar);
+
+    if ("yarn".equalsIgnoreCase(flinkExecutionMode) ||
+            "yarn-application".equalsIgnoreCase(flinkExecutionMode)) {
+      boolean runAsLoginUser = Boolean.parseBoolean(context
+              .getProperties()
+              .getProperty("zeppelin.flink.run.asLoginUser", "true"));
+      String userName = context.getUserName();
+      if (runAsLoginUser && !"anonymous".equals(userName)) {
+        envs.put("HADOOP_USER_NAME", userName);
+      }
+    }
+
     return envs;
   }