You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2021/07/22 02:21:03 UTC

[zeppelin] branch branch-0.9 updated: [ZEPPELIN-5463] Introduce flink jm url template configuration

This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/branch-0.9 by this push:
     new 500909a  [ZEPPELIN-5463] Introduce flink jm url template configuration
500909a is described below

commit 500909a6ada2f75084ce9360cb3b5c6222458b79
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Mon Jul 19 14:13:39 2021 +0800

    [ZEPPELIN-5463] Introduce flink jm url template configuration
    
    ### What is this PR for?
    
    Introduce the configuration `zeppelin.flink.uiWebUrl` as a user-specified job manager URL. It can be used in two cases:
    1. remote mode: the Flink cluster is already started, so the job manager URL is known
    2. yarn/yarn-application mode: the user can only access a Knox URL or another vendor-provided URL.
    
    The usage of `zeppelin.flink.uiWebUrl` is consistent with `zeppelin.spark.uiWebUrl`
    
    ### What type of PR is it?
    [ Improvement ]
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-5463
    
    ### How should this be tested?
    * Manually tested
    
    ### Screenshots (if appropriate)
    
    ### Questions:
    * Do the license files need an update? No
    * Are there breaking changes for older versions? No
    * Does this need documentation? No
    
    Author: Jeff Zhang <zj...@apache.org>
    
    Closes #4180 from zjffdu/ZEPPELIN-5463 and squashes the following commits:
    
    e1389752a3 [Jeff Zhang] [ZEPPELIN-5463] Introduce flink jm url template configuration
    
    (cherry picked from commit cc2f8f7aeddb8d718e0be2205d7f742d29d03bd3)
    Signed-off-by: Jeff Zhang <zj...@apache.org>
---
 docs/interpreter/flink.md                          |  9 +---
 .../org/apache/zeppelin/flink/HadoopUtils.java     |  4 ++
 .../src/main/resources/interpreter-setting.json    | 11 +----
 .../zeppelin/flink/FlinkScalaInterpreter.scala     | 54 +++++++---------------
 .../zeppelin/flink/FlinkScalaInterpreterTest.scala | 41 ----------------
 5 files changed, 24 insertions(+), 95 deletions(-)

diff --git a/docs/interpreter/flink.md b/docs/interpreter/flink.md
index 343a701..01ea99e 100644
--- a/docs/interpreter/flink.md
+++ b/docs/interpreter/flink.md
@@ -136,14 +136,9 @@ You can also add and set other flink properties which are not listed in the tabl
     <td>queue name of yarn app</td>
   </tr>
   <tr>
-    <td>flink.webui.yarn.useProxy</td>
-    <td>false</td>
-    <td>whether use yarn proxy url as flink weburl, e.g. http://resource-manager:8088/proxy/application_1583396598068_0004</td>
-  </tr>
-  <tr>
-    <td>flink.webui.yarn.address</td>
+    <td>zeppelin.flink.uiWebUrl</td>
     <td></td>
-    <td>Set this value only when your yarn address is mapped to some other address, e.g. some cloud vender will map `http://resource-manager:8088` to `https://xxx-yarn.yy.cn/gateway/kkk/yarn`</td>
+    <td>User specified Flink JobManager url, it could be used in remote mode where Flink cluster is already started, or could be used as url template, e.g. https://knox-server:8443/gateway/cluster-topo/yarn/proxy/{{applicationId}}/ where {{applicationId}} would be replaced with yarn app id</td>
   </tr>
   <tr>
     <td>zeppelin.flink.run.asLoginUser</td>
diff --git a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/HadoopUtils.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/HadoopUtils.java
index d8b7e59..a8ef900 100644
--- a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/HadoopUtils.java
+++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/HadoopUtils.java
@@ -58,6 +58,10 @@ public class HadoopUtils {
     return getYarnApplicationReport(yarnAppId).getTrackingUrl();
   }
 
