You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2021/06/29 03:47:45 UTC
[zeppelin] branch branch-0.9 updated: [ZEPPELIN-5415] Fix
flink.webui.yarn.useProxy not working on yarn-application mode
This is an automated email from the ASF dual-hosted git repository.
zjffdu pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/branch-0.9 by this push:
new 7464f6d [ZEPPELIN-5415] Fix flink.webui.yarn.useProxy not working on yarn-application mode
7464f6d is described below
commit 7464f6de27abd7973cb91ce0687f326bd9ffbf1f
Author: jiabao.sun <ji...@xtransfer.cn>
AuthorDate: Mon Jun 21 10:39:03 2021 +0800
[ZEPPELIN-5415] Fix flink.webui.yarn.useProxy not working on yarn-application mode
### What is this PR for?
Fix flink.webui.yarn.useProxy not working on yarn-application mode
### What type of PR is it?
[Bug Fix]
### Todos
* [ ] - Task
### What is the Jira issue?
* [ZEPPELIN-5415]
### How should this be tested?
* Strongly recommended: add automated unit tests for any new or changed behavior
* Outline any manual steps to test the PR here.
### Screenshots (if appropriate)
### Questions:
* Does the licenses files need update?
* Is there breaking changes for older versions?
* Does this needs documentation?
Author: jiabao.sun <ji...@xtransfer.cn>
Closes #4142 from Jiabao-Sun/fix-use-proxy-of-yarn-application and squashes the following commits:
37b37eca20 [jiabao.sun] Return None if yarn address is null or empty.
3bd2508aae [jiabao.sun] reduce duplicate code
da41ed444b [jiabao.sun] Fix flink.webui.yarn.useProxy not works on yarn-application mode
(cherry picked from commit 34561b5485c6b73dc635073f46be37cd1c547c20)
Signed-off-by: Jeff Zhang <zj...@apache.org>
---
.../zeppelin/flink/FlinkScalaInterpreter.scala | 32 ++++++++++++++++------
1 file changed, 23 insertions(+), 9 deletions(-)
diff --git a/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala b/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
index e0e97ba..9a5645c 100644
--- a/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
+++ b/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
@@ -298,13 +298,9 @@ abstract class FlinkScalaInterpreter(val properties: Properties,
this.jmWebUrl = clusterClient.getWebInterfaceURL
} else if (mode == ExecutionMode.YARN) {
LOGGER.info("Starting FlinkCluster in yarn mode")
- if (properties.getProperty("flink.webui.yarn.useProxy", "false").toBoolean) {
+ if (isYarnUseProxy()) {
this.jmWebUrl = HadoopUtils.getYarnAppTrackingUrl(clusterClient)
- // for some cloud vender, the yarn address may be mapped to some other address.
- val yarnAddress = properties.getProperty("flink.webui.yarn.address")
- if (!StringUtils.isBlank(yarnAddress)) {
- this.displayedJMWebUrl = FlinkScalaInterpreter.replaceYarnAddress(this.jmWebUrl, yarnAddress)
- }
+ this.displayedJMWebUrl = getJmWebUrlUnderProxy().getOrElse(this.jmWebUrl)
} else {
this.jmWebUrl = clusterClient.getWebInterfaceURL
}
@@ -314,10 +310,14 @@ abstract class FlinkScalaInterpreter(val properties: Properties,
case None =>
// remote mode
if (mode == ExecutionMode.YARN_APPLICATION) {
- val yarnAppId = System.getenv("_APP_ID");
+ val yarnAppId = System.getenv("_APP_ID")
LOGGER.info("Use FlinkCluster in yarn application mode, appId: {}", yarnAppId)
- this.jmWebUrl = "http://localhost:" + HadoopUtils.getFlinkRestPort(yarnAppId)
- this.displayedJMWebUrl = HadoopUtils.getYarnAppTrackingUrl(yarnAppId)
+ if (isYarnUseProxy()) {
+ this.jmWebUrl = HadoopUtils.getYarnAppTrackingUrl(yarnAppId)
+ this.displayedJMWebUrl = getJmWebUrlUnderProxy().getOrElse(this.jmWebUrl)
+ } else {
+ this.jmWebUrl = "http://localhost:" + HadoopUtils.getFlinkRestPort(yarnAppId)
+ }
} else {
LOGGER.info("Use FlinkCluster in remote mode")
this.jmWebUrl = "http://" + config.host.get + ":" + config.port.get
@@ -843,6 +843,20 @@ abstract class FlinkScalaInterpreter(val properties: Properties,
})
}
+ private def isYarnUseProxy(): Boolean = {
+ properties.getProperty("flink.webui.yarn.useProxy", "false").toBoolean
+ }
+
+ private def getJmWebUrlUnderProxy(): Option[String] = {
+ // for some cloud vender, the yarn address may be mapped to some other address.
+ val yarnAddress = properties.getProperty("flink.webui.yarn.address")
+ if (StringUtils.isNotBlank(yarnAddress)) {
+ Some(FlinkScalaInterpreter.replaceYarnAddress(this.jmWebUrl, yarnAddress))
+ } else {
+ None
+ }
+ }
+
def getJobManager = this.jobManager
def getFlinkScalaShellLoader: ClassLoader = {