You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2021/07/22 02:20:37 UTC
[zeppelin] branch master updated: [ZEPPELIN-5463] Introduce flink
jm url template configuration
This is an automated email from the ASF dual-hosted git repository.
zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push:
new cc2f8f7 [ZEPPELIN-5463] Introduce flink jm url template configuration
cc2f8f7 is described below
commit cc2f8f7aeddb8d718e0be2205d7f742d29d03bd3
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Mon Jul 19 14:13:39 2021 +0800
[ZEPPELIN-5463] Introduce flink jm url template configuration
### What is this PR for?
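Introduce the configuration `zeppelin.flink.uiWebUrl` as a user-specified JobManager URL. It can be used in two cases:
1. remote mode: the Flink cluster is already started, so the JobManager URL is already known.
2. yarn/yarn-application mode: the user can only access a Knox URL or another vendor-provided URL. In this case the value can be a URL template, e.g. `https://knox-server:8443/gateway/cluster-topo/yarn/proxy/{{applicationId}}/`, where `{{applicationId}}` is replaced with the YARN application id.
The usage of `zeppelin.flink.uiWebUrl` is consistent with `zeppelin.spark.uiWebUrl`.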
### What type of PR is it?
[ Improvement ]
### Todos
* [ ] - Task
### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-5463
### How should this be tested?
* Manually tested
### Screenshots (if appropriate)
### Questions:
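* Do the license files need an update? No
* Are there breaking changes for older versions? Yes: the `flink.webui.yarn.useProxy` and `flink.webui.yarn.address` properties are removed; use `zeppelin.flink.uiWebUrl` instead.
* Does this need documentation? No (docs/interpreter/flink.md is updated in this commit)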
Author: Jeff Zhang <zj...@apache.org>
Closes #4180 from zjffdu/ZEPPELIN-5463 and squashes the following commits:
e1389752a3 [Jeff Zhang] [ZEPPELIN-5463] Introduce flink jm url template configuration
---
docs/interpreter/flink.md | 9 +---
.../org/apache/zeppelin/flink/HadoopUtils.java | 4 ++
.../src/main/resources/interpreter-setting.json | 11 +----
.../zeppelin/flink/FlinkScalaInterpreter.scala | 54 +++++++---------------
.../zeppelin/flink/FlinkScalaInterpreterTest.scala | 41 ----------------
5 files changed, 24 insertions(+), 95 deletions(-)
diff --git a/docs/interpreter/flink.md b/docs/interpreter/flink.md
index 343a701..01ea99e 100644
--- a/docs/interpreter/flink.md
+++ b/docs/interpreter/flink.md
@@ -136,14 +136,9 @@ You can also add and set other flink properties which are not listed in the tabl
<td>queue name of yarn app</td>
</tr>
<tr>
- <td>flink.webui.yarn.useProxy</td>
- <td>false</td>
- <td>whether use yarn proxy url as flink weburl, e.g. http://resource-manager:8088/proxy/application_1583396598068_0004</td>
- </tr>
- <tr>
- <td>flink.webui.yarn.address</td>
+ <td>zeppelin.flink.uiWebUrl</td>
<td></td>
- <td>Set this value only when your yarn address is mapped to some other address, e.g. some cloud vender will map `http://resource-manager:8088` to `https://xxx-yarn.yy.cn/gateway/kkk/yarn`</td>
+ <td>User specified Flink JobManager url, it could be used in remote mode where Flink cluster is already started, or could be used as url template, e.g. https://knox-server:8443/gateway/cluster-topo/yarn/proxy/{{applicationId}}/ where {{applicationId}} would be replaced with yarn app id</td>
</tr>
<tr>
<td>zeppelin.flink.run.asLoginUser</td>
diff --git a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/HadoopUtils.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/HadoopUtils.java
index d8b7e59..a8ef900 100644
--- a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/HadoopUtils.java
+++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/HadoopUtils.java
@@ -58,6 +58,10 @@ public class HadoopUtils {
return getYarnApplicationReport(yarnAppId).getTrackingUrl();
}
+ public static String getYarnAppId(ClusterClient clusterClient) {
+ return ((ApplicationId) clusterClient.getClusterId()).toString();
+ }
+
public static int getFlinkRestPort(String yarnAppId) throws IOException, YarnException {
return getYarnApplicationReport(ConverterUtils.toApplicationId(yarnAppId)).getRpcPort();
}
diff --git a/flink/flink-scala-parent/src/main/resources/interpreter-setting.json b/flink/flink-scala-parent/src/main/resources/interpreter-setting.json
index bd91f80..12f41c4 100644
--- a/flink/flink-scala-parent/src/main/resources/interpreter-setting.json
+++ b/flink/flink-scala-parent/src/main/resources/interpreter-setting.json
@@ -89,18 +89,11 @@
"description": "Yarn queue name",
"type": "string"
},
- "flink.webui.yarn.useProxy": {
+ "zeppelin.flink.uiWebUrl": {
"envName": null,
"propertyName": null,
"defaultValue": false,
- "description": "Whether use yarn proxy url as flink weburl, e.g. http://localhost:8088/proxy/application_1583396598068_0004",
- "type": "checkbox"
- },
- "flink.webui.yarn.address": {
- "envName": null,
- "propertyName": null,
- "defaultValue": "",
- "description": "Set this value only when your yarn address is mapped to some other address, e.g. some cloud vender will map `http://resource-manager:8088` to `https://xxx-yarn.yy.cn/gateway/kkk/yarn`",
+ "description": "User specified Flink JobManager url, it could be used in remote mode where Flink cluster is already started, or could be used as url template, e.g. https://knox-server:8443/gateway/cluster-topo/yarn/proxy/{{applicationId}}/ where {{applicationId}} would be replaced with yarn app id",
"type": "string"
},
"zeppelin.flink.run.asLoginUser": {
diff --git a/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala b/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
index 96061a4..7729a5f 100644
--- a/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
+++ b/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
@@ -300,39 +300,30 @@ abstract class FlinkScalaInterpreter(val properties: Properties,
if (mode == ExecutionMode.LOCAL) {
LOGGER.info("Starting FlinkCluster in local mode")
this.jmWebUrl = clusterClient.getWebInterfaceURL
+ this.displayedJMWebUrl = this.jmWebUrl
} else if (mode == ExecutionMode.YARN) {
LOGGER.info("Starting FlinkCluster in yarn mode")
- if (isYarnUseProxy()) {
- this.jmWebUrl = HadoopUtils.getYarnAppTrackingUrl(clusterClient)
- this.displayedJMWebUrl = getJmWebUrlUnderProxy().getOrElse(this.jmWebUrl)
- } else {
- this.jmWebUrl = clusterClient.getWebInterfaceURL
- }
+ this.jmWebUrl = clusterClient.getWebInterfaceURL
+ val yarnAppId = HadoopUtils.getYarnAppId(clusterClient)
+ this.displayedJMWebUrl = getDisplayedJMWebUrl(yarnAppId)
} else {
throw new Exception("Starting FlinkCluster in invalid mode: " + mode)
}
case None =>
- // remote mode
+ // yarn-application mode
if (mode == ExecutionMode.YARN_APPLICATION) {
+ // get yarnAppId from env `_APP_ID`
val yarnAppId = System.getenv("_APP_ID")
LOGGER.info("Use FlinkCluster in yarn application mode, appId: {}", yarnAppId)
- if (isYarnUseProxy()) {
- this.jmWebUrl = HadoopUtils.getYarnAppTrackingUrl(yarnAppId)
- this.displayedJMWebUrl = getJmWebUrlUnderProxy().getOrElse(this.jmWebUrl)
- } else {
- this.jmWebUrl = "http://localhost:" + HadoopUtils.getFlinkRestPort(yarnAppId)
- }
+ this.jmWebUrl = "http://localhost:" + HadoopUtils.getFlinkRestPort(yarnAppId)
+ this.displayedJMWebUrl = getDisplayedJMWebUrl(yarnAppId)
} else {
LOGGER.info("Use FlinkCluster in remote mode")
this.jmWebUrl = "http://" + config.host.get + ":" + config.port.get
+ this.displayedJMWebUrl = getDisplayedJMWebUrl("")
}
}
- if (this.displayedJMWebUrl == null) {
- // use jmWebUrl as displayedJMWebUrl if it is not set
- this.displayedJMWebUrl = this.jmWebUrl
- }
-
LOGGER.info(s"\nConnecting to Flink cluster: " + this.jmWebUrl)
if (InterpreterContext.get() != null) {
InterpreterContext.get().getIntpEventClient.sendWebUrlInfo(this.jmWebUrl)
@@ -847,17 +838,14 @@ abstract class FlinkScalaInterpreter(val properties: Properties,
})
}
- private def isYarnUseProxy(): Boolean = {
- properties.getProperty("flink.webui.yarn.useProxy", "false").toBoolean
- }
-
- private def getJmWebUrlUnderProxy(): Option[String] = {
- // for some cloud vender, the yarn address may be mapped to some other address.
- val yarnAddress = properties.getProperty("flink.webui.yarn.address")
- if (StringUtils.isNotBlank(yarnAddress)) {
- Some(FlinkScalaInterpreter.replaceYarnAddress(this.jmWebUrl, yarnAddress))
+ private def getDisplayedJMWebUrl(yarnAppId: String): String = {
+ // `zeppelin.flink.uiWebUrl` is flink jm url template, {{applicationId}} will be replaced
+ // with real yarn app id.
+ val flinkUIWebUrl = properties.getProperty("zeppelin.flink.uiWebUrl")
+ if (StringUtils.isNotBlank(flinkUIWebUrl)) {
+ flinkUIWebUrl.replace("{{applicationId}}", yarnAppId)
} else {
- None
+ this.jmWebUrl
}
}
@@ -936,13 +924,3 @@ abstract class FlinkScalaInterpreter(val properties: Properties,
getConfigurationMethod.invoke(this.senv.getJavaEnv).asInstanceOf[Configuration]
}
}
-
-object FlinkScalaInterpreter {
- def replaceYarnAddress(webURL: String, yarnAddress: String): String = {
- val pattern = "(https?://.*:\\d+)(.*)".r
- val pattern(prefix, remaining) = webURL
- yarnAddress + remaining
- }
-}
-
-
diff --git a/flink/flink-scala-parent/src/test/scala/org/apache/zeppelin/flink/FlinkScalaInterpreterTest.scala b/flink/flink-scala-parent/src/test/scala/org/apache/zeppelin/flink/FlinkScalaInterpreterTest.scala
deleted file mode 100644
index 7e326d8..0000000
--- a/flink/flink-scala-parent/src/test/scala/org/apache/zeppelin/flink/FlinkScalaInterpreterTest.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.flink
-
-
-import java.util.Properties
-
-import org.junit.Assert.assertEquals
-import org.scalatest.FunSuite
-
-class FlinkScalaInterpreterTest extends FunSuite {
-
- test("testReplaceYarnAddress") {
- var targetURL = FlinkScalaInterpreter.replaceYarnAddress("http://localhost:8081",
- "http://my-server:9090/gateway")
- assertEquals("http://my-server:9090/gateway", targetURL)
-
- targetURL = FlinkScalaInterpreter.replaceYarnAddress("https://localhost:8081/",
- "https://my-server:9090/gateway")
- assertEquals("https://my-server:9090/gateway/", targetURL)
-
- targetURL = FlinkScalaInterpreter.replaceYarnAddress("https://localhost:8081/proxy/app_1",
- "https://my-server:9090/gateway")
- assertEquals("https://my-server:9090/gateway/proxy/app_1", targetURL)
- }
-}