You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2018/02/28 02:27:15 UTC
zeppelin git commit: ZEPPELIN-3242. Listener threw an exception
java.lang.NPE at o.a.zeppelin.spark.Utils.getNoteId(Utils.java:156)
Repository: zeppelin
Updated Branches:
refs/heads/master 64bbba479 -> 500b74b19
ZEPPELIN-3242. Listener threw an exception java.lang.NPE at o.a.zeppelin.spark.Utils.getNoteId(Utils.java:156)
### What is this PR for?
This issue also causes the Spark URL to not be displayed in the frontend. The root cause is that PySparkInterpreter/IPySparkInterpreter doesn't set the JobGroup correctly.
### What type of PR is it?
[Bug Fix]
### Todos
* [ ] - Task
### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-3242
### How should this be tested?
* CI pass and also manually verified.
### Screenshots (if appropriate)
### Questions:
* Does the licenses files need update? NO
* Is there breaking changes for older versions? No
* Does this needs documentation? No
Author: Jeff Zhang <zj...@apache.org>
Closes #2822 from zjffdu/ZEPPELIN-3242 and squashes the following commits:
8254162 [Jeff Zhang] ZEPPELIN-3242. Listener threw an exception java.lang.NPE at o.a.zeppelin.spark.Utils.getNoteId(Utils.java:156)
Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/500b74b1
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/500b74b1
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/500b74b1
Branch: refs/heads/master
Commit: 500b74b196b740c810553c43216a56e23ab9caf0
Parents: 64bbba4
Author: Jeff Zhang <zj...@apache.org>
Authored: Tue Feb 27 20:53:54 2018 +0800
Committer: Jeff Zhang <zj...@apache.org>
Committed: Wed Feb 28 10:27:08 2018 +0800
----------------------------------------------------------------------
.../zeppelin/spark/IPySparkInterpreter.java | 11 +++++++
.../zeppelin/spark/PySparkInterpreter.java | 13 +++++---
.../zeppelin/spark/IPySparkInterpreterTest.java | 33 +++++++++++++-------
.../zeppelin/spark/NewSparkInterpreterTest.java | 17 ++++++++--
.../zeppelin/spark/SparkRInterpreterTest.java | 19 +++++++++--
.../interpreter/InterpreterContext.java | 4 +++
6 files changed, 77 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/500b74b1/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java
index a75fda8..3691156 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java
@@ -23,6 +23,7 @@ import org.apache.zeppelin.interpreter.BaseZeppelinContext;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterException;
+import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
import org.apache.zeppelin.interpreter.WrappedInterpreter;
import org.apache.zeppelin.python.IPythonInterpreter;
@@ -99,6 +100,16 @@ public class IPySparkInterpreter extends IPythonInterpreter {
}
@Override
+ public InterpreterResult interpret(String st, InterpreterContext context) {
+ InterpreterContext.set(context);
+ sparkInterpreter.populateSparkWebUrl(context);
+ String jobGroupId = Utils.buildJobGroupId(context);
+ String jobDesc = "Started by: " + Utils.getUserName(context.getAuthenticationInfo());
+ String setJobGroupStmt = "sc.setJobGroup('" + jobGroupId + "', '" + jobDesc + "')";
+ return super.interpret(setJobGroupStmt +"\n" + st, context);
+ }
+
+ @Override
public void cancel(InterpreterContext context) throws InterpreterException {
super.cancel(context);
sparkInterpreter.cancel(context);
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/500b74b1/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
index 0703ad7..f5e4793 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
@@ -406,16 +406,16 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
@Override
public InterpreterResult interpret(String st, InterpreterContext context)
throws InterpreterException {
+ if (iPySparkInterpreter != null) {
+ return iPySparkInterpreter.interpret(st, context);
+ }
+
SparkInterpreter sparkInterpreter = getSparkInterpreter();
- sparkInterpreter.populateSparkWebUrl(context);
if (sparkInterpreter.isUnsupportedSparkVersion()) {
return new InterpreterResult(Code.ERROR, "Spark "
+ sparkInterpreter.getSparkVersion().toString() + " is not supported");
}
-
- if (iPySparkInterpreter != null) {
- return iPySparkInterpreter.interpret(st, context);
- }
+ sparkInterpreter.populateSparkWebUrl(context);
if (!pythonscriptRunning) {
return new InterpreterResult(Code.ERROR, "python process not running"
@@ -467,10 +467,13 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
}
String jobGroup = Utils.buildJobGroupId(context);
String jobDesc = "Started by: " + Utils.getUserName(context.getAuthenticationInfo());
+
SparkZeppelinContext __zeppelin__ = sparkInterpreter.getZeppelinContext();
__zeppelin__.setInterpreterContext(context);
__zeppelin__.setGui(context.getGui());
__zeppelin__.setNoteGui(context.getNoteGui());
+ InterpreterContext.set(context);
+
pythonInterpretRequest = new PythonInterpretRequest(st, jobGroup, jobDesc);
statementOutput = null;
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/500b74b1/spark/interpreter/src/test/java/org/apache/zeppelin/spark/IPySparkInterpreterTest.java
----------------------------------------------------------------------
diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/IPySparkInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/IPySparkInterpreterTest.java
index 5eaa42c..46a3a72 100644
--- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/IPySparkInterpreterTest.java
+++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/IPySparkInterpreterTest.java
@@ -27,6 +27,7 @@ import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.InterpreterOutput;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResultMessage;
+import org.apache.zeppelin.interpreter.remote.RemoteEventClient;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.apache.zeppelin.python.IPythonInterpreterTest;
import org.apache.zeppelin.user.AuthenticationInfo;
@@ -39,15 +40,21 @@ import java.net.URL;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
public class IPySparkInterpreterTest {
private IPySparkInterpreter iPySparkInterpreter;
private InterpreterGroup intpGroup;
+ private RemoteEventClient mockRemoteEventClient = mock(RemoteEventClient.class);
@Before
public void setup() throws InterpreterException {
@@ -69,11 +76,13 @@ public class IPySparkInterpreterTest {
intpGroup.get("session_1").add(sparkInterpreter);
sparkInterpreter.setInterpreterGroup(intpGroup);
sparkInterpreter.open();
+ sparkInterpreter.getZeppelinContext().setEventClient(mockRemoteEventClient);
iPySparkInterpreter = new IPySparkInterpreter(p);
intpGroup.get("session_1").add(iPySparkInterpreter);
iPySparkInterpreter.setInterpreterGroup(intpGroup);
iPySparkInterpreter.open();
+ sparkInterpreter.getZeppelinContext().setEventClient(mockRemoteEventClient);
}
@@ -91,17 +100,21 @@ public class IPySparkInterpreterTest {
// rdd
InterpreterContext context = getInterpreterContext();
- InterpreterResult result = iPySparkInterpreter.interpret("sc.range(1,10).sum()", context);
+ InterpreterResult result = iPySparkInterpreter.interpret("sc.version", context);
Thread.sleep(100);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
- List<InterpreterResultMessage> interpreterResultMessages = context.out.getInterpreterResultMessages();
- assertEquals("45", interpreterResultMessages.get(0).getData());
+ // spark url is sent
+ verify(mockRemoteEventClient).onMetaInfosReceived(any(Map.class));
context = getInterpreterContext();
- result = iPySparkInterpreter.interpret("sc.version", context);
+ result = iPySparkInterpreter.interpret("sc.range(1,10).sum()", context);
Thread.sleep(100);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
- interpreterResultMessages = context.out.getInterpreterResultMessages();
+ List<InterpreterResultMessage> interpreterResultMessages = context.out.getInterpreterResultMessages();
+ assertEquals("45", interpreterResultMessages.get(0).getData());
+ // spark job url is sent
+ verify(mockRemoteEventClient).onParaInfosReceived(any(String.class), any(String.class), any(Map.class));
+
// spark sql
context = getInterpreterContext();
if (interpreterResultMessages.get(0).getData().startsWith("'1.") ||
@@ -146,7 +159,6 @@ public class IPySparkInterpreterTest {
"1 a\n" +
"2 b\n", interpreterResultMessages.get(0).getData());
}
-
// cancel
final InterpreterContext context2 = getInterpreterContext();
@@ -166,6 +178,7 @@ public class IPySparkInterpreterTest {
};
thread.start();
+
// sleep 1 second to wait for the spark job starts
Thread.sleep(1000);
iPySparkInterpreter.cancel(context);
@@ -177,10 +190,6 @@ public class IPySparkInterpreterTest {
assertEquals("range", completions.get(0).getValue());
// pyspark streaming
-
- Class klass = py4j.GatewayServer.class;
- URL location = klass.getResource('/' + klass.getName().replace('.', '/') + ".class");
- System.out.println("py4j location: " + location);
context = getInterpreterContext();
result = iPySparkInterpreter.interpret(
"from pyspark.streaming import StreamingContext\n" +
@@ -204,7 +213,7 @@ public class IPySparkInterpreterTest {
}
private InterpreterContext getInterpreterContext() {
- return new InterpreterContext(
+ InterpreterContext context = new InterpreterContext(
"noteId",
"paragraphId",
"replName",
@@ -218,5 +227,7 @@ public class IPySparkInterpreterTest {
null,
null,
new InterpreterOutput(null));
+ context.setClient(mockRemoteEventClient);
+ return context;
}
}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/500b74b1/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkInterpreterTest.java
----------------------------------------------------------------------
diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkInterpreterTest.java
index cfcf2a5..3d22af3 100644
--- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkInterpreterTest.java
+++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkInterpreterTest.java
@@ -29,6 +29,8 @@ import org.apache.zeppelin.interpreter.InterpreterOutput;
import org.apache.zeppelin.interpreter.InterpreterOutputListener;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResultMessageOutput;
+import org.apache.zeppelin.interpreter.remote.RemoteEventClient;
+import org.apache.zeppelin.interpreter.remote.RemoteEventClientWrapper;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.apache.zeppelin.user.AuthenticationInfo;
import org.junit.After;
@@ -42,12 +44,14 @@ import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
public class NewSparkInterpreterTest {
@@ -59,6 +63,8 @@ public class NewSparkInterpreterTest {
// catch the interpreter output in onUpdate
private InterpreterResultMessageOutput messageOutput;
+ private RemoteEventClient mockRemoteEventClient = mock(RemoteEventClient.class);
+
@Test
public void testSparkInterpreter() throws IOException, InterruptedException, InterpreterException {
Properties properties = new Properties();
@@ -72,9 +78,12 @@ public class NewSparkInterpreterTest {
interpreter.setInterpreterGroup(mock(InterpreterGroup.class));
interpreter.open();
+ interpreter.getZeppelinContext().setEventClient(mockRemoteEventClient);
InterpreterResult result = interpreter.interpret("val a=\"hello world\"", getInterpreterContext());
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertEquals("a: String = hello world\n", output);
+ // spark web url is sent
+ verify(mockRemoteEventClient).onMetaInfosReceived(any(Map.class));
result = interpreter.interpret("print(a)", getInterpreterContext());
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
@@ -124,6 +133,8 @@ public class NewSparkInterpreterTest {
result = interpreter.interpret("sc.range(1, 10).sum", getInterpreterContext());
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertTrue(output.contains("45"));
+ // spark job url is sent
+ verify(mockRemoteEventClient).onParaInfosReceived(any(String.class), any(String.class), any(Map.class));
// case class
result = interpreter.interpret("val bankText = sc.textFile(\"bank.csv\")", getInterpreterContext());
@@ -349,7 +360,7 @@ public class NewSparkInterpreterTest {
private InterpreterContext getInterpreterContext() {
output = "";
- return new InterpreterContext(
+ InterpreterContext context = new InterpreterContext(
"noteId",
"paragraphId",
"replName",
@@ -385,5 +396,7 @@ public class NewSparkInterpreterTest {
}
})
);
+ context.setClient(mockRemoteEventClient);
+ return context;
}
}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/500b74b1/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkRInterpreterTest.java
----------------------------------------------------------------------
diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkRInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkRInterpreterTest.java
index 2d585f5..0bd88d4 100644
--- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkRInterpreterTest.java
+++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkRInterpreterTest.java
@@ -24,21 +24,27 @@ import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
+import org.apache.zeppelin.interpreter.remote.RemoteEventClient;
import org.apache.zeppelin.user.AuthenticationInfo;
import org.junit.Test;
import java.io.IOException;
import java.util.HashMap;
+import java.util.Map;
import java.util.Properties;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
public class SparkRInterpreterTest {
private SparkRInterpreter sparkRInterpreter;
private SparkInterpreter sparkInterpreter;
-
+ private RemoteEventClient mockRemoteEventClient = mock(RemoteEventClient.class);
@Test
public void testSparkRInterpreter() throws IOException, InterruptedException, InterpreterException {
@@ -60,10 +66,13 @@ public class SparkRInterpreterTest {
sparkInterpreter.setInterpreterGroup(interpreterGroup);
sparkRInterpreter.open();
+ sparkInterpreter.getZeppelinContext().setEventClient(mockRemoteEventClient);
InterpreterResult result = sparkRInterpreter.interpret("1+1", getInterpreterContext());
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertTrue(result.message().get(0).getData().contains("2"));
+ // spark web url is sent
+ verify(mockRemoteEventClient).onMetaInfosReceived(any(Map.class));
result = sparkRInterpreter.interpret("sparkR.version()", getInterpreterContext());
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
@@ -72,16 +81,20 @@ public class SparkRInterpreterTest {
result = sparkRInterpreter.interpret("df <- as.DataFrame(faithful)\nhead(df)", getInterpreterContext());
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertTrue(result.message().get(0).getData().contains("eruptions waiting"));
+ // spark job url is sent
+ verify(mockRemoteEventClient, atLeastOnce()).onParaInfosReceived(any(String.class), any(String.class), any(Map.class));
} else {
// spark 1.x
result = sparkRInterpreter.interpret("df <- createDataFrame(sqlContext, faithful)\nhead(df)", getInterpreterContext());
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertTrue(result.message().get(0).getData().contains("eruptions waiting"));
+ // spark job url is sent
+ verify(mockRemoteEventClient, atLeastOnce()).onParaInfosReceived(any(String.class), any(String.class), any(Map.class));
}
}
private InterpreterContext getInterpreterContext() {
- return new InterpreterContext(
+ InterpreterContext context = new InterpreterContext(
"noteId",
"paragraphId",
"replName",
@@ -95,5 +108,7 @@ public class SparkRInterpreterTest {
null,
null,
null);
+ context.setClient(mockRemoteEventClient);
+ return context;
}
}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/500b74b1/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java
index 293f9bf..8fa0904 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java
@@ -226,6 +226,10 @@ public class InterpreterContext {
return client;
}
+ public void setClient(RemoteEventClientWrapper client) {
+ this.client = client;
+ }
+
public RemoteWorksController getRemoteWorksController() {
return remoteWorksController;
}