Posted to commits@zeppelin.apache.org by zj...@apache.org on 2020/04/08 03:29:31 UTC

[zeppelin] branch branch-0.9 updated: [ZEPPELIN-4710]. Allow to inject application id into custom spark url

This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/branch-0.9 by this push:
     new 4f1da64  [ZEPPELIN-4710]. Allow to inject application id into custom spark url
4f1da64 is described below

commit 4f1da64d4aefaffee561a0db7f6eed912dcf4d9f
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Tue Apr 7 17:15:45 2020 +0800

    [ZEPPELIN-4710]. Allow to inject application id into custom spark url
    
    ### What is this PR for?
    This PR injects the appId into a custom Spark UI URL. Currently users can set `zeppelin.spark.uiWebUrl` to point the frontend at a custom Spark UI link, but the appId is not injected into it, which makes the setting less flexible in some cases. This PR allows the application id to be injected into the custom Spark URL via a placeholder, e.g.
    `url_prefix/{{applicationId}}`
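
    As a rough sketch of the substitution (the method name and sample values below are illustrative, not Zeppelin's actual API; the real logic lives in `BaseSparkScalaInterpreter.initSparkWebUrl` in the diff below):

    ```scala
    // Sketch: swap the {{applicationId}} placeholder in the configured
    // zeppelin.spark.uiWebUrl for the id of the running Spark application.
    def resolveUiWebUrl(configuredUrl: String, appId: String): String =
      configuredUrl.replace("{{applicationId}}", appId)

    // Illustrative values only:
    resolveUiWebUrl("https://proxy.example.com/spark/{{applicationId}}",
      "application_1586221234567_0042")
    // => "https://proxy.example.com/spark/application_1586221234567_0042"
    ```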
    
    ### What type of PR is it?
    [Improvement]
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-4710
    
    ### How should this be tested?
    * Unit test added
    
    ### Screenshots (if appropriate)
    
    ### Questions:
    * Do the license files need to be updated? No
    * Are there breaking changes for older versions? No
    * Does this need documentation? No
    
    Author: Jeff Zhang <zj...@apache.org>
    
    Closes #3711 from zjffdu/ZEPPELIN-4710 and squashes the following commits:
    
    a887199db [Jeff Zhang] remove code duplicate
    68fceecc8 [Jeff Zhang] [ZEPPELIN-4710]. Allow to inject application id into custom spark url
    
    (cherry picked from commit ec16666bc4704842df0941cc7aee6ed441562e6f)
    Signed-off-by: Jeff Zhang <zj...@apache.org>
---
 .../apache/zeppelin/spark/SparkInterpreterTest.java |  5 +++--
 .../zeppelin/spark/BaseSparkScalaInterpreter.scala  | 21 ++++++++++++++-------
 2 files changed, 17 insertions(+), 9 deletions(-)

diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
index a15b046..f3f9dec 100644
--- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
+++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
@@ -72,7 +72,7 @@ public class SparkInterpreterTest {
     properties.setProperty("spark.master", "local");
     properties.setProperty("spark.app.name", "test");
     properties.setProperty("zeppelin.spark.maxResult", "100");
-    properties.setProperty("zeppelin.spark.uiWebUrl", "fake_spark_weburl");
+    properties.setProperty("zeppelin.spark.uiWebUrl", "fake_spark_weburl/{{applicationId}}");
     // disable color output for easy testing
     properties.setProperty("zeppelin.spark.scala.color", "false");
     properties.setProperty("zeppelin.spark.deprecatedMsg.show", "false");
@@ -180,7 +180,8 @@ public class SparkInterpreterTest {
     // spark job url is sent
     ArgumentCaptor<Map> onParaInfosReceivedArg = ArgumentCaptor.forClass(Map.class);
     verify(mockRemoteEventClient).onParaInfosReceived(onParaInfosReceivedArg.capture());
-    assertTrue(((String) onParaInfosReceivedArg.getValue().get("jobUrl")).startsWith("fake_spark_weburl"));
+    assertTrue(((String) onParaInfosReceivedArg.getValue().get("jobUrl")).startsWith("fake_spark_weburl/"
+            + interpreter.getJavaSparkContext().sc().applicationId()));
 
     // case class
     result = interpreter.interpret("val bankText = sc.textFile(\"bank.csv\")", getInterpreterContext());
diff --git a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala b/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala
index 994c7ca..0361c94 100644
--- a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala
+++ b/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala
@@ -237,6 +237,8 @@ abstract class BaseSparkScalaInterpreter(val conf: SparkConf,
       case None =>
     }
 
+    initSparkWebUrl()
+
     val hiveSiteExisted: Boolean =
       Thread.currentThread().getContextClassLoader.getResource("hive-site.xml") != null
     val hiveEnabled = conf.getBoolean("zeppelin.spark.useHiveContext", false)
@@ -306,7 +308,8 @@ abstract class BaseSparkScalaInterpreter(val conf: SparkConf,
       case Some(url) => sparkUrl = url
       case None =>
     }
-    useYarnProxyURLIfNeeded()
+
+    initSparkWebUrl()
 
     bind("spark", sparkSession.getClass.getCanonicalName, sparkSession, List("""@transient"""))
     bind("sc", "org.apache.spark.SparkContext", sc, List("""@transient"""))
@@ -321,6 +324,15 @@ abstract class BaseSparkScalaInterpreter(val conf: SparkConf,
     scalaInterpret("print(\"\")")
   }
 
+  private def initSparkWebUrl(): Unit = {
+    val webUiUrl = properties.getProperty("zeppelin.spark.uiWebUrl");
+    if (!StringUtils.isBlank(webUiUrl)) {
+      this.sparkUrl = webUiUrl.replace("{{applicationId}}", sc.applicationId);
+    } else {
+      useYarnProxyURLIfNeeded()
+    }
+  }
+
   protected def createZeppelinContext(): Unit = {
 
     var sparkShims: SparkShims = null
@@ -329,13 +341,8 @@ abstract class BaseSparkScalaInterpreter(val conf: SparkConf,
     } else {
       sparkShims = SparkShims.getInstance(sc.version, properties, sc)
     }
-    var webUiUrl = properties.getProperty("zeppelin.spark.uiWebUrl");
-    if (StringUtils.isBlank(webUiUrl)) {
-      webUiUrl = sparkUrl;
-    }
-    useYarnProxyURLIfNeeded()
 
-    sparkShims.setupSparkListener(sc.master, webUiUrl, InterpreterContext.get)
+    sparkShims.setupSparkListener(sc.master, sparkUrl, InterpreterContext.get)
 
     z = new SparkZeppelinContext(sc, sparkShims,
       interpreterGroup.getInterpreterHookRegistry,