You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2021/06/29 03:47:27 UTC

[zeppelin] branch master updated: [ZEPPELIN-5415] Fix flink.webui.yarn.useProxy not working on yarn-application mode

This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new 34561b5  [ZEPPELIN-5415] Fix flink.webui.yarn.useProxy not working on yarn-application mode
34561b5 is described below

commit 34561b5485c6b73dc635073f46be37cd1c547c20
Author: jiabao.sun <ji...@xtransfer.cn>
AuthorDate: Mon Jun 21 10:39:03 2021 +0800

    [ZEPPELIN-5415] Fix flink.webui.yarn.useProxy not working on yarn-application mode
    
    ### What is this PR for?
    Fix flink.webui.yarn.useProxy not working on yarn-application mode
    
    ### What type of PR is it?
    [Bug Fix]
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    * [ZEPPELIN-5415]
    
    ### How should this be tested?
    * Strongly recommended: add automated unit tests for any new or changed behavior
    * Outline any manual steps to test the PR here.
    
    ### Screenshots (if appropriate)
    
    ### Questions:
    * Does the licenses files need update?
    * Is there breaking changes for older versions?
    * Does this needs documentation?
    
    Author: jiabao.sun <ji...@xtransfer.cn>
    
    Closes #4142 from Jiabao-Sun/fix-use-proxy-of-yarn-application and squashes the following commits:
    
    37b37eca20 [jiabao.sun] Return None if yarn address is null or empty.
    3bd2508aae [jiabao.sun] reduce duplicate code
    da41ed444b [jiabao.sun] Fix flink.webui.yarn.useProxy not works on yarn-application mode
---
 .../zeppelin/flink/FlinkScalaInterpreter.scala     | 32 ++++++++++++++++------
 1 file changed, 23 insertions(+), 9 deletions(-)

diff --git a/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala b/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
index e0e97ba..9a5645c 100644
--- a/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
+++ b/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
@@ -298,13 +298,9 @@ abstract class FlinkScalaInterpreter(val properties: Properties,
             this.jmWebUrl = clusterClient.getWebInterfaceURL
           } else if (mode == ExecutionMode.YARN) {
             LOGGER.info("Starting FlinkCluster in yarn mode")
-            if (properties.getProperty("flink.webui.yarn.useProxy", "false").toBoolean) {
+            if (isYarnUseProxy()) {
               this.jmWebUrl = HadoopUtils.getYarnAppTrackingUrl(clusterClient)
-              // for some cloud vender, the yarn address may be mapped to some other address.
-              val yarnAddress = properties.getProperty("flink.webui.yarn.address")
-              if (!StringUtils.isBlank(yarnAddress)) {
-                this.displayedJMWebUrl = FlinkScalaInterpreter.replaceYarnAddress(this.jmWebUrl, yarnAddress)
-              }
+              this.displayedJMWebUrl = getJmWebUrlUnderProxy().getOrElse(this.jmWebUrl)
             } else {
               this.jmWebUrl = clusterClient.getWebInterfaceURL
             }
@@ -314,10 +310,14 @@ abstract class FlinkScalaInterpreter(val properties: Properties,
         case None =>
           // remote mode
           if (mode == ExecutionMode.YARN_APPLICATION) {
-            val yarnAppId = System.getenv("_APP_ID");
+            val yarnAppId = System.getenv("_APP_ID")
             LOGGER.info("Use FlinkCluster in yarn application mode, appId: {}", yarnAppId)
-            this.jmWebUrl = "http://localhost:" + HadoopUtils.getFlinkRestPort(yarnAppId)
-            this.displayedJMWebUrl = HadoopUtils.getYarnAppTrackingUrl(yarnAppId)
+            if (isYarnUseProxy()) {
+              this.jmWebUrl = HadoopUtils.getYarnAppTrackingUrl(yarnAppId)
+              this.displayedJMWebUrl = getJmWebUrlUnderProxy().getOrElse(this.jmWebUrl)
+            } else {
+              this.jmWebUrl = "http://localhost:" + HadoopUtils.getFlinkRestPort(yarnAppId)
+            }
           } else {
             LOGGER.info("Use FlinkCluster in remote mode")
             this.jmWebUrl = "http://" + config.host.get + ":" + config.port.get
@@ -843,6 +843,20 @@ abstract class FlinkScalaInterpreter(val properties: Properties,
     })
   }
 
+  private def isYarnUseProxy(): Boolean = {
+    properties.getProperty("flink.webui.yarn.useProxy", "false").toBoolean
+  }
+
+  private def getJmWebUrlUnderProxy(): Option[String] = {
+    // for some cloud vender, the yarn address may be mapped to some other address.
+    val yarnAddress = properties.getProperty("flink.webui.yarn.address")
+    if (StringUtils.isNotBlank(yarnAddress)) {
+      Some(FlinkScalaInterpreter.replaceYarnAddress(this.jmWebUrl, yarnAddress))
+    } else {
+      None
+    }
+  }
+
   def getJobManager = this.jobManager
 
   def getFlinkScalaShellLoader: ClassLoader = {