You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2019/07/05 09:29:06 UTC
[zeppelin] branch master updated: [ZEPPELIN-4214]. Spark Web UI is
displayed in the wrong paragraph
This is an automated email from the ASF dual-hosted git repository.
zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push:
new 786c4af [ZEPPELIN-4214]. Spark Web UI is displayed in the wrong paragraph
786c4af is described below
commit 786c4afbf8076c19683b628b3f29751817bdf4dc
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Mon Jul 1 14:43:51 2019 +0800
[ZEPPELIN-4214]. Spark Web UI is displayed in the wrong paragraph
### What is this PR for?
This PR fixes the issue where the Spark job URL is displayed in the wrong paragraph. A unit test is added in SparkInterpreterTest.
### What type of PR is it?
[Bug Fix]
### Todos
* [ ] - Task
### What is the Jira issue?
* https://jira.apache.org/jira/browse/ZEPPELIN-4214
### How should this be tested?
* CI pass
### Screenshots (if appropriate)
Before
![image](https://user-images.githubusercontent.com/164491/60497812-221a3780-9ce8-11e9-9771-28b512002308.png)
After
![image](https://user-images.githubusercontent.com/164491/60521296-94574000-9d19-11e9-87a5-6b4cc80dfcc2.png)
### Questions:
* Do the license files need an update? No
* Are there breaking changes for older versions? No
* Does this need documentation? No
Author: Jeff Zhang <zj...@apache.org>
Closes #3393 from zjffdu/ZEPPELIN-4214 and squashes the following commits:
28ed16cfe [Jeff Zhang] [ZEPPELIN-4214]. Spark Web UI is displayed in the wrong paragraph
---
.../main/java/org/apache/zeppelin/spark/Utils.java | 14 +--------
.../zeppelin/spark/SparkInterpreterTest.java | 25 ++++++++++++----
.../org/apache/zeppelin/spark/SparkShimsTest.java | 9 ++++--
.../org/apache/zeppelin/spark/SparkShims.java | 35 +++++++++++++---------
.../org/apache/zeppelin/spark/Spark1Shims.java | 2 +-
.../org/apache/zeppelin/spark/Spark2Shims.java | 2 +-
.../zeppelin/interpreter/InterpreterContext.java | 4 +++
7 files changed, 53 insertions(+), 38 deletions(-)
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/Utils.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/Utils.java
index 723a983..381d023 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/Utils.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/Utils.java
@@ -119,25 +119,13 @@ class Utils {
if (context.getAuthenticationInfo() != null) {
uName = getUserName(context.getAuthenticationInfo());
}
- return "zeppelin-" + uName + "-" + context.getNoteId() + "-" + context.getParagraphId();
+ return "zeppelin|" + uName + "|" + context.getNoteId() + "|" + context.getParagraphId();
}
public static String buildJobDesc(InterpreterContext context) {
return "Started by: " + getUserName(context.getAuthenticationInfo());
}
- public static String getNoteId(String jobgroupId) {
- int indexOf = jobgroupId.indexOf("-");
- int secondIndex = jobgroupId.indexOf("-", indexOf + 1);
- return jobgroupId.substring(indexOf + 1, secondIndex);
- }
-
- public static String getParagraphId(String jobgroupId) {
- int indexOf = jobgroupId.indexOf("-");
- int secondIndex = jobgroupId.indexOf("-", indexOf + 1);
- return jobgroupId.substring(secondIndex + 1, jobgroupId.length());
- }
-
public static String getUserName(AuthenticationInfo info) {
String uName = "";
if (info != null) {
diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
index 34b24c7..bceda3a 100644
--- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
+++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
@@ -17,13 +17,11 @@
package org.apache.zeppelin.spark;
-import com.google.common.io.Files;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.ui.CheckBox;
import org.apache.zeppelin.display.ui.Password;
import org.apache.zeppelin.display.ui.Select;
import org.apache.zeppelin.display.ui.TextBox;
-import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterGroup;
@@ -35,11 +33,10 @@ import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventClient;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.junit.After;
import org.junit.Before;
-import org.junit.Ignore;
import org.junit.Test;
+import org.mockito.ArgumentCaptor;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -49,6 +46,7 @@ import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.verify;
@@ -163,9 +161,24 @@ public class SparkInterpreterTest {
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
// spark rdd operation
- result = interpreter.interpret("sc\n.range(1, 10)\n.sum", getInterpreterContext());
+ context = getInterpreterContext();
+ context.setParagraphId("pid_1");
+ result = interpreter.interpret("sc\n.range(1, 10)\n.sum", context);
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+ assertTrue(output.contains("45"));
+ ArgumentCaptor<Map> captorEvent = ArgumentCaptor.forClass(Map.class);
+ verify(mockRemoteEventClient).onParaInfosReceived(captorEvent.capture());
+ assertEquals("pid_1", captorEvent.getValue().get("paraId"));
+
+ reset(mockRemoteEventClient);
+ context = getInterpreterContext();
+ context.setParagraphId("pid_2");
+ result = interpreter.interpret("sc\n.range(1, 10)\n.sum", context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertTrue(output.contains("45"));
+ captorEvent = ArgumentCaptor.forClass(Map.class);
+ verify(mockRemoteEventClient).onParaInfosReceived(captorEvent.capture());
+ assertEquals("pid_2", captorEvent.getValue().get("paraId"));
// spark job url is sent
verify(mockRemoteEventClient).onParaInfosReceived(any(Map.class));
@@ -325,7 +338,7 @@ public class SparkInterpreterTest {
InterpreterResult result = null;
try {
result = interpreter.interpret(
- "val df = sc.parallelize(1 to 10, 2).foreach(e=>Thread.sleep(1000))", context2);
+ "val df = sc.parallelize(1 to 10, 5).foreach(e=>Thread.sleep(1000))", context2);
} catch (InterpreterException e) {
e.printStackTrace();
}
diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkShimsTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkShimsTest.java
index 48d0055..1b4dd99 100644
--- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkShimsTest.java
+++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkShimsTest.java
@@ -129,7 +129,9 @@ public class SparkShimsTest {
@Test
public void runUnderLocalTest() {
- sparkShims.buildSparkJobUrl("local", "http://sparkurl", 0, mockContext);
+ Properties properties = new Properties();
+ properties.setProperty("spark.jobGroup.id", "zeppelin|user1|noteId|paragraphId");
+ sparkShims.buildSparkJobUrl("local", "http://sparkurl", 0, properties, mockContext);
Map<String, String> mapValue = argumentCaptor.getValue();
assertTrue(mapValue.keySet().contains("jobUrl"));
@@ -138,8 +140,9 @@ public class SparkShimsTest {
@Test
public void runUnderYarnTest() {
-
- sparkShims.buildSparkJobUrl("yarn", "http://sparkurl", 0, mockContext);
+ Properties properties = new Properties();
+ properties.setProperty("spark.jobGroup.id", "zeppelin|user1|noteId|paragraphId");
+ sparkShims.buildSparkJobUrl("yarn", "http://sparkurl", 0, properties, mockContext);
Map<String, String> mapValue = argumentCaptor.getValue();
assertTrue(mapValue.keySet().contains("jobUrl"));
diff --git a/spark/spark-shims/src/main/scala/org/apache/zeppelin/spark/SparkShims.java b/spark/spark-shims/src/main/scala/org/apache/zeppelin/spark/SparkShims.java
index 10fb9d6..1482e38 100644
--- a/spark/spark-shims/src/main/scala/org/apache/zeppelin/spark/SparkShims.java
+++ b/spark/spark-shims/src/main/scala/org/apache/zeppelin/spark/SparkShims.java
@@ -96,37 +96,44 @@ public abstract class SparkShims {
public abstract String showDataFrame(Object obj, int maxResult);
- protected String getNoteId(String jobgroupId) {
- int indexOf = jobgroupId.indexOf("-");
- int secondIndex = jobgroupId.indexOf("-", indexOf + 1);
- return jobgroupId.substring(indexOf + 1, secondIndex);
- }
-
- protected String getParagraphId(String jobgroupId) {
- int indexOf = jobgroupId.indexOf("-");
- int secondIndex = jobgroupId.indexOf("-", indexOf + 1);
- return jobgroupId.substring(secondIndex + 1, jobgroupId.length());
- }
-
protected void buildSparkJobUrl(String master,
String sparkWebUrl,
int jobId,
+ Properties jobProperties,
InterpreterContext context) {
String jobUrl = sparkWebUrl + "/jobs/job?id=" + jobId;
String version = VersionInfo.getVersion();
if (master.toLowerCase().contains("yarn") && !supportYarn6615(version)) {
jobUrl = sparkWebUrl + "/jobs";
}
+ String jobGroupId = jobProperties.getProperty("spark.jobGroup.id");
Map<String, String> infos = new java.util.HashMap<String, String>();
infos.put("jobUrl", jobUrl);
infos.put("label", "SPARK JOB");
infos.put("tooltip", "View in Spark web UI");
- infos.put("noteId", context.getNoteId());
- infos.put("paraId", context.getParagraphId());
+ infos.put("noteId", getNoteId(jobGroupId));
+ infos.put("paraId", getParagraphId(jobGroupId));
+ LOGGER.debug("Send spark job url: " + infos);
context.getIntpEventClient().onParaInfosReceived(infos);
}
+ public static String getNoteId(String jobGroupId) {
+ String[] tokens = jobGroupId.split("\\|");
+ if (tokens.length != 4) {
+ throw new RuntimeException("Invalid jobGroupId: " + jobGroupId);
+ }
+ return tokens[2];
+ }
+
+ public static String getParagraphId(String jobGroupId) {
+ String[] tokens = jobGroupId.split("\\|");
+ if (tokens.length != 4) {
+ throw new RuntimeException("Invalid jobGroupId: " + jobGroupId);
+ }
+ return tokens[3];
+ }
+
/**
* This is temporal patch for support old versions of Yarn which is not adopted YARN-6615
*
diff --git a/spark/spark1-shims/src/main/scala/org/apache/zeppelin/spark/Spark1Shims.java b/spark/spark1-shims/src/main/scala/org/apache/zeppelin/spark/Spark1Shims.java
index 786d68c..6c86925 100644
--- a/spark/spark1-shims/src/main/scala/org/apache/zeppelin/spark/Spark1Shims.java
+++ b/spark/spark1-shims/src/main/scala/org/apache/zeppelin/spark/Spark1Shims.java
@@ -45,7 +45,7 @@ public class Spark1Shims extends SparkShims {
public void onJobStart(SparkListenerJobStart jobStart) {
if (sc.getConf().getBoolean("spark.ui.enabled", true) &&
!Boolean.parseBoolean(properties.getProperty("zeppelin.spark.ui.hidden", "false"))) {
- buildSparkJobUrl(master, sparkWebUrl, jobStart.jobId(), context);
+ buildSparkJobUrl(master, sparkWebUrl, jobStart.jobId(), jobStart.properties(), context);
}
}
});
diff --git a/spark/spark2-shims/src/main/scala/org/apache/zeppelin/spark/Spark2Shims.java b/spark/spark2-shims/src/main/scala/org/apache/zeppelin/spark/Spark2Shims.java
index 3ecadaa..041ed01 100644
--- a/spark/spark2-shims/src/main/scala/org/apache/zeppelin/spark/Spark2Shims.java
+++ b/spark/spark2-shims/src/main/scala/org/apache/zeppelin/spark/Spark2Shims.java
@@ -46,7 +46,7 @@ public class Spark2Shims extends SparkShims {
if (sc.getConf().getBoolean("spark.ui.enabled", true) &&
!Boolean.parseBoolean(properties.getProperty("zeppelin.spark.ui.hidden", "false"))) {
- buildSparkJobUrl(master, sparkWebUrl, jobStart.jobId(), context);
+ buildSparkJobUrl(master, sparkWebUrl, jobStart.jobId(), jobStart.properties(), context);
}
}
});
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java
index 4e0a8df..2b56971 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java
@@ -189,6 +189,10 @@ public class InterpreterContext {
return paragraphId;
}
+ public void setParagraphId(String paragraphId) {
+ this.paragraphId = paragraphId;
+ }
+
public String getParagraphText() {
return paragraphText;
}