You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2019/07/05 09:29:06 UTC

[zeppelin] branch master updated: [ZEPPELIN-4214]. Spark Web UI is displayed in the wrong paragraph

This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new 786c4af  [ZEPPELIN-4214]. Spark Web UI is displayed in the wrong paragraph
786c4af is described below

commit 786c4afbf8076c19683b628b3f29751817bdf4dc
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Mon Jul 1 14:43:51 2019 +0800

    [ZEPPELIN-4214]. Spark Web UI is displayed in the wrong paragraph
    
    ### What is this PR for?
    
    This PR fixes the issue where the Spark job URL is displayed in the wrong paragraph. A unit test is added in SparkInterpreterTest.
    
    ### What type of PR is it?
    [Bug Fix]
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    * https://jira.apache.org/jira/browse/ZEPPELIN-4214
    
    ### How should this be tested?
    * CI pass
    
    ### Screenshots (if appropriate)
    Before
    
    ![image](https://user-images.githubusercontent.com/164491/60497812-221a3780-9ce8-11e9-9771-28b512002308.png)
    
    After
    ![image](https://user-images.githubusercontent.com/164491/60521296-94574000-9d19-11e9-87a5-6b4cc80dfcc2.png)
    
    ### Questions:
    * Do the license files need updating? No
    * Are there breaking changes for older versions? No
    * Does this need documentation? No
    
    Author: Jeff Zhang <zj...@apache.org>
    
    Closes #3393 from zjffdu/ZEPPELIN-4214 and squashes the following commits:
    
    28ed16cfe [Jeff Zhang] [ZEPPELIN-4214]. Spark Web UI is displayed in the wrong paragraph
---
 .../main/java/org/apache/zeppelin/spark/Utils.java | 14 +--------
 .../zeppelin/spark/SparkInterpreterTest.java       | 25 ++++++++++++----
 .../org/apache/zeppelin/spark/SparkShimsTest.java  |  9 ++++--
 .../org/apache/zeppelin/spark/SparkShims.java      | 35 +++++++++++++---------
 .../org/apache/zeppelin/spark/Spark1Shims.java     |  2 +-
 .../org/apache/zeppelin/spark/Spark2Shims.java     |  2 +-
 .../zeppelin/interpreter/InterpreterContext.java   |  4 +++
 7 files changed, 53 insertions(+), 38 deletions(-)

diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/Utils.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/Utils.java
index 723a983..381d023 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/Utils.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/Utils.java
@@ -119,25 +119,13 @@ class Utils {
     if (context.getAuthenticationInfo() != null) {
       uName = getUserName(context.getAuthenticationInfo());
     }
-    return "zeppelin-" + uName + "-" + context.getNoteId() + "-" + context.getParagraphId();
+    return "zeppelin|" + uName + "|" + context.getNoteId() + "|" + context.getParagraphId();
   }
 
   public static String buildJobDesc(InterpreterContext context) {
     return "Started by: " + getUserName(context.getAuthenticationInfo());
   }
 
-  public static String getNoteId(String jobgroupId) {
-    int indexOf = jobgroupId.indexOf("-");
-    int secondIndex = jobgroupId.indexOf("-", indexOf + 1);
-    return jobgroupId.substring(indexOf + 1, secondIndex);
-  }
-
-  public static String getParagraphId(String jobgroupId) {
-    int indexOf = jobgroupId.indexOf("-");
-    int secondIndex = jobgroupId.indexOf("-", indexOf + 1);
-    return jobgroupId.substring(secondIndex + 1, jobgroupId.length());
-  }
-
   public static String getUserName(AuthenticationInfo info) {
     String uName = "";
     if (info != null) {
diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
index 34b24c7..bceda3a 100644
--- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
+++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
@@ -17,13 +17,11 @@
 
 package org.apache.zeppelin.spark;
 
-import com.google.common.io.Files;
 import org.apache.zeppelin.display.AngularObjectRegistry;
 import org.apache.zeppelin.display.ui.CheckBox;
 import org.apache.zeppelin.display.ui.Password;
 import org.apache.zeppelin.display.ui.Select;
 import org.apache.zeppelin.display.ui.TextBox;
-import org.apache.zeppelin.interpreter.Interpreter;
 import org.apache.zeppelin.interpreter.InterpreterContext;
 import org.apache.zeppelin.interpreter.InterpreterException;
 import org.apache.zeppelin.interpreter.InterpreterGroup;
@@ -35,11 +33,10 @@ import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventClient;
 import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
 import org.junit.After;
 import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Test;
+import org.mockito.ArgumentCaptor;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -49,6 +46,7 @@ import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.verify;
 
 
@@ -163,9 +161,24 @@ public class SparkInterpreterTest {
     assertEquals(InterpreterResult.Code.SUCCESS, result.code());
 
     // spark rdd operation
-    result = interpreter.interpret("sc\n.range(1, 10)\n.sum", getInterpreterContext());
+    context = getInterpreterContext();
+    context.setParagraphId("pid_1");
+    result = interpreter.interpret("sc\n.range(1, 10)\n.sum", context);
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    assertTrue(output.contains("45"));
+    ArgumentCaptor<Map> captorEvent = ArgumentCaptor.forClass(Map.class);
+    verify(mockRemoteEventClient).onParaInfosReceived(captorEvent.capture());
+    assertEquals("pid_1", captorEvent.getValue().get("paraId"));
+
+    reset(mockRemoteEventClient);
+    context = getInterpreterContext();
+    context.setParagraphId("pid_2");
+    result = interpreter.interpret("sc\n.range(1, 10)\n.sum", context);
     assertEquals(InterpreterResult.Code.SUCCESS, result.code());
     assertTrue(output.contains("45"));
+    captorEvent = ArgumentCaptor.forClass(Map.class);
+    verify(mockRemoteEventClient).onParaInfosReceived(captorEvent.capture());
+    assertEquals("pid_2", captorEvent.getValue().get("paraId"));
 
     // spark job url is sent
     verify(mockRemoteEventClient).onParaInfosReceived(any(Map.class));
@@ -325,7 +338,7 @@ public class SparkInterpreterTest {
         InterpreterResult result = null;
         try {
           result = interpreter.interpret(
-              "val df = sc.parallelize(1 to 10, 2).foreach(e=>Thread.sleep(1000))", context2);
+              "val df = sc.parallelize(1 to 10, 5).foreach(e=>Thread.sleep(1000))", context2);
         } catch (InterpreterException e) {
           e.printStackTrace();
         }
diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkShimsTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkShimsTest.java
index 48d0055..1b4dd99 100644
--- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkShimsTest.java
+++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkShimsTest.java
@@ -129,7 +129,9 @@ public class SparkShimsTest {
 
     @Test
     public void runUnderLocalTest() {
-      sparkShims.buildSparkJobUrl("local", "http://sparkurl", 0, mockContext);
+      Properties properties = new Properties();
+      properties.setProperty("spark.jobGroup.id", "zeppelin|user1|noteId|paragraphId");
+      sparkShims.buildSparkJobUrl("local", "http://sparkurl", 0, properties, mockContext);
 
       Map<String, String> mapValue = argumentCaptor.getValue();
       assertTrue(mapValue.keySet().contains("jobUrl"));
@@ -138,8 +140,9 @@ public class SparkShimsTest {
 
     @Test
     public void runUnderYarnTest() {
-
-      sparkShims.buildSparkJobUrl("yarn", "http://sparkurl", 0, mockContext);
+      Properties properties = new Properties();
+      properties.setProperty("spark.jobGroup.id", "zeppelin|user1|noteId|paragraphId");
+      sparkShims.buildSparkJobUrl("yarn", "http://sparkurl", 0, properties, mockContext);
 
       Map<String, String> mapValue = argumentCaptor.getValue();
       assertTrue(mapValue.keySet().contains("jobUrl"));
diff --git a/spark/spark-shims/src/main/scala/org/apache/zeppelin/spark/SparkShims.java b/spark/spark-shims/src/main/scala/org/apache/zeppelin/spark/SparkShims.java
index 10fb9d6..1482e38 100644
--- a/spark/spark-shims/src/main/scala/org/apache/zeppelin/spark/SparkShims.java
+++ b/spark/spark-shims/src/main/scala/org/apache/zeppelin/spark/SparkShims.java
@@ -96,37 +96,44 @@ public abstract class SparkShims {
   public abstract String showDataFrame(Object obj, int maxResult);
 
 
-  protected String getNoteId(String jobgroupId) {
-    int indexOf = jobgroupId.indexOf("-");
-    int secondIndex = jobgroupId.indexOf("-", indexOf + 1);
-    return jobgroupId.substring(indexOf + 1, secondIndex);
-  }
-
-  protected String getParagraphId(String jobgroupId) {
-    int indexOf = jobgroupId.indexOf("-");
-    int secondIndex = jobgroupId.indexOf("-", indexOf + 1);
-    return jobgroupId.substring(secondIndex + 1, jobgroupId.length());
-  }
-
   protected void buildSparkJobUrl(String master,
                                   String sparkWebUrl,
                                   int jobId,
+                                  Properties jobProperties,
                                   InterpreterContext context) {
     String jobUrl = sparkWebUrl + "/jobs/job?id=" + jobId;
     String version = VersionInfo.getVersion();
     if (master.toLowerCase().contains("yarn") && !supportYarn6615(version)) {
       jobUrl = sparkWebUrl + "/jobs";
     }
+    String jobGroupId = jobProperties.getProperty("spark.jobGroup.id");
 
     Map<String, String> infos = new java.util.HashMap<String, String>();
     infos.put("jobUrl", jobUrl);
     infos.put("label", "SPARK JOB");
     infos.put("tooltip", "View in Spark web UI");
-    infos.put("noteId", context.getNoteId());
-    infos.put("paraId", context.getParagraphId());
+    infos.put("noteId", getNoteId(jobGroupId));
+    infos.put("paraId", getParagraphId(jobGroupId));
+    LOGGER.debug("Send spark job url: " + infos);
     context.getIntpEventClient().onParaInfosReceived(infos);
   }
 
+  public static String getNoteId(String jobGroupId) {
+    String[] tokens = jobGroupId.split("\\|");
+    if (tokens.length != 4) {
+      throw new RuntimeException("Invalid jobGroupId: " + jobGroupId);
+    }
+    return tokens[2];
+  }
+
+  public static String getParagraphId(String jobGroupId) {
+    String[] tokens = jobGroupId.split("\\|");
+    if (tokens.length != 4) {
+      throw new RuntimeException("Invalid jobGroupId: " + jobGroupId);
+    }
+    return tokens[3];
+  }
+
   /**
    * This is temporal patch for support old versions of Yarn which is not adopted YARN-6615
    *
diff --git a/spark/spark1-shims/src/main/scala/org/apache/zeppelin/spark/Spark1Shims.java b/spark/spark1-shims/src/main/scala/org/apache/zeppelin/spark/Spark1Shims.java
index 786d68c..6c86925 100644
--- a/spark/spark1-shims/src/main/scala/org/apache/zeppelin/spark/Spark1Shims.java
+++ b/spark/spark1-shims/src/main/scala/org/apache/zeppelin/spark/Spark1Shims.java
@@ -45,7 +45,7 @@ public class Spark1Shims extends SparkShims {
       public void onJobStart(SparkListenerJobStart jobStart) {
         if (sc.getConf().getBoolean("spark.ui.enabled", true) &&
             !Boolean.parseBoolean(properties.getProperty("zeppelin.spark.ui.hidden", "false"))) {
-          buildSparkJobUrl(master, sparkWebUrl, jobStart.jobId(), context);
+          buildSparkJobUrl(master, sparkWebUrl, jobStart.jobId(), jobStart.properties(), context);
         }
       }
     });
diff --git a/spark/spark2-shims/src/main/scala/org/apache/zeppelin/spark/Spark2Shims.java b/spark/spark2-shims/src/main/scala/org/apache/zeppelin/spark/Spark2Shims.java
index 3ecadaa..041ed01 100644
--- a/spark/spark2-shims/src/main/scala/org/apache/zeppelin/spark/Spark2Shims.java
+++ b/spark/spark2-shims/src/main/scala/org/apache/zeppelin/spark/Spark2Shims.java
@@ -46,7 +46,7 @@ public class Spark2Shims extends SparkShims {
 
         if (sc.getConf().getBoolean("spark.ui.enabled", true) &&
             !Boolean.parseBoolean(properties.getProperty("zeppelin.spark.ui.hidden", "false"))) {
-          buildSparkJobUrl(master, sparkWebUrl, jobStart.jobId(), context);
+          buildSparkJobUrl(master, sparkWebUrl, jobStart.jobId(), jobStart.properties(), context);
         }
       }
     });
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java
index 4e0a8df..2b56971 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java
@@ -189,6 +189,10 @@ public class InterpreterContext {
     return paragraphId;
   }
 
+  public void setParagraphId(String paragraphId) {
+    this.paragraphId = paragraphId;
+  }
+
   public String getParagraphText() {
     return paragraphText;
   }