You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by mo...@apache.org on 2019/09/23 22:07:05 UTC

[zeppelin] branch master updated: [ZEPPELIN-4226] Fix "View in Spark web UI" in kubernetes mode

This is an automated email from the ASF dual-hosted git repository.

moon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new 60e12cd  [ZEPPELIN-4226] Fix "View in Spark web UI" in kubernetes mode
60e12cd is described below

commit 60e12cdf86778fb80ad36bb721a6a373206fa12d
Author: Lee moon soo <mo...@apache.org>
AuthorDate: Thu Sep 19 12:10:17 2019 -0700

    [ZEPPELIN-4226] Fix "View in Spark web UI" in kubernetes mode
    
    ### What is this PR for?
    When Zeppelin is running in Kubernetes, "View in Spark web UI" gives internal address, instead of address defined in SERVICE_DOMAIN.
    
    I think this problem is side effect of https://github.com/apache/zeppelin/pull/3375 and this PR includes fix and updated unittest.
    
    ### What type of PR is it?
    Bug Fix
    
    ### What is the Jira issue?
    https://issues.apache.org/jira/browse/ZEPPELIN-4226
    
    ### How should this be tested?
    Run Zeppelin on kubernetes, and run spark job, click "View in Spark web UI" button.
    
    ### Questions:
    * Does the licenses files need update? no
    * Is there breaking changes for older versions? no
    * Does this needs documentation? no
    
    Author: Lee moon soo <mo...@apache.org>
    
    Closes #3451 from Leemoonsoo/ZEPPELIN-4226 and squashes the following commits:
    
    7e34542f0 [Lee moon soo] use StringUtils.isBlank
    a33c3b223 [Lee moon soo] pickup SparkUI address from zeppelin.spark.uiWebUrl
---
 .../main/java/org/apache/zeppelin/spark/SparkInterpreter.java | 11 -----------
 .../java/org/apache/zeppelin/spark/SparkInterpreterTest.java  |  7 ++++---
 .../org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala |  7 ++++++-
 3 files changed, 10 insertions(+), 15 deletions(-)

diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
index 33769be..7bacce8 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
@@ -60,8 +60,6 @@ public class SparkInterpreter extends AbstractInterpreter {
 
   private SparkVersion sparkVersion;
   private boolean enableSupportedVersionCheck;
-  private String sparkUrl;
-
 
   public SparkInterpreter(Properties properties) {
     super(properties);
@@ -109,11 +107,6 @@ public class SparkInterpreter extends AbstractInterpreter {
       }
       sqlContext = this.innerInterpreter.getSqlContext();
       sparkSession = this.innerInterpreter.getSparkSession();
-      sparkUrl = this.innerInterpreter.getSparkUrl();
-      String sparkUrlProp = getProperty("zeppelin.spark.uiWebUrl", "");
-      if (!StringUtils.isBlank(sparkUrlProp)) {
-        sparkUrl = sparkUrlProp;
-      }
 
       SESSION_NUM.incrementAndGet();
     } catch (Exception e) {
@@ -260,10 +253,6 @@ public class SparkInterpreter extends AbstractInterpreter {
     return depFiles;
   }
 
-  public String getSparkUIUrl() {
-    return sparkUrl;
-  }
-
   public boolean isUnsupportedSparkVersion() {
     return enableSupportedVersionCheck  && sparkVersion.isUnsupportedVersion();
   }
diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
index bceda3a..9a3d471 100644
--- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
+++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
@@ -48,6 +48,7 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 
 public class SparkInterpreterTest {
@@ -89,8 +90,6 @@ public class SparkInterpreterTest {
     interpreter.setInterpreterGroup(mock(InterpreterGroup.class));
     interpreter.open();
 
-    assertEquals("fake_spark_weburl", interpreter.getSparkUIUrl());
-
     InterpreterResult result = interpreter.interpret("val a=\"hello world\"", getInterpreterContext());
     assertEquals(InterpreterResult.Code.SUCCESS, result.code());
     assertEquals("a: String = hello world\n", output);
@@ -181,7 +180,9 @@ public class SparkInterpreterTest {
     assertEquals("pid_2", captorEvent.getValue().get("paraId"));
 
     // spark job url is sent
-    verify(mockRemoteEventClient).onParaInfosReceived(any(Map.class));
+    ArgumentCaptor<Map> onParaInfosReceivedArg = ArgumentCaptor.forClass(Map.class);
+    verify(mockRemoteEventClient).onParaInfosReceived(onParaInfosReceivedArg.capture());
+    assertTrue(((String) onParaInfosReceivedArg.getValue().get("jobUrl")).startsWith("fake_spark_weburl"));
 
     // case class
     result = interpreter.interpret("val bankText = sc.textFile(\"bank.csv\")", getInterpreterContext());
diff --git a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala b/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala
index 421d85a..ced1c1f 100644
--- a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala
+++ b/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala
@@ -23,6 +23,7 @@ import java.net.URLClassLoader
 import java.nio.file.Paths
 import java.util.concurrent.atomic.AtomicInteger
 
+import org.apache.commons.lang3.StringUtils
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.zeppelin.interpreter.util.InterpreterOutputStream
@@ -303,7 +304,11 @@ abstract class BaseSparkScalaInterpreter(val conf: SparkConf,
 
   protected def createZeppelinContext(): Unit = {
     val sparkShims = SparkShims.getInstance(sc.version, properties)
-    sparkShims.setupSparkListener(sc.master, sparkUrl, InterpreterContext.get)
+    var webUiUrl = properties.getProperty("zeppelin.spark.uiWebUrl");
+    if (StringUtils.isBlank(webUiUrl)) {
+      webUiUrl = sparkUrl;
+    }
+    sparkShims.setupSparkListener(sc.master, webUiUrl, InterpreterContext.get)
 
     z = new SparkZeppelinContext(sc, sparkShims,
       interpreterGroup.getInterpreterHookRegistry,