You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2018/06/15 01:10:07 UTC
[12/13] zeppelin git commit: ZEPPELIN-2035. BI directional RPC
framework between ZeppelinServer and InterpreterProcess on top of thrift
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7af86168/spark/interpreter/src/test/java/org/apache/zeppelin/spark/DepInterpreterTest.java
----------------------------------------------------------------------
diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/DepInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/DepInterpreterTest.java
index e177d49..5e77c58 100644
--- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/DepInterpreterTest.java
+++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/DepInterpreterTest.java
@@ -17,17 +17,10 @@
package org.apache.zeppelin.spark;
-import static org.junit.Assert.assertEquals;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.Properties;
-
-import org.apache.zeppelin.display.AngularObjectRegistry;
-import org.apache.zeppelin.user.AuthenticationInfo;
-import org.apache.zeppelin.display.GUI;
-import org.apache.zeppelin.interpreter.*;
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterGroup;
+import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.junit.After;
import org.junit.Before;
@@ -35,6 +28,12 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+
public class DepInterpreterTest {
@Rule
@@ -63,11 +62,8 @@ public class DepInterpreterTest {
intpGroup.get("note").add(dep);
dep.setInterpreterGroup(intpGroup);
- context = new InterpreterContext("note", "id", null, "title", "text", new AuthenticationInfo(),
- new HashMap<String, Object>(), new GUI(), new GUI(),
- new AngularObjectRegistry(intpGroup.getId(), null),
- null,
- new LinkedList<InterpreterContextRunner>(), null);
+ context = InterpreterContext.builder()
+ .build();;
}
@After
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7af86168/spark/interpreter/src/test/java/org/apache/zeppelin/spark/IPySparkInterpreterTest.java
----------------------------------------------------------------------
diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/IPySparkInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/IPySparkInterpreterTest.java
index ece5235..8625335 100644
--- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/IPySparkInterpreterTest.java
+++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/IPySparkInterpreterTest.java
@@ -19,7 +19,6 @@ package org.apache.zeppelin.spark;
import com.google.common.io.Files;
-import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterException;
@@ -28,15 +27,13 @@ import org.apache.zeppelin.interpreter.InterpreterOutput;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResultMessage;
import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
-import org.apache.zeppelin.interpreter.remote.RemoteEventClient;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventClient;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.apache.zeppelin.python.IPythonInterpreterTest;
-import org.apache.zeppelin.user.AuthenticationInfo;
import org.junit.Test;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -51,7 +48,7 @@ import static org.mockito.Mockito.verify;
public class IPySparkInterpreterTest extends IPythonInterpreterTest {
private InterpreterGroup intpGroup;
- private RemoteEventClient mockRemoteEventClient = mock(RemoteEventClient.class);
+ private RemoteInterpreterEventClient mockIntpEventClient = mock(RemoteInterpreterEventClient.class);
@Override
protected Properties initIntpProperties() {
@@ -70,13 +67,17 @@ public class IPySparkInterpreterTest extends IPythonInterpreterTest {
return p;
}
+
@Override
protected void startInterpreter(Properties properties) throws InterpreterException {
- intpGroup = new InterpreterGroup();
- intpGroup.put("session_1", new ArrayList<Interpreter>());
+ InterpreterContext context = getInterpreterContext();
+ context.setIntpEventClient(mockIntpEventClient);
+ InterpreterContext.set(context);
LazyOpenInterpreter sparkInterpreter = new LazyOpenInterpreter(
new SparkInterpreter(properties));
+ intpGroup = new InterpreterGroup();
+ intpGroup.put("session_1", new ArrayList<Interpreter>());
intpGroup.get("session_1").add(sparkInterpreter);
sparkInterpreter.setInterpreterGroup(intpGroup);
@@ -102,32 +103,32 @@ public class IPySparkInterpreterTest extends IPythonInterpreterTest {
@Test
public void testIPySpark() throws InterruptedException, InterpreterException, IOException {
- testPySpark(interpreter, mockRemoteEventClient);
+ testPySpark(interpreter, mockIntpEventClient);
}
- public static void testPySpark(final Interpreter interpreter, RemoteEventClient mockRemoteEventClient)
+ public static void testPySpark(final Interpreter interpreter, RemoteInterpreterEventClient mockIntpEventClient)
throws InterpreterException, IOException, InterruptedException {
- reset(mockRemoteEventClient);
+ reset(mockIntpEventClient);
// rdd
- InterpreterContext context = createInterpreterContext(mockRemoteEventClient);
+ InterpreterContext context = createInterpreterContext(mockIntpEventClient);
InterpreterResult result = interpreter.interpret("sc.version", context);
Thread.sleep(100);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
String sparkVersion = context.out.toInterpreterResultMessage().get(0).getData();
// spark url is sent
- verify(mockRemoteEventClient).onMetaInfosReceived(any(Map.class));
+ verify(mockIntpEventClient).onMetaInfosReceived(any(Map.class));
- context = createInterpreterContext(mockRemoteEventClient);
+ context = createInterpreterContext(mockIntpEventClient);
result = interpreter.interpret("sc.range(1,10).sum()", context);
Thread.sleep(100);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
List<InterpreterResultMessage> interpreterResultMessages = context.out.toInterpreterResultMessage();
assertEquals("45", interpreterResultMessages.get(0).getData().trim());
// spark job url is sent
-// verify(mockRemoteEventClient).onParaInfosReceived(any(String.class), any(String.class), any(Map.class));
+ verify(mockIntpEventClient).onParaInfosReceived(any(Map.class));
// spark sql
- context = createInterpreterContext(mockRemoteEventClient);
+ context = createInterpreterContext(mockIntpEventClient);
if (!isSpark2(sparkVersion)) {
result = interpreter.interpret("df = sqlContext.createDataFrame([(1,'a'),(2,'b')])\ndf.show()", context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
@@ -140,7 +141,7 @@ public class IPySparkInterpreterTest extends IPythonInterpreterTest {
"| 2| b|\n" +
"+---+---+", interpreterResultMessages.get(0).getData().trim());
- context = createInterpreterContext(mockRemoteEventClient);
+ context = createInterpreterContext(mockIntpEventClient);
result = interpreter.interpret("z.show(df)", context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
interpreterResultMessages = context.out.toInterpreterResultMessage();
@@ -160,7 +161,7 @@ public class IPySparkInterpreterTest extends IPythonInterpreterTest {
"| 2| b|\n" +
"+---+---+", interpreterResultMessages.get(0).getData().trim());
- context = createInterpreterContext(mockRemoteEventClient);
+ context = createInterpreterContext(mockIntpEventClient);
result = interpreter.interpret("z.show(df)", context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
interpreterResultMessages = context.out.toInterpreterResultMessage();
@@ -171,7 +172,7 @@ public class IPySparkInterpreterTest extends IPythonInterpreterTest {
}
// cancel
if (interpreter instanceof IPySparkInterpreter) {
- final InterpreterContext context2 = createInterpreterContext(mockRemoteEventClient);
+ final InterpreterContext context2 = createInterpreterContext(mockIntpEventClient);
Thread thread = new Thread() {
@Override
@@ -201,24 +202,24 @@ public class IPySparkInterpreterTest extends IPythonInterpreterTest {
}
// completions
- List<InterpreterCompletion> completions = interpreter.completion("sc.ran", 6, createInterpreterContext(mockRemoteEventClient));
+ List<InterpreterCompletion> completions = interpreter.completion("sc.ran", 6, createInterpreterContext(mockIntpEventClient));
assertEquals(1, completions.size());
assertEquals("range", completions.get(0).getValue());
- completions = interpreter.completion("sc.", 3, createInterpreterContext(mockRemoteEventClient));
+ completions = interpreter.completion("sc.", 3, createInterpreterContext(mockIntpEventClient));
assertTrue(completions.size() > 0);
completions.contains(new InterpreterCompletion("range", "range", ""));
- completions = interpreter.completion("1+1\nsc.", 7, createInterpreterContext(mockRemoteEventClient));
+ completions = interpreter.completion("1+1\nsc.", 7, createInterpreterContext(mockIntpEventClient));
assertTrue(completions.size() > 0);
completions.contains(new InterpreterCompletion("range", "range", ""));
- completions = interpreter.completion("s", 1, createInterpreterContext(mockRemoteEventClient));
+ completions = interpreter.completion("s", 1, createInterpreterContext(mockIntpEventClient));
assertTrue(completions.size() > 0);
completions.contains(new InterpreterCompletion("sc", "sc", ""));
// pyspark streaming
- context = createInterpreterContext(mockRemoteEventClient);
+ context = createInterpreterContext(mockIntpEventClient);
result = interpreter.interpret(
"from pyspark.streaming import StreamingContext\n" +
"import time\n" +
@@ -244,22 +245,12 @@ public class IPySparkInterpreterTest extends IPythonInterpreterTest {
return sparkVersion.startsWith("'2.") || sparkVersion.startsWith("u'2.");
}
- private static InterpreterContext createInterpreterContext(RemoteEventClient mockRemoteEventClient) {
- InterpreterContext context = new InterpreterContext(
- "noteId",
- "paragraphId",
- "replName",
- "paragraphTitle",
- "paragraphText",
- new AuthenticationInfo(),
- new HashMap<String, Object>(),
- new GUI(),
- new GUI(),
- null,
- null,
- null,
- new InterpreterOutput(null));
- context.setClient(mockRemoteEventClient);
- return context;
+ private static InterpreterContext createInterpreterContext(RemoteInterpreterEventClient mockRemoteEventClient) {
+ return InterpreterContext.builder()
+ .setNoteId("noteId")
+ .setParagraphId("paragraphId")
+ .setIntpEventClient(mockRemoteEventClient)
+ .setInterpreterOut(new InterpreterOutput(null))
+ .build();
}
}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7af86168/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkInterpreterTest.java
----------------------------------------------------------------------
diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkInterpreterTest.java
index f6cb9a9..84bdc43 100644
--- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkInterpreterTest.java
+++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkInterpreterTest.java
@@ -19,7 +19,6 @@ package org.apache.zeppelin.spark;
import com.google.common.io.Files;
import org.apache.zeppelin.display.AngularObjectRegistry;
-import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.display.ui.CheckBox;
import org.apache.zeppelin.display.ui.Select;
import org.apache.zeppelin.display.ui.TextBox;
@@ -31,9 +30,8 @@ import org.apache.zeppelin.interpreter.InterpreterOutput;
import org.apache.zeppelin.interpreter.InterpreterOutputListener;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResultMessageOutput;
-import org.apache.zeppelin.interpreter.remote.RemoteEventClient;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventClient;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
-import org.apache.zeppelin.user.AuthenticationInfo;
import org.junit.After;
import org.junit.Ignore;
import org.junit.Test;
@@ -67,7 +65,7 @@ public class NewSparkInterpreterTest {
// catch the interpreter output in onUpdate
private InterpreterResultMessageOutput messageOutput;
- private RemoteEventClient mockRemoteEventClient = mock(RemoteEventClient.class);
+ private RemoteInterpreterEventClient mockRemoteEventClient = mock(RemoteInterpreterEventClient.class);
@Test
public void testSparkInterpreter() throws IOException, InterruptedException, InterpreterException {
@@ -77,12 +75,19 @@ public class NewSparkInterpreterTest {
properties.setProperty("zeppelin.spark.maxResult", "100");
properties.setProperty("zeppelin.spark.test", "true");
properties.setProperty("zeppelin.spark.useNew", "true");
+
+ InterpreterContext context = InterpreterContext.builder()
+ .setInterpreterOut(new InterpreterOutput(null))
+ .setIntpEventClient(mockRemoteEventClient)
+ .setAngularObjectRegistry(new AngularObjectRegistry("spark", null))
+ .build();
+ InterpreterContext.set(context);
+
interpreter = new SparkInterpreter(properties);
assertTrue(interpreter.getDelegation() instanceof NewSparkInterpreter);
interpreter.setInterpreterGroup(mock(InterpreterGroup.class));
interpreter.open();
- interpreter.getZeppelinContext().setEventClient(mockRemoteEventClient);
InterpreterResult result = interpreter.interpret("val a=\"hello world\"", getInterpreterContext());
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertEquals("a: String = hello world\n", output);
@@ -138,7 +143,7 @@ public class NewSparkInterpreterTest {
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertTrue(output.contains("45"));
// spark job url is sent
- verify(mockRemoteEventClient).onParaInfosReceived(any(String.class), any(String.class), any(Map.class));
+ verify(mockRemoteEventClient).onParaInfosReceived(any(Map.class));
// case class
result = interpreter.interpret("val bankText = sc.textFile(\"bank.csv\")", getInterpreterContext());
@@ -203,7 +208,7 @@ public class NewSparkInterpreterTest {
messageOutput.flush();
assertEquals("_1\t_2\n1\ta\n2\tb\n", messageOutput.toInterpreterResultMessage().getData());
- InterpreterContext context = getInterpreterContext();
+ context = getInterpreterContext();
result = interpreter.interpret("z.input(\"name\", \"default_name\")", context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertEquals(1, context.getGui().getForms().size());
@@ -397,19 +402,12 @@ public class NewSparkInterpreterTest {
private InterpreterContext getInterpreterContext() {
output = "";
- InterpreterContext context = new InterpreterContext(
- "noteId",
- "paragraphId",
- "replName",
- "paragraphTitle",
- "paragraphText",
- new AuthenticationInfo(),
- new HashMap<String, Object>(),
- new GUI(),
- new GUI(),
- new AngularObjectRegistry("spark", null),
- null,
- null,
+ InterpreterContext context = InterpreterContext.builder()
+ .setInterpreterOut(new InterpreterOutput(null))
+ .setIntpEventClient(mockRemoteEventClient)
+ .setAngularObjectRegistry(new AngularObjectRegistry("spark", null))
+ .build();
+ context.out =
new InterpreterOutput(
new InterpreterOutputListener() {
@@ -431,9 +429,7 @@ public class NewSparkInterpreterTest {
public void onUpdate(int index, InterpreterResultMessageOutput out) {
messageOutput = out;
}
- })
- );
- context.setClient(mockRemoteEventClient);
+ });
return context;
}
}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7af86168/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkSqlInterpreterTest.java
----------------------------------------------------------------------
diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkSqlInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkSqlInterpreterTest.java
index 42289ff..300388d 100644
--- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkSqlInterpreterTest.java
+++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkSqlInterpreterTest.java
@@ -17,21 +17,26 @@
package org.apache.zeppelin.spark;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.Properties;
-
-import com.google.common.io.Files;
import org.apache.zeppelin.display.AngularObjectRegistry;
-import org.apache.zeppelin.resource.LocalResourcePool;
-import org.apache.zeppelin.user.AuthenticationInfo;
-import org.apache.zeppelin.display.GUI;
-import org.apache.zeppelin.interpreter.*;
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterException;
+import org.apache.zeppelin.interpreter.InterpreterGroup;
+import org.apache.zeppelin.interpreter.InterpreterOutput;
+import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResult.Type;
-import org.junit.*;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventClient;
+import org.apache.zeppelin.resource.LocalResourcePool;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.LinkedList;
+import java.util.Properties;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
public class NewSparkSqlInterpreterTest {
@@ -62,11 +67,15 @@ public class NewSparkSqlInterpreterTest {
sparkInterpreter.open();
sqlInterpreter.open();
- context = new InterpreterContext("note", "id", null, "title", "text", new AuthenticationInfo(),
- new HashMap<String, Object>(), new GUI(), new GUI(),
- new AngularObjectRegistry(intpGroup.getId(), null),
- new LocalResourcePool("id"),
- new LinkedList<InterpreterContextRunner>(), new InterpreterOutput(null));
+ context = InterpreterContext.builder()
+ .setNoteId("noteId")
+ .setParagraphId("paragraphId")
+ .setParagraphTitle("title")
+ .setAngularObjectRegistry(new AngularObjectRegistry(intpGroup.getId(), null))
+ .setResourcePool(new LocalResourcePool("id"))
+ .setInterpreterOut(new InterpreterOutput(null))
+ .setIntpEventClient(mock(RemoteInterpreterEventClient.class))
+ .build();
}
@AfterClass
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7af86168/spark/interpreter/src/test/java/org/apache/zeppelin/spark/OldSparkInterpreterTest.java
----------------------------------------------------------------------
diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/OldSparkInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/OldSparkInterpreterTest.java
index 3a98653..12fe614 100644
--- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/OldSparkInterpreterTest.java
+++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/OldSparkInterpreterTest.java
@@ -18,22 +18,19 @@
package org.apache.zeppelin.spark;
import org.apache.spark.SparkConf;
-import org.apache.spark.SparkContext;
import org.apache.zeppelin.display.AngularObjectRegistry;
-import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
-import org.apache.zeppelin.interpreter.InterpreterContextRunner;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.InterpreterOutput;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.apache.zeppelin.interpreter.remote.RemoteEventClientWrapper;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventClient;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.apache.zeppelin.resource.LocalResourcePool;
import org.apache.zeppelin.resource.WellKnownResourceName;
-import org.apache.zeppelin.user.AuthenticationInfo;
import org.junit.AfterClass;
import org.junit.Assume;
import org.junit.BeforeClass;
@@ -55,6 +52,7 @@ import java.util.Properties;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
public class OldSparkInterpreterTest {
@@ -66,8 +64,6 @@ public class OldSparkInterpreterTest {
static InterpreterGroup intpGroup;
static InterpreterContext context;
static Logger LOGGER = LoggerFactory.getLogger(OldSparkInterpreterTest.class);
- static Map<String, Map<String, String>> paraIdToInfosMap =
- new HashMap<>();
/**
* Get spark version number as a numerical value.
@@ -98,41 +94,22 @@ public class OldSparkInterpreterTest {
@BeforeClass
public static void setUp() throws Exception {
intpGroup = new InterpreterGroup();
+ context = InterpreterContext.builder()
+ .setNoteId("noteId")
+ .setParagraphId("paragraphId")
+ .setParagraphTitle("title")
+ .setAngularObjectRegistry(new AngularObjectRegistry(intpGroup.getId(), null))
+ .setResourcePool(new LocalResourcePool("id"))
+ .setInterpreterOut(new InterpreterOutput(null))
+ .setIntpEventClient(mock(RemoteInterpreterEventClient.class))
+ .build();
+ InterpreterContext.set(context);
+
intpGroup.put("note", new LinkedList<Interpreter>());
repl = new SparkInterpreter(getSparkTestProperties(tmpDir));
repl.setInterpreterGroup(intpGroup);
intpGroup.get("note").add(repl);
repl.open();
-
- final RemoteEventClientWrapper remoteEventClientWrapper = new RemoteEventClientWrapper() {
-
- @Override
- public void onParaInfosReceived(String noteId, String paragraphId,
- Map<String, String> infos) {
- if (infos != null) {
- paraIdToInfosMap.put(paragraphId, infos);
- }
- }
-
- @Override
- public void onMetaInfosReceived(Map<String, String> infos) {
- }
- };
- context = new InterpreterContext("note", "id", null, "title", "text",
- new AuthenticationInfo(),
- new HashMap<String, Object>(),
- new GUI(),
- new GUI(),
- new AngularObjectRegistry(intpGroup.getId(), null),
- new LocalResourcePool("id"),
- new LinkedList<InterpreterContextRunner>(),
- new InterpreterOutput(null)) {
-
- @Override
- public RemoteEventClientWrapper getClient() {
- return remoteEventClientWrapper;
- }
- };
// The first para interpretdr will set the Eventclient wrapper
//SparkInterpreter.interpret(String, InterpreterContext) ->
//SparkInterpreter.populateSparkWebUrl(InterpreterContext) ->
@@ -336,27 +313,4 @@ public class OldSparkInterpreterTest {
assertTrue(completions.size() > 0);
}
- @Test
- public void testParagraphUrls() throws InterpreterException {
- String paraId = "test_para_job_url";
- InterpreterContext intpCtx = new InterpreterContext("note", paraId, null, "title", "text",
- new AuthenticationInfo(),
- new HashMap<String, Object>(),
- new GUI(),
- new GUI(),
- new AngularObjectRegistry(intpGroup.getId(), null),
- new LocalResourcePool("id"),
- new LinkedList<InterpreterContextRunner>(),
- new InterpreterOutput(null));
- repl.interpret("sc.parallelize(1 to 10).map(x => {x}).collect", intpCtx);
- Map<String, String> paraInfos = paraIdToInfosMap.get(intpCtx.getParagraphId());
- String jobUrl = null;
- if (paraInfos != null) {
- jobUrl = paraInfos.get("jobUrl");
- }
- String sparkUIUrl = repl.getSparkUIUrl();
- assertNotNull(jobUrl);
- assertTrue(jobUrl.startsWith(sparkUIUrl + "/jobs/job?id="));
-
- }
}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7af86168/spark/interpreter/src/test/java/org/apache/zeppelin/spark/OldSparkSqlInterpreterTest.java
----------------------------------------------------------------------
diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/OldSparkSqlInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/OldSparkSqlInterpreterTest.java
index d0b0874..fa1e257 100644
--- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/OldSparkSqlInterpreterTest.java
+++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/OldSparkSqlInterpreterTest.java
@@ -18,29 +18,27 @@
package org.apache.zeppelin.spark;
import org.apache.zeppelin.display.AngularObjectRegistry;
-import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
-import org.apache.zeppelin.interpreter.InterpreterContextRunner;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.InterpreterOutput;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResult.Type;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventClient;
import org.apache.zeppelin.resource.LocalResourcePool;
-import org.apache.zeppelin.user.AuthenticationInfo;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
-import java.util.HashMap;
import java.util.LinkedList;
import java.util.Properties;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
public class OldSparkSqlInterpreterTest {
@@ -76,11 +74,15 @@ public class OldSparkSqlInterpreterTest {
sql.setInterpreterGroup(intpGroup);
sql.open();
- context = new InterpreterContext("note", "id", null, "title", "text", new AuthenticationInfo(),
- new HashMap<String, Object>(), new GUI(), new GUI(),
- new AngularObjectRegistry(intpGroup.getId(), null),
- new LocalResourcePool("id"),
- new LinkedList<InterpreterContextRunner>(), new InterpreterOutput(null));
+ context = InterpreterContext.builder()
+ .setNoteId("noteId")
+ .setParagraphId("paragraphId")
+ .setParagraphTitle("title")
+ .setAngularObjectRegistry(new AngularObjectRegistry(intpGroup.getId(), null))
+ .setResourcePool(new LocalResourcePool("id"))
+ .setInterpreterOut(new InterpreterOutput(null))
+ .setIntpEventClient(mock(RemoteInterpreterEventClient.class))
+ .build();
}
@AfterClass
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7af86168/spark/interpreter/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterMatplotlibTest.java
----------------------------------------------------------------------
diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterMatplotlibTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterMatplotlibTest.java
index 2d40871..d3f5867 100644
--- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterMatplotlibTest.java
+++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterMatplotlibTest.java
@@ -18,23 +18,34 @@
package org.apache.zeppelin.spark;
import org.apache.zeppelin.display.AngularObjectRegistry;
-import org.apache.zeppelin.display.GUI;
-import org.apache.zeppelin.interpreter.*;
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterException;
+import org.apache.zeppelin.interpreter.InterpreterGroup;
+import org.apache.zeppelin.interpreter.InterpreterOutput;
+import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResult.Type;
-import org.apache.zeppelin.resource.LocalResourcePool;
-import org.apache.zeppelin.user.AuthenticationInfo;
-import org.junit.*;
+import org.apache.zeppelin.interpreter.InterpreterResultMessage;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventClient;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.FixMethodOrder;
+import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runners.MethodSorters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import java.io.IOException;
-import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
public class PySparkInterpreterMatplotlibTest {
@@ -111,15 +122,12 @@ public class PySparkInterpreterMatplotlibTest {
public static void setUp() throws Exception {
intpGroup = new InterpreterGroup();
intpGroup.put("note", new LinkedList<Interpreter>());
- context = new InterpreterContext("note", "id", null, "title", "text",
- new AuthenticationInfo(),
- new HashMap<String, Object>(),
- new GUI(),
- new GUI(),
- new AngularObjectRegistry(intpGroup.getId(), null),
- new LocalResourcePool("id"),
- new LinkedList<InterpreterContextRunner>(),
- new InterpreterOutput(null));
+ context = InterpreterContext.builder()
+ .setNoteId("note")
+ .setInterpreterOut(new InterpreterOutput(null))
+ .setIntpEventClient(mock(RemoteInterpreterEventClient.class))
+ .setAngularObjectRegistry(new AngularObjectRegistry(intpGroup.getId(), null))
+ .build();
InterpreterContext.set(context);
sparkInterpreter = new SparkInterpreter(getPySparkTestProperties());
@@ -131,16 +139,6 @@ public class PySparkInterpreterMatplotlibTest {
intpGroup.get("note").add(pyspark);
pyspark.setInterpreterGroup(intpGroup);
pyspark.open();
-
- context = new InterpreterContext("note", "id", null, "title", "text",
- new AuthenticationInfo(),
- new HashMap<String, Object>(),
- new GUI(),
- new GUI(),
- new AngularObjectRegistry(intpGroup.getId(), null),
- new LocalResourcePool("id"),
- new LinkedList<InterpreterContextRunner>(),
- new InterpreterOutput(null));
}
@AfterClass
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7af86168/spark/interpreter/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java
----------------------------------------------------------------------
diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java
index 446f183..64f1ff5 100644
--- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java
+++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java
@@ -17,13 +17,15 @@
package org.apache.zeppelin.spark;
+
import com.google.common.io.Files;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterGroup;
+import org.apache.zeppelin.interpreter.InterpreterOutput;
import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
-import org.apache.zeppelin.interpreter.remote.RemoteEventClient;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventClient;
import org.apache.zeppelin.python.PythonInterpreterTest;
import org.junit.Test;
@@ -33,9 +35,10 @@ import java.util.Properties;
import static org.mockito.Mockito.mock;
+
public class PySparkInterpreterTest extends PythonInterpreterTest {
- private RemoteEventClient mockRemoteEventClient = mock(RemoteEventClient.class);
+ private RemoteInterpreterEventClient mockRemoteEventClient = mock(RemoteInterpreterEventClient.class);
@Override
public void setUp() throws InterpreterException {
@@ -52,13 +55,18 @@ public class PySparkInterpreterTest extends PythonInterpreterTest {
properties.setProperty("zeppelin.spark.test", "true");
properties.setProperty("zeppelin.python.gatewayserver_address", "127.0.0.1");
- InterpreterContext.set(getInterpreterContext(mockRemoteEventClient));
// create interpreter group
intpGroup = new InterpreterGroup();
intpGroup.put("note", new LinkedList<Interpreter>());
+ InterpreterContext context = InterpreterContext.builder()
+ .setInterpreterOut(new InterpreterOutput(null))
+ .setIntpEventClient(mockRemoteEventClient)
+ .build();
+ InterpreterContext.set(context);
LazyOpenInterpreter sparkInterpreter =
new LazyOpenInterpreter(new SparkInterpreter(properties));
+
intpGroup.get("note").add(sparkInterpreter);
sparkInterpreter.setInterpreterGroup(intpGroup);
@@ -86,4 +94,10 @@ public class PySparkInterpreterTest extends PythonInterpreterTest {
IPySparkInterpreterTest.testPySpark(interpreter, mockRemoteEventClient);
}
+ @Override
+ protected InterpreterContext getInterpreterContext() {
+ InterpreterContext context = super.getInterpreterContext();
+ context.setIntpEventClient(mockRemoteEventClient);
+ return context;
+ }
}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7af86168/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkRInterpreterTest.java
----------------------------------------------------------------------
diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkRInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkRInterpreterTest.java
index 8eaf1e4..8f9f080 100644
--- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkRInterpreterTest.java
+++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkRInterpreterTest.java
@@ -17,21 +17,16 @@
package org.apache.zeppelin.spark;
-import org.apache.zeppelin.display.AngularObjectRegistry;
-import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
-import org.apache.zeppelin.interpreter.remote.RemoteEventClient;
-import org.apache.zeppelin.user.AuthenticationInfo;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventClient;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-import java.io.IOException;
-import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
@@ -47,7 +42,7 @@ public class SparkRInterpreterTest {
private SparkRInterpreter sparkRInterpreter;
private SparkInterpreter sparkInterpreter;
- private RemoteEventClient mockRemoteEventClient = mock(RemoteEventClient.class);
+ private RemoteInterpreterEventClient mockRemoteIntpEventClient = mock(RemoteInterpreterEventClient.class);
@Before
public void setUp() throws InterpreterException {
@@ -60,6 +55,8 @@ public class SparkRInterpreterTest {
properties.setProperty("zeppelin.R.knitr", "true");
properties.setProperty("spark.r.backendConnectionTimeout", "10");
+ InterpreterContext context = getInterpreterContext();
+ InterpreterContext.set(context);
sparkRInterpreter = new SparkRInterpreter(properties);
sparkInterpreter = new SparkInterpreter(properties);
@@ -70,7 +67,6 @@ public class SparkRInterpreterTest {
sparkInterpreter.setInterpreterGroup(interpreterGroup);
sparkRInterpreter.open();
- sparkInterpreter.getZeppelinContext().setEventClient(mockRemoteEventClient);
}
@After
@@ -86,7 +82,7 @@ public class SparkRInterpreterTest {
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertTrue(result.message().get(0).getData().contains("2"));
// spark web url is sent
- verify(mockRemoteEventClient).onMetaInfosReceived(any(Map.class));
+ verify(mockRemoteIntpEventClient).onMetaInfosReceived(any(Map.class));
result = sparkRInterpreter.interpret("sparkR.version()", getInterpreterContext());
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
@@ -96,7 +92,7 @@ public class SparkRInterpreterTest {
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertTrue(result.message().get(0).getData().contains("eruptions waiting"));
// spark job url is sent
- verify(mockRemoteEventClient, atLeastOnce()).onParaInfosReceived(any(String.class), any(String.class), any(Map.class));
+ verify(mockRemoteIntpEventClient, atLeastOnce()).onParaInfosReceived(any(Map.class));
// cancel
final InterpreterContext context = getInterpreterContext();
@@ -127,7 +123,7 @@ public class SparkRInterpreterTest {
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertTrue(result.message().get(0).getData().contains("eruptions waiting"));
// spark job url is sent
- verify(mockRemoteEventClient, atLeastOnce()).onParaInfosReceived(any(String.class), any(String.class), any(Map.class));
+ verify(mockRemoteIntpEventClient, atLeastOnce()).onParaInfosReceived(any(Map.class));
}
// plotting
@@ -155,8 +151,9 @@ public class SparkRInterpreterTest {
InterpreterContext context = InterpreterContext.builder()
.setNoteId("note_1")
.setParagraphId("paragraph_1")
- .setEventClient(mockRemoteEventClient)
+ .setIntpEventClient(mockRemoteIntpEventClient)
.build();
return context;
}
}
+
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7af86168/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkShimsTest.java
----------------------------------------------------------------------
diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkShimsTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkShimsTest.java
index 25afd4e..4fa74e9 100644
--- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkShimsTest.java
+++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkShimsTest.java
@@ -31,7 +31,9 @@ import java.util.Map;
import java.util.Properties;
import org.apache.hadoop.util.VersionInfo;
import org.apache.zeppelin.interpreter.BaseZeppelinContext;
+import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.remote.RemoteEventClientWrapper;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventClient;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.runners.Enclosed;
@@ -92,7 +94,9 @@ public class SparkShimsTest {
SparkShims sparkShims =
new SparkShims() {
@Override
- public void setupSparkListener(String master, String sparkWebUrl) {}
+ public void setupSparkListener(String master,
+ String sparkWebUrl,
+ InterpreterContext context) {}
};
assertEquals(expected, sparkShims.supportYarn6615(version));
}
@@ -106,19 +110,15 @@ public class SparkShimsTest {
@Captor ArgumentCaptor<Map<String, String>> argumentCaptor;
SparkShims sparkShims;
+ InterpreterContext mockContext;
+ RemoteInterpreterEventClient mockIntpEventClient;
@Before
public void setUp() {
- PowerMockito.mockStatic(BaseZeppelinContext.class);
- RemoteEventClientWrapper mockRemoteEventClientWrapper = mock(RemoteEventClientWrapper.class);
-
- when(BaseZeppelinContext.getEventClient()).thenReturn(mockRemoteEventClientWrapper);
- doNothing()
- .when(mockRemoteEventClientWrapper)
- .onParaInfosReceived(anyString(), anyString(), argumentCaptor.capture());
-
- when(mockProperties.getProperty("spark.jobGroup.id")).thenReturn("job-note-paragraph");
-
+ mockContext = mock(InterpreterContext.class);
+ mockIntpEventClient = mock(RemoteInterpreterEventClient.class);
+ when(mockContext.getIntpEventClient()).thenReturn(mockIntpEventClient);
+ doNothing().when(mockIntpEventClient).onParaInfosReceived(argumentCaptor.capture());
try {
sparkShims = SparkShims.getInstance(SparkVersion.SPARK_2_0_0.toString());
} catch (Throwable ignore) {
@@ -127,8 +127,8 @@ public class SparkShimsTest {
}
@Test
- public void runUnerLocalTest() {
- sparkShims.buildSparkJobUrl("local", "http://sparkurl", 0, mockProperties);
+ public void runUnderLocalTest() {
+ sparkShims.buildSparkJobUrl("local", "http://sparkurl", 0, mockProperties, mockContext);
Map<String, String> mapValue = argumentCaptor.getValue();
assertTrue(mapValue.keySet().contains("jobUrl"));
@@ -136,9 +136,9 @@ public class SparkShimsTest {
}
@Test
- public void runUnerYarnTest() {
+ public void runUnderYarnTest() {
- sparkShims.buildSparkJobUrl("yarn", "http://sparkurl", 0, mockProperties);
+ sparkShims.buildSparkJobUrl("yarn", "http://sparkurl", 0, mockProperties, mockContext);
Map<String, String> mapValue = argumentCaptor.getValue();
assertTrue(mapValue.keySet().contains("jobUrl"));
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7af86168/spark/spark-shims/src/main/scala/org/apache/zeppelin/spark/SparkShims.java
----------------------------------------------------------------------
diff --git a/spark/spark-shims/src/main/scala/org/apache/zeppelin/spark/SparkShims.java b/spark/spark-shims/src/main/scala/org/apache/zeppelin/spark/SparkShims.java
index 1d7323b..3c452cc 100644
--- a/spark/spark-shims/src/main/scala/org/apache/zeppelin/spark/SparkShims.java
+++ b/spark/spark-shims/src/main/scala/org/apache/zeppelin/spark/SparkShims.java
@@ -20,7 +20,9 @@ package org.apache.zeppelin.spark;
import org.apache.hadoop.util.VersionInfo;
import org.apache.hadoop.util.VersionUtil;
import org.apache.zeppelin.interpreter.BaseZeppelinContext;
+import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.remote.RemoteEventClientWrapper;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -82,7 +84,9 @@ public abstract class SparkShims {
* This is due to SparkListener api change between spark1 and spark2. SparkListener is trait in
* spark1 while it is abstract class in spark2.
*/
- public abstract void setupSparkListener(String master, String sparkWebUrl);
+ public abstract void setupSparkListener(String master,
+ String sparkWebUrl,
+ InterpreterContext context);
protected String getNoteId(String jobgroupId) {
int indexOf = jobgroupId.indexOf("-");
@@ -96,30 +100,26 @@ public abstract class SparkShims {
return jobgroupId.substring(secondIndex + 1, jobgroupId.length());
}
- protected void buildSparkJobUrl(
- String master, String sparkWebUrl, int jobId, Properties jobProperties) {
- String jobGroupId = jobProperties.getProperty("spark.jobGroup.id");
+ protected void buildSparkJobUrl(String master,
+ String sparkWebUrl,
+ int jobId,
+ Properties jobProperties,
+ InterpreterContext context) {
String uiEnabled = jobProperties.getProperty("spark.ui.enabled");
String jobUrl = sparkWebUrl + "/jobs/job?id=" + jobId;
-
+ // Button visible if Spark UI property not set, set as invalid boolean or true
+ boolean showSparkUI =
+ uiEnabled == null || !uiEnabled.trim().toLowerCase().equals("false");
String version = VersionInfo.getVersion();
if (master.toLowerCase().contains("yarn") && !supportYarn6615(version)) {
jobUrl = sparkWebUrl + "/jobs";
}
-
- String noteId = getNoteId(jobGroupId);
- String paragraphId = getParagraphId(jobGroupId);
- // Button visible if Spark UI property not set, set as invalid boolean or true
- boolean showSparkUI = uiEnabled == null || !uiEnabled.trim().toLowerCase().equals("false");
- if (showSparkUI) {
- RemoteEventClientWrapper eventClient = BaseZeppelinContext.getEventClient();
- Map<String, String> infos = new java.util.HashMap<>();
+ if (showSparkUI && jobUrl != null) {
+ Map<String, String> infos = new java.util.HashMap<String, String>();
infos.put("jobUrl", jobUrl);
infos.put("label", "SPARK JOB");
infos.put("tooltip", "View in Spark web UI");
- if (eventClient != null) {
- eventClient.onParaInfosReceived(noteId, paragraphId, infos);
- }
+ context.getIntpEventClient().onParaInfosReceived(infos);
}
}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7af86168/spark/spark1-shims/src/main/scala/org/apache/zeppelin/spark/Spark1Shims.java
----------------------------------------------------------------------
diff --git a/spark/spark1-shims/src/main/scala/org/apache/zeppelin/spark/Spark1Shims.java b/spark/spark1-shims/src/main/scala/org/apache/zeppelin/spark/Spark1Shims.java
index d05a25f..7c922aa 100644
--- a/spark/spark1-shims/src/main/scala/org/apache/zeppelin/spark/Spark1Shims.java
+++ b/spark/spark1-shims/src/main/scala/org/apache/zeppelin/spark/Spark1Shims.java
@@ -21,15 +21,18 @@ package org.apache.zeppelin.spark;
import org.apache.spark.SparkContext;
import org.apache.spark.scheduler.SparkListenerJobStart;
import org.apache.spark.ui.jobs.JobProgressListener;
+import org.apache.zeppelin.interpreter.InterpreterContext;
public class Spark1Shims extends SparkShims {
- public void setupSparkListener(final String master, final String sparkWebUrl) {
+ public void setupSparkListener(final String master,
+ final String sparkWebUrl,
+ final InterpreterContext context) {
SparkContext sc = SparkContext.getOrCreate();
sc.addSparkListener(new JobProgressListener(sc.getConf()) {
@Override
public void onJobStart(SparkListenerJobStart jobStart) {
- buildSparkJobUrl(master, sparkWebUrl, jobStart.jobId(), jobStart.properties());
+ buildSparkJobUrl(master, sparkWebUrl, jobStart.jobId(), jobStart.properties(), context);
}
});
}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7af86168/spark/spark2-shims/src/main/scala/org/apache/zeppelin/spark/Spark2Shims.java
----------------------------------------------------------------------
diff --git a/spark/spark2-shims/src/main/scala/org/apache/zeppelin/spark/Spark2Shims.java b/spark/spark2-shims/src/main/scala/org/apache/zeppelin/spark/Spark2Shims.java
index 6fd29e3..63bd688 100644
--- a/spark/spark2-shims/src/main/scala/org/apache/zeppelin/spark/Spark2Shims.java
+++ b/spark/spark2-shims/src/main/scala/org/apache/zeppelin/spark/Spark2Shims.java
@@ -21,15 +21,18 @@ package org.apache.zeppelin.spark;
import org.apache.spark.SparkContext;
import org.apache.spark.scheduler.SparkListener;
import org.apache.spark.scheduler.SparkListenerJobStart;
+import org.apache.zeppelin.interpreter.InterpreterContext;
public class Spark2Shims extends SparkShims {
- public void setupSparkListener(final String master, final String sparkWebUrl) {
+ public void setupSparkListener(final String master,
+ final String sparkWebUrl,
+ final InterpreterContext context) {
SparkContext sc = SparkContext.getOrCreate();
sc.addSparkListener(new SparkListener() {
@Override
public void onJobStart(SparkListenerJobStart jobStart) {
- buildSparkJobUrl(master, sparkWebUrl, jobStart.jobId(), jobStart.properties());
+ buildSparkJobUrl(master, sparkWebUrl, jobStart.jobId(), jobStart.properties(), context);
}
});
}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7af86168/zeppelin-display/src/test/scala/org/apache/zeppelin/display/angular/AbstractAngularElemTest.scala
----------------------------------------------------------------------
diff --git a/zeppelin-display/src/test/scala/org/apache/zeppelin/display/angular/AbstractAngularElemTest.scala b/zeppelin-display/src/test/scala/org/apache/zeppelin/display/angular/AbstractAngularElemTest.scala
index 4ddae9a..0567647 100644
--- a/zeppelin-display/src/test/scala/org/apache/zeppelin/display/angular/AbstractAngularElemTest.scala
+++ b/zeppelin-display/src/test/scala/org/apache/zeppelin/display/angular/AbstractAngularElemTest.scala
@@ -34,12 +34,11 @@ trait AbstractAngularElemTest
override def beforeEach() {
val intpGroup = new InterpreterGroup()
- val context = new InterpreterContext("note", "paragraph", null, "title", "text",
- new AuthenticationInfo(), new util.HashMap[String, Object](), new GUI(), new GUI(),
- new AngularObjectRegistry(intpGroup.getId(), null),
- null,
- new util.LinkedList[InterpreterContextRunner](),
- new InterpreterOutput(null));
+ val context = InterpreterContext.builder
+ .setNoteId("noteId")
+ .setAngularObjectRegistry(new AngularObjectRegistry(intpGroup.getId(), null))
+ .setInterpreterOut(new InterpreterOutput(null))
+ .build()
InterpreterContext.set(context)
super.beforeEach() // To be stackable, must call super.beforeEach
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7af86168/zeppelin-display/src/test/scala/org/apache/zeppelin/display/angular/AbstractAngularModelTest.scala
----------------------------------------------------------------------
diff --git a/zeppelin-display/src/test/scala/org/apache/zeppelin/display/angular/AbstractAngularModelTest.scala b/zeppelin-display/src/test/scala/org/apache/zeppelin/display/angular/AbstractAngularModelTest.scala
index c9b0d8f..477e249 100644
--- a/zeppelin-display/src/test/scala/org/apache/zeppelin/display/angular/AbstractAngularModelTest.scala
+++ b/zeppelin-display/src/test/scala/org/apache/zeppelin/display/angular/AbstractAngularModelTest.scala
@@ -29,12 +29,11 @@ trait AbstractAngularModelTest extends FlatSpec
with BeforeAndAfter with BeforeAndAfterEach with Eventually with Matchers {
override def beforeEach() {
val intpGroup = new InterpreterGroup()
- val context = new InterpreterContext("note", "id", null, "title", "text", new AuthenticationInfo(),
- new java.util.HashMap[String, Object](), new GUI(), new GUI(), new AngularObjectRegistry(
- intpGroup.getId(), null),
- null,
- new java.util.LinkedList[InterpreterContextRunner](),
- new InterpreterOutput(null));
+ val context = InterpreterContext.builder
+ .setNoteId("noteId")
+ .setAngularObjectRegistry(new AngularObjectRegistry(intpGroup.getId(), null))
+ .setInterpreterOut(new InterpreterOutput(null))
+ .build()
InterpreterContext.set(context)
super.beforeEach() // To be stackable, must call super.beforeEach
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7af86168/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
index 83d8e23..dc46dd0 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
@@ -501,12 +501,12 @@ public class ZeppelinConfiguration extends XMLConfiguration {
}
}
- public String getCallbackPortRange() {
- return getString(ConfVars.ZEPPELIN_INTERPRETER_CALLBACK_PORTRANGE);
+ public String getZeppelinServerRPCPortRange() {
+ return getString(ConfVars.ZEPPELIN_SERVER_RPC_PORTRANGE);
}
public String getInterpreterPortRange() {
- return getString(ConfVars.ZEPPELIN_INTERPRETER_PORTRANGE);
+ return getString(ConfVars.ZEPPELIN_INTERPRETER_RPC_PORTRANGE);
}
public boolean isWindowsPath(String path){
@@ -801,8 +801,8 @@ public class ZeppelinConfiguration extends XMLConfiguration {
ZEPPELIN_SERVER_KERBEROS_KEYTAB("zeppelin.server.kerberos.keytab", ""),
ZEPPELIN_SERVER_KERBEROS_PRINCIPAL("zeppelin.server.kerberos.principal", ""),
- ZEPPELIN_INTERPRETER_CALLBACK_PORTRANGE("zeppelin.interpreter.callback.portRange", ":"),
- ZEPPELIN_INTERPRETER_PORTRANGE("zeppelin.interpreter.portRange", ":"),
+ ZEPPELIN_SERVER_RPC_PORTRANGE("zeppelin.server.rpc.portRange", ":"),
+ ZEPPELIN_INTERPRETER_RPC_PORTRANGE("zeppelin.interpreter.rpc.portRange", ":"),
ZEPPELIN_INTERPRETER_LIFECYCLE_MANAGER_CLASS("zeppelin.interpreter.lifecyclemanager.class",
"org.apache.zeppelin.interpreter.lifecycle.NullLifecycleManager"),
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7af86168/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/AngularObject.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/AngularObject.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/AngularObject.java
index 2187705..1959e3d 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/AngularObject.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/AngularObject.java
@@ -17,17 +17,17 @@
package org.apache.zeppelin.display;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Objects;
-import java.util.concurrent.ExecutorService;
-
import com.google.gson.Gson;
import org.apache.zeppelin.common.JsonSerializable;
import org.apache.zeppelin.scheduler.ExecutorFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.ExecutorService;
+
/**
* AngularObject provides binding between back-end (interpreter) and front-end
* User provided object will automatically synchronized with front-end side.
@@ -150,7 +150,7 @@ public class AngularObject<T> implements JsonSerializable {
* fire updated() event for listener
* Note that it does not invoke watcher.watch()
*/
- public void emit(){
+ public void emit() {
if (listener != null) {
listener.updated(this);
}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7af86168/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/BaseZeppelinContext.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/BaseZeppelinContext.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/BaseZeppelinContext.java
index 139edd1..dba9471 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/BaseZeppelinContext.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/BaseZeppelinContext.java
@@ -17,6 +17,7 @@
package org.apache.zeppelin.interpreter;
+import org.apache.thrift.TException;
import org.apache.zeppelin.annotation.Experimental;
import org.apache.zeppelin.annotation.ZeppelinApi;
import org.apache.zeppelin.display.AngularObject;
@@ -24,7 +25,6 @@ import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.AngularObjectWatcher;
import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.display.ui.OptionInput.ParamOption;
-import org.apache.zeppelin.interpreter.remote.RemoteEventClientWrapper;
import org.apache.zeppelin.resource.Resource;
import org.apache.zeppelin.resource.ResourcePool;
import org.apache.zeppelin.resource.ResourceSet;
@@ -32,6 +32,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
@@ -50,8 +51,6 @@ public abstract class BaseZeppelinContext {
protected GUI gui;
protected GUI noteGui;
- private static RemoteEventClientWrapper eventClient;
-
public BaseZeppelinContext(InterpreterHookRegistry hooks, int maxResult) {
this.hooks = hooks;
this.maxResult = maxResult;
@@ -69,6 +68,7 @@ public abstract class BaseZeppelinContext {
/**
* subclasses should implement this method to display specific data type
+ *
* @param obj
* @return
*/
@@ -76,7 +76,6 @@ public abstract class BaseZeppelinContext {
/**
* @deprecated use z.textbox instead
- *
*/
@Deprecated
@ZeppelinApi
@@ -147,7 +146,7 @@ public abstract class BaseZeppelinContext {
private Object select(String name, Object defaultValue, ParamOption[] paramOptions,
- boolean noteForm) {
+ boolean noteForm) {
if (noteForm) {
return noteGui.select(name, defaultValue, paramOptions);
} else {
@@ -164,7 +163,7 @@ public abstract class BaseZeppelinContext {
}
private Collection<Object> checkbox(String name, ParamOption[] options,
- boolean noteForm) {
+ boolean noteForm) {
List<Object> defaultValues = new LinkedList<>();
for (ParamOption option : options) {
defaultValues.add(option.getValue());
@@ -217,6 +216,7 @@ public abstract class BaseZeppelinContext {
/**
* display special types of objects for interpreter.
* Each interpreter can has its own supported classes.
+ *
* @param o object
*/
@ZeppelinApi
@@ -227,7 +227,8 @@ public abstract class BaseZeppelinContext {
/**
* display special types of objects for interpreter.
* Each interpreter can has its own supported classes.
- * @param o object
+ *
+ * @param o object
* @param maxResult maximum number of rows to display
*/
@@ -257,208 +258,127 @@ public abstract class BaseZeppelinContext {
/**
* Run paragraph by id
- * @param noteId
- * @param paragraphId
- */
- @ZeppelinApi
- public void run(String noteId, String paragraphId) {
- run(noteId, paragraphId, interpreterContext, true);
- }
-
- /**
- * Run paragraph by id
+ *
* @param paragraphId
*/
@ZeppelinApi
- public void run(String paragraphId) {
+ public void run(String paragraphId) throws IOException {
run(paragraphId, true);
}
/**
* Run paragraph by id
+ *
* @param paragraphId
* @param checkCurrentParagraph
*/
@ZeppelinApi
- public void run(String paragraphId, boolean checkCurrentParagraph) {
+ public void run(String paragraphId, boolean checkCurrentParagraph) throws IOException {
String noteId = interpreterContext.getNoteId();
run(noteId, paragraphId, interpreterContext, checkCurrentParagraph);
}
+ @ZeppelinApi
+ public void run(String noteId, String paragraphId)
+ throws IOException {
+ run(noteId, paragraphId, InterpreterContext.get(), true);
+ }
+
/**
* Run paragraph by id
+ *
* @param noteId
*/
@ZeppelinApi
- public void run(String noteId, String paragraphId, InterpreterContext context) {
+ public void run(String noteId, String paragraphId, InterpreterContext context)
+ throws IOException {
run(noteId, paragraphId, context, true);
}
/**
* Run paragraph by id
+ *
* @param noteId
* @param context
*/
@ZeppelinApi
public void run(String noteId, String paragraphId, InterpreterContext context,
- boolean checkCurrentParagraph) {
+ boolean checkCurrentParagraph) throws IOException {
+
if (paragraphId.equals(context.getParagraphId()) && checkCurrentParagraph) {
throw new RuntimeException("Can not run current Paragraph");
}
-
- List<InterpreterContextRunner> runners =
- getInterpreterContextRunner(noteId, paragraphId, context);
-
- if (runners.size() <= 0) {
- throw new RuntimeException("Paragraph " + paragraphId + " not found " + runners.size());
- }
-
- for (InterpreterContextRunner r : runners) {
- r.run();
- }
-
+ List<String> paragraphIds = new ArrayList<>();
+ paragraphIds.add(paragraphId);
+ List<Integer> paragraphIndices = new ArrayList<>();
+ context.getIntpEventClient()
+ .runParagraphs(noteId, paragraphIds, paragraphIndices, context.getParagraphId());
}
- public void runNote(String noteId) {
+ public void runNote(String noteId) throws IOException {
runNote(noteId, interpreterContext);
}
- public void runNote(String noteId, InterpreterContext context) {
- String runningNoteId = context.getNoteId();
- String runningParagraphId = context.getParagraphId();
- List<InterpreterContextRunner> runners = getInterpreterContextRunner(noteId, context);
-
- if (runners.size() <= 0) {
- throw new RuntimeException("Note " + noteId + " not found " + runners.size());
- }
-
- for (InterpreterContextRunner r : runners) {
- if (r.getNoteId().equals(runningNoteId) && r.getParagraphId().equals(runningParagraphId)) {
- continue;
- }
- LOGGER.debug("Run Paragraph: " + r.getParagraphId() + " of Note: " + r.getNoteId());
- r.run();
- }
- }
-
-
- /**
- * get Zeppelin Paragraph Runner from zeppelin server
- * @param noteId
- */
- @ZeppelinApi
- public List<InterpreterContextRunner> getInterpreterContextRunner(
- String noteId, InterpreterContext interpreterContext) {
- List<InterpreterContextRunner> runners = new LinkedList<>();
- RemoteWorksController remoteWorksController = interpreterContext.getRemoteWorksController();
-
- if (remoteWorksController != null) {
- runners = remoteWorksController.getRemoteContextRunner(noteId);
- }
-
- return runners;
- }
-
- /**
- * get Zeppelin Paragraph Runner from zeppelin server
- * @param noteId
- * @param paragraphId
- */
- @ZeppelinApi
- public List<InterpreterContextRunner> getInterpreterContextRunner(
- String noteId, String paragraphId, InterpreterContext interpreterContext) {
- List<InterpreterContextRunner> runners = new LinkedList<>();
- RemoteWorksController remoteWorksController = interpreterContext.getRemoteWorksController();
-
- if (remoteWorksController != null) {
- runners = remoteWorksController.getRemoteContextRunner(noteId, paragraphId);
- }
-
- return runners;
+ public void runNote(String noteId, InterpreterContext context) throws IOException {
+ List<String> paragraphIds = new ArrayList<>();
+ List<Integer> paragraphIndices = new ArrayList<>();
+ context.getIntpEventClient()
+ .runParagraphs(noteId, paragraphIds, paragraphIndices, context.getParagraphId());
}
/**
* Run paragraph at idx
+ *
* @param idx
*/
@ZeppelinApi
- public void run(int idx) {
+ public void run(int idx) throws IOException {
run(idx, true);
}
/**
- *
- * @param idx paragraph index
- * @param checkCurrentParagraph check whether you call this run method in the current paragraph.
- * Set it to false only when you are sure you are not invoking this method to run current
- * paragraph. Otherwise you would run current paragraph in infinite loop.
+ * @param idx paragraph index
+ * @param checkCurrentParagraph check whether you call this run method in the current paragraph.
+ * Set it to false only when you are sure you are not invoking this method to run current
+ * paragraph. Otherwise you would run current paragraph in infinite loop.
*/
- public void run(int idx, boolean checkCurrentParagraph) {
+ public void run(int idx, boolean checkCurrentParagraph) throws IOException {
String noteId = interpreterContext.getNoteId();
run(noteId, idx, interpreterContext, checkCurrentParagraph);
}
/**
* Run paragraph at index
+ *
* @param noteId
- * @param idx index starting from 0
+ * @param idx index starting from 0
* @param context interpreter context
*/
- public void run(String noteId, int idx, InterpreterContext context) {
+ public void run(String noteId, int idx, InterpreterContext context) throws IOException {
run(noteId, idx, context, true);
}
/**
- *
* @param noteId
- * @param idx paragraph index
- * @param context interpreter context
- * @param checkCurrentParagraph check whether you call this run method in the current paragraph.
+ * @param idx paragraph index
+ * @param context interpreter context
+ * @param checkCurrentParagraph
+ * check whether you call this run method in the current paragraph.
* Set it to false only when you are sure you are not invoking this method to run current
* paragraph. Otherwise you would run current paragraph in infinite loop.
*/
public void run(String noteId, int idx, InterpreterContext context,
- boolean checkCurrentParagraph) {
- List<InterpreterContextRunner> runners = getInterpreterContextRunner(noteId, context);
- if (idx >= runners.size()) {
- throw new RuntimeException("Index out of bound");
- }
-
- InterpreterContextRunner runner = runners.get(idx);
- if (runner.getParagraphId().equals(context.getParagraphId()) && checkCurrentParagraph) {
- throw new RuntimeException("Can not run current Paragraph: " + runner.getParagraphId());
- }
+ boolean checkCurrentParagraph) throws IOException {
- runner.run();
+ List<String> paragraphIds = new ArrayList<>();
+ List<Integer> paragraphIndices = new ArrayList<>();
+ paragraphIndices.add(idx);
+ context.getIntpEventClient()
+ .runParagraphs(noteId, paragraphIds, paragraphIndices, context.getParagraphId());
}
@ZeppelinApi
- public void run(List<Object> paragraphIdOrIdx) {
- run(paragraphIdOrIdx, interpreterContext);
- }
-
- /**
- * Run paragraphs
- * @param paragraphIdOrIdx list of paragraph id or idx
- */
- @ZeppelinApi
- public void run(List<Object> paragraphIdOrIdx, InterpreterContext context) {
- String noteId = context.getNoteId();
- for (Object idOrIdx : paragraphIdOrIdx) {
- if (idOrIdx instanceof String) {
- String paragraphId = (String) idOrIdx;
- run(noteId, paragraphId, context);
- } else if (idOrIdx instanceof Integer) {
- Integer idx = (Integer) idOrIdx;
- run(noteId, idx, context);
- } else {
- throw new RuntimeException("Paragraph " + idOrIdx + " not found");
- }
- }
- }
-
- @ZeppelinApi
- public void runAll() {
+ public void runAll() throws IOException {
runAll(interpreterContext);
}
@@ -466,22 +386,10 @@ public abstract class BaseZeppelinContext {
* Run all paragraphs. except this.
*/
@ZeppelinApi
- public void runAll(InterpreterContext context) {
+ public void runAll(InterpreterContext context) throws IOException {
runNote(context.getNoteId());
}
- @ZeppelinApi
- public List<String> listParagraphs() {
- List<String> paragraphs = new LinkedList<>();
-
- for (InterpreterContextRunner r : interpreterContext.getRunners()) {
- paragraphs.add(r.getParagraphId());
- }
-
- return paragraphs;
- }
-
-
private AngularObject getAngularObject(String name, InterpreterContext interpreterContext) {
AngularObjectRegistry registry = interpreterContext.getAngularObjectRegistry();
String noteId = interpreterContext.getNoteId();
@@ -501,6 +409,7 @@ public abstract class BaseZeppelinContext {
/**
* Get angular object. Look up notebook scope first and then global scope
+ *
* @param name variable name
* @return value
*/
@@ -516,6 +425,7 @@ public abstract class BaseZeppelinContext {
/**
* Get angular object. Look up global scope
+ *
* @param name variable name
* @return value
*/
@@ -533,52 +443,58 @@ public abstract class BaseZeppelinContext {
/**
* Create angular variable in notebook scope and bind with front end Angular display system.
* If variable exists, it'll be overwritten.
+ *
* @param name name of the variable
- * @param o value
+ * @param o value
*/
@ZeppelinApi
- public void angularBind(String name, Object o) {
+ public void angularBind(String name, Object o) throws TException {
angularBind(name, o, interpreterContext.getNoteId());
}
/**
* Create angular variable in global scope and bind with front end Angular display system.
* If variable exists, it'll be overwritten.
+ *
* @param name name of the variable
- * @param o value
+ * @param o value
*/
@Deprecated
- public void angularBindGlobal(String name, Object o) {
+ public void angularBindGlobal(String name, Object o) throws TException {
angularBind(name, o, (String) null);
}
/**
* Create angular variable in local scope and bind with front end Angular display system.
* If variable exists, value will be overwritten and watcher will be added.
- * @param name name of variable
- * @param o value
+ *
+ * @param name name of variable
+ * @param o value
* @param watcher watcher of the variable
*/
@ZeppelinApi
- public void angularBind(String name, Object o, AngularObjectWatcher watcher) {
+ public void angularBind(String name, Object o, AngularObjectWatcher watcher) throws TException {
angularBind(name, o, interpreterContext.getNoteId(), watcher);
}
/**
* Create angular variable in global scope and bind with front end Angular display system.
* If variable exists, value will be overwritten and watcher will be added.
- * @param name name of variable
- * @param o value
+ *
+ * @param name name of variable
+ * @param o value
* @param watcher watcher of the variable
*/
@Deprecated
- public void angularBindGlobal(String name, Object o, AngularObjectWatcher watcher) {
+ public void angularBindGlobal(String name, Object o, AngularObjectWatcher watcher)
+ throws TException {
angularBind(name, o, null, watcher);
}
/**
* Add watcher into angular variable (local scope)
- * @param name name of the variable
+ *
+ * @param name name of the variable
* @param watcher watcher
*/
@ZeppelinApi
@@ -588,7 +504,8 @@ public abstract class BaseZeppelinContext {
/**
* Add watcher into angular variable (global scope)
- * @param name name of the variable
+ *
+ * @param name name of the variable
* @param watcher watcher
*/
@Deprecated
@@ -597,9 +514,9 @@ public abstract class BaseZeppelinContext {
}
-
/**
* Remove watcher from angular variable (local)
+ *
* @param name
* @param watcher
*/
@@ -610,6 +527,7 @@ public abstract class BaseZeppelinContext {
/**
* Remove watcher from angular variable (global)
+ *
* @param name
* @param watcher
*/
@@ -621,6 +539,7 @@ public abstract class BaseZeppelinContext {
/**
* Remove all watchers for the angular variable (local)
+ *
* @param name
*/
@ZeppelinApi
@@ -630,6 +549,7 @@ public abstract class BaseZeppelinContext {
/**
* Remove all watchers for the angular variable (global)
+ *
* @param name
*/
@Deprecated
@@ -639,30 +559,33 @@ public abstract class BaseZeppelinContext {
/**
* Remove angular variable and all the watchers.
+ *
* @param name
*/
@ZeppelinApi
- public void angularUnbind(String name) {
+ public void angularUnbind(String name) throws TException {
String noteId = interpreterContext.getNoteId();
angularUnbind(name, noteId);
}
/**
* Remove angular variable and all the watchers.
+ *
* @param name
*/
@Deprecated
- public void angularUnbindGlobal(String name) {
+ public void angularUnbindGlobal(String name) throws TException {
angularUnbind(name, null);
}
/**
* Create angular variable in notebook scope and bind with front end Angular display system.
* If variable exists, it'll be overwritten.
+ *
* @param name name of the variable
- * @param o value
+ * @param o value
*/
- private void angularBind(String name, Object o, String noteId) {
+ public void angularBind(String name, Object o, String noteId) throws TException {
AngularObjectRegistry registry = interpreterContext.getAngularObjectRegistry();
if (registry.get(name, noteId, null) == null) {
@@ -676,11 +599,13 @@ public abstract class BaseZeppelinContext {
* Create angular variable in notebook scope and bind with front end Angular display
* system.
* If variable exists, value will be overwritten and watcher will be added.
- * @param name name of variable
- * @param o value
+ *
+ * @param name name of variable
+ * @param o value
* @param watcher watcher of the variable
*/
- private void angularBind(String name, Object o, String noteId, AngularObjectWatcher watcher) {
+ private void angularBind(String name, Object o, String noteId, AngularObjectWatcher watcher)
+ throws TException {
AngularObjectRegistry registry = interpreterContext.getAngularObjectRegistry();
if (registry.get(name, noteId, null) == null) {
@@ -693,7 +618,8 @@ public abstract class BaseZeppelinContext {
/**
* Add watcher into angular binding variable
- * @param name name of the variable
+ *
+ * @param name name of the variable
* @param watcher watcher
*/
public void angularWatch(String name, String noteId, AngularObjectWatcher watcher) {
@@ -706,6 +632,7 @@ public abstract class BaseZeppelinContext {
/**
* Remove watcher
+ *
* @param name
* @param watcher
*/
@@ -718,6 +645,7 @@ public abstract class BaseZeppelinContext {
/**
* Remove all watchers for the angular variable
+ *
* @param name
*/
private void angularUnwatch(String name, String noteId) {
@@ -729,15 +657,17 @@ public abstract class BaseZeppelinContext {
/**
* Remove angular variable and all the watchers.
+ *
* @param name
*/
- private void angularUnbind(String name, String noteId) {
+ private void angularUnbind(String name, String noteId) throws TException {
AngularObjectRegistry registry = interpreterContext.getAngularObjectRegistry();
registry.remove(name, noteId, null);
}
/**
* Get the interpreter class name from name entered in paragraph
+ *
* @param replName if replName is a valid className, return that instead.
*/
private String getClassNameFromReplName(String replName) {
@@ -750,8 +680,9 @@ public abstract class BaseZeppelinContext {
/**
* General function to register hook event
- * @param event The type of event to hook to (pre_exec, post_exec)
- * @param cmd The code to be executed by the interpreter on given event
+ *
+ * @param event The type of event to hook to (pre_exec, post_exec)
+ * @param cmd The code to be executed by the interpreter on given event
* @param replName Name of the interpreter
*/
@Experimental
@@ -762,8 +693,9 @@ public abstract class BaseZeppelinContext {
/**
* registerHook() wrapper for current repl
+ *
* @param event The type of event to hook to (pre_exec, post_exec)
- * @param cmd The code to be executed by the interpreter on given event
+ * @param cmd The code to be executed by the interpreter on given event
*/
@Experimental
public void registerHook(String event, String cmd) throws InvalidHookException {
@@ -772,7 +704,6 @@ public abstract class BaseZeppelinContext {
}
/**
- *
* @param event
* @param cmd
* @param noteId
@@ -795,7 +726,7 @@ public abstract class BaseZeppelinContext {
/**
* Unbind code from given hook event and given repl
*
- * @param event The type of event to hook to (pre_exec, post_exec)
+ * @param event The type of event to hook to (pre_exec, post_exec)
* @param replName Name of the interpreter
*/
@Experimental
@@ -806,6 +737,7 @@ public abstract class BaseZeppelinContext {
/**
* unregisterHook() wrapper for current repl
+ *
* @param event The type of event to hook to (pre_exec, post_exec)
*/
@Experimental
@@ -816,8 +748,8 @@ public abstract class BaseZeppelinContext {
/**
* Unbind code from given hook event and given note
*
- * @param noteId The id of note
- * @param event The type of event to hook to (pre_exec, post_exec)
+ * @param noteId The id of note
+ * @param event The type of event to hook to (pre_exec, post_exec)
*/
@Experimental
public void unregisterNoteHook(String noteId, String event) {
@@ -828,8 +760,9 @@ public abstract class BaseZeppelinContext {
/**
* Unbind code from given hook event, given note and given repl
- * @param noteId The id of note
- * @param event The type of event to hook to (pre_exec, post_exec)
+ *
+ * @param noteId The id of note
+ * @param event The type of event to hook to (pre_exec, post_exec)
* @param replName Name of the interpreter
*/
@Experimental
@@ -841,6 +774,7 @@ public abstract class BaseZeppelinContext {
/**
* Add object into resource pool
+ *
* @param name
* @param value
*/
@@ -853,6 +787,7 @@ public abstract class BaseZeppelinContext {
/**
* Get object from resource pool
* Search local process first and then the other processes
+ *
* @param name
* @return null if resource not found
*/
@@ -869,6 +804,7 @@ public abstract class BaseZeppelinContext {
/**
* Remove object from resourcePool
+ *
* @param name
*/
@ZeppelinApi
@@ -879,6 +815,7 @@ public abstract class BaseZeppelinContext {
/**
* Check if resource pool has the object
+ *
* @param name
* @return
*/
@@ -897,22 +834,4 @@ public abstract class BaseZeppelinContext {
ResourcePool resourcePool = interpreterContext.getResourcePool();
return resourcePool.getAll();
}
-
- /**
- * Get the event client
- */
- @ZeppelinApi
- public static RemoteEventClientWrapper getEventClient() {
- return eventClient;
- }
-
- /**
- * Set event client
- */
- @ZeppelinApi
- public void setEventClient(RemoteEventClientWrapper eventClient) {
- if (BaseZeppelinContext.eventClient == null) {
- BaseZeppelinContext.eventClient = eventClient;
- }
- }
}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7af86168/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java
index e4518a4..ecff46b 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java
@@ -17,18 +17,14 @@
package org.apache.zeppelin.interpreter;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
import org.apache.zeppelin.display.AngularObjectRegistry;
-import org.apache.zeppelin.user.AuthenticationInfo;
import org.apache.zeppelin.display.GUI;
-import org.apache.zeppelin.interpreter.remote.RemoteEventClientWrapper;
-import org.apache.zeppelin.interpreter.remote.RemoteEventClient;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventClient;
import org.apache.zeppelin.resource.ResourcePool;
+import org.apache.zeppelin.user.AuthenticationInfo;
+
+import java.util.HashMap;
+import java.util.Map;
/**
* Interpreter context
@@ -61,17 +57,19 @@ public class InterpreterContext {
private GUI noteGui = new GUI();
private AngularObjectRegistry angularObjectRegistry;
private ResourcePool resourcePool;
- private List<InterpreterContextRunner> runners = new ArrayList<>();
private String interpreterClassName;
- private RemoteEventClientWrapper client;
- private RemoteWorksController remoteWorksController;
private Map<String, Integer> progressMap;
+ private RemoteInterpreterEventClient intpEventClient;
/**
* Builder class for InterpreterContext
*/
public static class Builder {
- private InterpreterContext context = new InterpreterContext();
+ private InterpreterContext context;
+
+ public Builder() {
+ context = new InterpreterContext();
+ }
public Builder setNoteId(String noteId) {
context.noteId = noteId;
@@ -83,13 +81,18 @@ public class InterpreterContext {
return this;
}
- public Builder setEventClient(RemoteEventClientWrapper client) {
- context.client = client;
+ public Builder setInterpreterClassName(String intpClassName) {
+ context.interpreterClassName = intpClassName;
return this;
}
- public Builder setInterpreterClassName(String intpClassName) {
- context.interpreterClassName = intpClassName;
+ public Builder setAngularObjectRegistry(AngularObjectRegistry angularObjectRegistry) {
+ context.angularObjectRegistry = angularObjectRegistry;
+ return this;
+ }
+
+ public Builder setResourcePool(ResourcePool resourcePool) {
+ context.resourcePool = resourcePool;
return this;
}
@@ -98,7 +101,54 @@ public class InterpreterContext {
return this;
}
+ public Builder setAuthenticationInfo(AuthenticationInfo authenticationInfo) {
+ context.authenticationInfo = authenticationInfo;
+ return this;
+ }
+
+ public Builder setConfig(Map<String, Object> config) {
+ context.config = config;
+ return this;
+ }
+
+ public Builder setGUI(GUI gui) {
+ context.gui = gui;
+ return this;
+ }
+
+ public Builder setNoteGUI(GUI noteGUI) {
+ context.noteGui = noteGUI;
+ return this;
+ }
+
+ public Builder setInterpreterOut(InterpreterOutput out) {
+ context.out = out;
+ return this;
+ }
+
+ public Builder setIntpEventClient(RemoteInterpreterEventClient intpEventClient) {
+ context.intpEventClient = intpEventClient;
+ return this;
+ }
+
+ public Builder setProgressMap(Map<String, Integer> progressMap) {
+ context.progressMap = progressMap;
+ return this;
+ }
+
+ public Builder setParagraphText(String paragraphText) {
+ context.paragraphText = paragraphText;
+ return this;
+ }
+
+ public Builder setParagraphTitle(String paragraphTitle) {
+ context.paragraphTitle = paragraphTitle;
+ return this;
+ }
+
+
public InterpreterContext build() {
+ InterpreterContext.set(context);
return context;
}
}
@@ -111,79 +161,6 @@ public class InterpreterContext {
}
- // visible for testing
- public InterpreterContext(String noteId,
- String paragraphId,
- String replName,
- String paragraphTitle,
- String paragraphText,
- AuthenticationInfo authenticationInfo,
- Map<String, Object> config,
- GUI gui,
- GUI noteGui,
- AngularObjectRegistry angularObjectRegistry,
- ResourcePool resourcePool,
- List<InterpreterContextRunner> runners,
- InterpreterOutput out
- ) {
- this(noteId, paragraphId, replName, paragraphTitle, paragraphText, authenticationInfo,
- config, gui, noteGui, angularObjectRegistry, resourcePool, runners, out, null, null);
- }
-
- public InterpreterContext(String noteId,
- String paragraphId,
- String replName,
- String paragraphTitle,
- String paragraphText,
- AuthenticationInfo authenticationInfo,
- Map<String, Object> config,
- GUI gui,
- GUI noteGui,
- AngularObjectRegistry angularObjectRegistry,
- ResourcePool resourcePool,
- List<InterpreterContextRunner> runners,
- InterpreterOutput out,
- RemoteWorksController remoteWorksController,
- Map<String, Integer> progressMap
- ) {
- this.noteId = noteId;
- this.paragraphId = paragraphId;
- this.replName = replName;
- this.paragraphTitle = paragraphTitle;
- this.paragraphText = paragraphText;
- this.authenticationInfo = authenticationInfo;
- this.config = config;
- this.gui = gui;
- this.noteGui = noteGui;
- this.angularObjectRegistry = angularObjectRegistry;
- this.resourcePool = resourcePool;
- this.runners = runners;
- this.out = out;
- this.remoteWorksController = remoteWorksController;
- this.progressMap = progressMap;
- }
-
- public InterpreterContext(String noteId,
- String paragraphId,
- String replName,
- String paragraphTitle,
- String paragraphText,
- AuthenticationInfo authenticationInfo,
- Map<String, Object> config,
- GUI gui,
- GUI noteGui,
- AngularObjectRegistry angularObjectRegistry,
- ResourcePool resourcePool,
- List<InterpreterContextRunner> contextRunners,
- InterpreterOutput output,
- RemoteWorksController remoteWorksController,
- RemoteInterpreterEventClient eventClient,
- Map<String, Integer> progressMap) {
- this(noteId, paragraphId, replName, paragraphTitle, paragraphText, authenticationInfo,
- config, gui, noteGui, angularObjectRegistry, resourcePool, contextRunners, output,
- remoteWorksController, progressMap);
- this.client = new RemoteEventClient(eventClient);
- }
public String getNoteId() {
return noteId;
@@ -229,10 +206,6 @@ public class InterpreterContext {
return resourcePool;
}
- public List<InterpreterContextRunner> getRunners() {
- return runners;
- }
-
public String getInterpreterClassName() {
return interpreterClassName;
}
@@ -241,20 +214,12 @@ public class InterpreterContext {
this.interpreterClassName = className;
}
- public RemoteEventClientWrapper getClient() {
- return client;
- }
-
- public void setClient(RemoteEventClientWrapper client) {
- this.client = client;
- }
-
- public RemoteWorksController getRemoteWorksController() {
- return remoteWorksController;
+ public RemoteInterpreterEventClient getIntpEventClient() {
+ return intpEventClient;
}
- public void setRemoteWorksController(RemoteWorksController remoteWorksController) {
- this.remoteWorksController = remoteWorksController;
+ public void setIntpEventClient(RemoteInterpreterEventClient intpEventClient) {
+ this.intpEventClient = intpEventClient;
}
public InterpreterOutput out() {