You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2020/04/08 03:28:25 UTC

[zeppelin] branch master updated: [ZEPPELIN-4710]. Allow to inject application id into custom spark url

This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new ec16666  [ZEPPELIN-4710]. Allow to inject application id into custom spark url
ec16666 is described below

commit ec16666bc4704842df0941cc7aee6ed441562e6f
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Tue Apr 7 17:15:45 2020 +0800

    [ZEPPELIN-4710]. Allow to inject application id into custom spark url
    
    ### What is this PR for?
     This PR injects the application id into the custom Spark UI URL. Currently users can set `zeppelin.spark.uiWebUrl` to a custom Spark UI link, but the application id is not injected into it, which makes it less flexible in some cases. This PR allows the application id to be injected into the custom Spark URL via the `{{applicationId}}` placeholder, e.g.
    `url_prefix/{{applicationId}}`
    
    ### What type of PR is it?
    [Improvement]
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-4710
    
    ### How should this be tested?
    * Unit test added
    
    ### Screenshots (if appropriate)
    
    ### Questions:
    * Do the license files need an update? No
    * Are there breaking changes for older versions? No
    * Does this need documentation? No
    
    Author: Jeff Zhang <zj...@apache.org>
    
    Closes #3711 from zjffdu/ZEPPELIN-4710 and squashes the following commits:
    
    a887199db [Jeff Zhang] remove code duplicate
    68fceecc8 [Jeff Zhang] [ZEPPELIN-4710]. Allow to inject application id into custom spark url
---
 .../apache/zeppelin/spark/SparkInterpreterTest.java |  5 +++--
 .../zeppelin/spark/BaseSparkScalaInterpreter.scala  | 21 ++++++++++++++-------
 2 files changed, 17 insertions(+), 9 deletions(-)

diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
index a15b046..f3f9dec 100644
--- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
+++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
@@ -72,7 +72,7 @@ public class SparkInterpreterTest {
     properties.setProperty("spark.master", "local");
     properties.setProperty("spark.app.name", "test");
     properties.setProperty("zeppelin.spark.maxResult", "100");
-    properties.setProperty("zeppelin.spark.uiWebUrl", "fake_spark_weburl");
+    properties.setProperty("zeppelin.spark.uiWebUrl", "fake_spark_weburl/{{applicationId}}");
     // disable color output for easy testing
     properties.setProperty("zeppelin.spark.scala.color", "false");
     properties.setProperty("zeppelin.spark.deprecatedMsg.show", "false");
@@ -180,7 +180,8 @@ public class SparkInterpreterTest {
     // spark job url is sent
     ArgumentCaptor<Map> onParaInfosReceivedArg = ArgumentCaptor.forClass(Map.class);
     verify(mockRemoteEventClient).onParaInfosReceived(onParaInfosReceivedArg.capture());
-    assertTrue(((String) onParaInfosReceivedArg.getValue().get("jobUrl")).startsWith("fake_spark_weburl"));
+    assertTrue(((String) onParaInfosReceivedArg.getValue().get("jobUrl")).startsWith("fake_spark_weburl/"
+            + interpreter.getJavaSparkContext().sc().applicationId()));
 
     // case class
     result = interpreter.interpret("val bankText = sc.textFile(\"bank.csv\")", getInterpreterContext());
diff --git a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala b/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala
index 994c7ca..0361c94 100644
--- a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala
+++ b/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala
@@ -237,6 +237,8 @@ abstract class BaseSparkScalaInterpreter(val conf: SparkConf,
       case None =>
     }
 
+    initSparkWebUrl()
+
     val hiveSiteExisted: Boolean =
       Thread.currentThread().getContextClassLoader.getResource("hive-site.xml") != null
     val hiveEnabled = conf.getBoolean("zeppelin.spark.useHiveContext", false)
@@ -306,7 +308,8 @@ abstract class BaseSparkScalaInterpreter(val conf: SparkConf,
       case Some(url) => sparkUrl = url
       case None =>
     }
-    useYarnProxyURLIfNeeded()
+
+    initSparkWebUrl()
 
     bind("spark", sparkSession.getClass.getCanonicalName, sparkSession, List("""@transient"""))
     bind("sc", "org.apache.spark.SparkContext", sc, List("""@transient"""))
@@ -321,6 +324,15 @@ abstract class BaseSparkScalaInterpreter(val conf: SparkConf,
     scalaInterpret("print(\"\")")
   }
 
+  private def initSparkWebUrl(): Unit = {
+    val webUiUrl = properties.getProperty("zeppelin.spark.uiWebUrl");
+    if (!StringUtils.isBlank(webUiUrl)) {
+      this.sparkUrl = webUiUrl.replace("{{applicationId}}", sc.applicationId);
+    } else {
+      useYarnProxyURLIfNeeded()
+    }
+  }
+
   protected def createZeppelinContext(): Unit = {
 
     var sparkShims: SparkShims = null
@@ -329,13 +341,8 @@ abstract class BaseSparkScalaInterpreter(val conf: SparkConf,
     } else {
       sparkShims = SparkShims.getInstance(sc.version, properties, sc)
     }
-    var webUiUrl = properties.getProperty("zeppelin.spark.uiWebUrl");
-    if (StringUtils.isBlank(webUiUrl)) {
-      webUiUrl = sparkUrl;
-    }
-    useYarnProxyURLIfNeeded()
 
-    sparkShims.setupSparkListener(sc.master, webUiUrl, InterpreterContext.get)
+    sparkShims.setupSparkListener(sc.master, sparkUrl, InterpreterContext.get)
 
     z = new SparkZeppelinContext(sc, sparkShims,
       interpreterGroup.getInterpreterHookRegistry,