+  public static String getYarnAppId(ClusterClient clusterClient) {
+    return ((ApplicationId) clusterClient.getClusterId()).toString();
+  }
+
   public static int getFlinkRestPort(String yarnAppId) throws IOException, YarnException {
     return getYarnApplicationReport(ConverterUtils.toApplicationId(yarnAppId)).getRpcPort();
   }
diff --git a/flink/flink-scala-parent/src/main/resources/interpreter-setting.json b/flink/flink-scala-parent/src/main/resources/interpreter-setting.json
index bd91f80..12f41c4 100644
--- a/flink/flink-scala-parent/src/main/resources/interpreter-setting.json
+++ b/flink/flink-scala-parent/src/main/resources/interpreter-setting.json
@@ -89,18 +89,11 @@
         "description": "Yarn queue name",
         "type": "string"
       },
-      "flink.webui.yarn.useProxy": {
+      "zeppelin.flink.uiWebUrl": {
         "envName": null,
         "propertyName": null,
         "defaultValue": false,
-        "description": "Whether use yarn proxy url as flink weburl, e.g. http://localhost:8088/proxy/application_1583396598068_0004",
-        "type": "checkbox"
-      },
-      "flink.webui.yarn.address": {
-        "envName": null,
-        "propertyName": null,
-        "defaultValue": "",
-        "description": "Set this value only when your yarn address is mapped to some other address, e.g. some cloud vender will map `http://resource-manager:8088` to `https://xxx-yarn.yy.cn/gateway/kkk/yarn`",
+        "description": "User specified Flink JobManager url, it could be used in remote mode where Flink cluster is already started, or could be used as url template, e.g. https://knox-server:8443/gateway/cluster-topo/yarn/proxy/{{applicationId}}/ where {{applicationId}} would be replaced with yarn app id",
         "type": "string"
       },
       "zeppelin.flink.run.asLoginUser": {
diff --git a/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala b/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
index 96061a4..7729a5f 100644
--- a/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
+++ b/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
@@ -300,39 +300,30 @@ abstract class FlinkScalaInterpreter(val properties: Properties,
           if (mode == ExecutionMode.LOCAL) {
             LOGGER.info("Starting FlinkCluster in local mode")
             this.jmWebUrl = clusterClient.getWebInterfaceURL
+            this.displayedJMWebUrl = this.jmWebUrl
           } else if (mode == ExecutionMode.YARN) {
             LOGGER.info("Starting FlinkCluster in yarn mode")
-            if (isYarnUseProxy()) {
-              this.jmWebUrl = HadoopUtils.getYarnAppTrackingUrl(clusterClient)
-              this.displayedJMWebUrl = getJmWebUrlUnderProxy().getOrElse(this.jmWebUrl)
-            } else {
-              this.jmWebUrl = clusterClient.getWebInterfaceURL
-            }
+            this.jmWebUrl = clusterClient.getWebInterfaceURL
+            val yarnAppId = HadoopUtils.getYarnAppId(clusterClient)
+            this.displayedJMWebUrl = getDisplayedJMWebUrl(yarnAppId)
           } else {
             throw new Exception("Starting FlinkCluster in invalid mode: " + mode)
           }
         case None =>
-          // remote mode
+          // yarn-application mode
           if (mode == ExecutionMode.YARN_APPLICATION) {
+            // get yarnAppId from env `_APP_ID`
             val yarnAppId = System.getenv("_APP_ID")
             LOGGER.info("Use FlinkCluster in yarn application mode, appId: {}", yarnAppId)
-            if (isYarnUseProxy()) {
-              this.jmWebUrl = HadoopUtils.getYarnAppTrackingUrl(yarnAppId)
-              this.displayedJMWebUrl = getJmWebUrlUnderProxy().getOrElse(this.jmWebUrl)
-            } else {
-              this.jmWebUrl = "http://localhost:" + HadoopUtils.getFlinkRestPort(yarnAppId)
-            }
+            this.jmWebUrl = "http://localhost:" + HadoopUtils.getFlinkRestPort(yarnAppId)
+            this.displayedJMWebUrl = getDisplayedJMWebUrl(yarnAppId)
           } else {
             LOGGER.info("Use FlinkCluster in remote mode")
             this.jmWebUrl = "http://" + config.host.get + ":" + config.port.get
+            this.displayedJMWebUrl = getDisplayedJMWebUrl("")
           }
       }
 
-      if (this.displayedJMWebUrl == null) {
-        // use jmWebUrl as displayedJMWebUrl if it is not set
-        this.displayedJMWebUrl = this.jmWebUrl
-      }
-
       LOGGER.info(s"\nConnecting to Flink cluster: " + this.jmWebUrl)
       if (InterpreterContext.get() != null) {
         InterpreterContext.get().getIntpEventClient.sendWebUrlInfo(this.jmWebUrl)
@@ -847,17 +838,14 @@ abstract class FlinkScalaInterpreter(val properties: Properties,
     })
   }
 
-  private def isYarnUseProxy(): Boolean = {
-    properties.getProperty("flink.webui.yarn.useProxy", "false").toBoolean
-  }
-
-  private def getJmWebUrlUnderProxy(): Option[String] = {
-    // for some cloud vender, the yarn address may be mapped to some other address.
-    val yarnAddress = properties.getProperty("flink.webui.yarn.address")
-    if (StringUtils.isNotBlank(yarnAddress)) {
-      Some(FlinkScalaInterpreter.replaceYarnAddress(this.jmWebUrl, yarnAddress))
+  private def getDisplayedJMWebUrl(yarnAppId: String): String = {
+    // `zeppelin.flink.uiWebUrl` is flink jm url template, {{applicationId}} will be replaced
+    // with real yarn app id.
+    val flinkUIWebUrl = properties.getProperty("zeppelin.flink.uiWebUrl")
+    if (StringUtils.isNotBlank(flinkUIWebUrl)) {
+      flinkUIWebUrl.replace("{{applicationId}}", yarnAppId)
     } else {
-      None
+      this.jmWebUrl
     }
   }
 
@@ -936,13 +924,3 @@ abstract class FlinkScalaInterpreter(val properties: Properties,
     getConfigurationMethod.invoke(this.senv.getJavaEnv).asInstanceOf[Configuration]
   }
 }
-
-object FlinkScalaInterpreter {
-  def replaceYarnAddress(webURL: String, yarnAddress: String): String = {
-    val pattern = "(https?://.*:\\d+)(.*)".r
-    val pattern(prefix, remaining) = webURL
-    yarnAddress + remaining
-  }
-}
-
-
diff --git a/flink/flink-scala-parent/src/test/scala/org/apache/zeppelin/flink/FlinkScalaInterpreterTest.scala b/flink/flink-scala-parent/src/test/scala/org/apache/zeppelin/flink/FlinkScalaInterpreterTest.scala
deleted file mode 100644
index 7e326d8..0000000
--- a/flink/flink-scala-parent/src/test/scala/org/apache/zeppelin/flink/FlinkScalaInterpreterTest.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.flink
-
-
-import java.util.Properties
-
-import org.junit.Assert.assertEquals
-import org.scalatest.FunSuite
-
-class FlinkScalaInterpreterTest extends FunSuite {
-
-  test("testReplaceYarnAddress") {
-    var targetURL = FlinkScalaInterpreter.replaceYarnAddress("http://localhost:8081",
-      "http://my-server:9090/gateway")
-    assertEquals("http://my-server:9090/gateway", targetURL)
-
-    targetURL = FlinkScalaInterpreter.replaceYarnAddress("https://localhost:8081/",
-      "https://my-server:9090/gateway")
-    assertEquals("https://my-server:9090/gateway/", targetURL)
-
-    targetURL = FlinkScalaInterpreter.replaceYarnAddress("https://localhost:8081/proxy/app_1",
-      "https://my-server:9090/gateway")
-    assertEquals("https://my-server:9090/gateway/proxy/app_1", targetURL)
-  }
-}