You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2018/03/15 05:27:43 UTC
zeppelin git commit: ZEPPELIN-3330. Add more test for
RemoteInterpreterServer
Repository: zeppelin
Updated Branches:
refs/heads/master 6001b98de -> 5499fc4e9
ZEPPELIN-3330. Add more test for RemoteInterpreterServer
### What is this PR for?
This PR adds more tests for RemoteInterpreterServer in preparation for further changes to RemoteInterpreterServer. It also includes some code refactoring.
### What type of PR is it?
[ Improvement ]
### Todos
* [ ] - Task
### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-3330
### How should this be tested?
* CI pass
### Screenshots (if appropriate)
### Questions:
* Do the license files need an update? No
* Are there breaking changes for older versions? No
* Does this need documentation? No
Author: Jeff Zhang <zj...@apache.org>
Closes #2869 from zjffdu/ZEPPELIN-3330 and squashes the following commits:
8e8c219 [Jeff Zhang] ZEPPELIN-3330. Add more test for RemoteInterpreterServer
Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/5499fc4e
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/5499fc4e
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/5499fc4e
Branch: refs/heads/master
Commit: 5499fc4e90b44269bd5e4c4a94f3dd95b6c6fedc
Parents: 6001b98
Author: Jeff Zhang <zj...@apache.org>
Authored: Wed Mar 14 16:05:50 2018 +0800
Committer: Jeff Zhang <zj...@apache.org>
Committed: Thu Mar 15 13:23:23 2018 +0800
----------------------------------------------------------------------
.../interpreter/BaseZeppelinContext.java | 6 +-
.../interpreter/InterpreterContext.java | 10 +-
.../remote/RemoteInterpreterServer.java | 29 +-
.../remote/RemoteInterpreterServerTest.java | 284 ++++++++++++++-----
4 files changed, 234 insertions(+), 95 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5499fc4e/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/BaseZeppelinContext.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/BaseZeppelinContext.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/BaseZeppelinContext.java
index e38a29f..2e9a9de 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/BaseZeppelinContext.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/BaseZeppelinContext.java
@@ -772,7 +772,7 @@ public abstract class BaseZeppelinContext {
*/
@Experimental
public void registerHook(String event, String cmd) {
- String className = interpreterContext.getClassName();
+ String className = interpreterContext.getInterpreterClassName();
registerHook(event, cmd, className);
}
@@ -794,7 +794,7 @@ public abstract class BaseZeppelinContext {
*/
@Experimental
public String getHook(String event) {
- String className = interpreterContext.getClassName();
+ String className = interpreterContext.getInterpreterClassName();
return getHook(event, className);
}
@@ -816,7 +816,7 @@ public abstract class BaseZeppelinContext {
*/
@Experimental
public void unregisterHook(String event) {
- String className = interpreterContext.getClassName();
+ String className = interpreterContext.getInterpreterClassName();
unregisterHook(event, className);
}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5499fc4e/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java
index 8fa0904..6157d69 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java
@@ -62,7 +62,7 @@ public class InterpreterContext {
private AngularObjectRegistry angularObjectRegistry;
private ResourcePool resourcePool;
private List<InterpreterContextRunner> runners = new ArrayList<>();
- private String className;
+ private String interpreterClassName;
private RemoteEventClientWrapper client;
private RemoteWorksController remoteWorksController;
private Map<String, Integer> progressMap;
@@ -214,12 +214,12 @@ public class InterpreterContext {
return runners;
}
- public String getClassName() {
- return className;
+ public String getInterpreterClassName() {
+ return interpreterClassName;
}
- public void setClassName(String className) {
- this.className = className;
+ public void setInterpreterClassName(String className) {
+ this.interpreterClassName = className;
}
public RemoteEventClientWrapper getClient() {
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5499fc4e/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
index 766ed16..88ac59e 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
@@ -199,7 +199,6 @@ public class RemoteInterpreterServer extends Thread
}
}).start();
}
- logger.info("Starting remote interpreter server on port {}", port);
server.serve();
}
@@ -420,21 +419,21 @@ public class RemoteInterpreterServer extends Thread
@Override
- public RemoteInterpreterResult interpret(String noteId, String className, String st,
+ public RemoteInterpreterResult interpret(String sessionId, String className, String st,
RemoteInterpreterContext interpreterContext)
throws TException {
if (logger.isDebugEnabled()) {
logger.debug("st:\n{}", st);
}
- Interpreter intp = getInterpreter(noteId, className);
+ Interpreter intp = getInterpreter(sessionId, className);
InterpreterContext context = convert(interpreterContext);
- context.setClassName(intp.getClassName());
+ context.setInterpreterClassName(intp.getClassName());
Scheduler scheduler = intp.getScheduler();
InterpretJobListener jobListener = new InterpretJobListener();
InterpretJob job = new InterpretJob(
interpreterContext.getParagraphId(),
- "remoteInterpretJob_" + System.currentTimeMillis(),
+ "RemoteInterpretJob_" + System.currentTimeMillis(),
jobListener,
JobProgressPoller.DEFAULT_INTERVAL_MSEC,
intp,
@@ -638,8 +637,7 @@ public class RemoteInterpreterServer extends Thread
// put result into resource pool
if (resultMessages.size() > 0) {
int lastMessageIndex = resultMessages.size() - 1;
- if (resultMessages.get(lastMessageIndex).getType() ==
- InterpreterResult.Type.TABLE) {
+ if (resultMessages.get(lastMessageIndex).getType() == InterpreterResult.Type.TABLE) {
context.getResourcePool().put(
context.getNoteId(),
context.getParagraphId(),
@@ -667,10 +665,11 @@ public class RemoteInterpreterServer extends Thread
@Override
- public void cancel(String noteId, String className, RemoteInterpreterContext interpreterContext)
- throws TException {
+ public void cancel(String sessionId,
+ String className,
+ RemoteInterpreterContext interpreterContext) throws TException {
logger.info("cancel {} {}", className, interpreterContext.getParagraphId());
- Interpreter intp = getInterpreter(noteId, className);
+ Interpreter intp = getInterpreter(sessionId, className);
String jobId = interpreterContext.getParagraphId();
Job job = intp.getScheduler().removeFromWaitingQueue(jobId);
@@ -742,8 +741,10 @@ public class RemoteInterpreterServer extends Thread
new TypeToken<List<RemoteInterpreterContextRunner>>() {
}.getType());
- for (InterpreterContextRunner r : runners) {
- contextRunners.add(new ParagraphRunner(this, r.getNoteId(), r.getParagraphId()));
+ if (runners != null) {
+ for (InterpreterContextRunner r : runners) {
+ contextRunners.add(new ParagraphRunner(this, r.getNoteId(), r.getParagraphId()));
+ }
}
return new InterpreterContext(
@@ -790,7 +791,7 @@ public class RemoteInterpreterServer extends Thread
String output;
try {
output = new String(out.toByteArray());
- logger.debug("Output Update: {}", output);
+ logger.debug("Output Update for index {}: {}", index, output);
eventClient.onInterpreterOutputUpdate(
noteId, paragraphId, index, out.getType(), output);
} catch (IOException e) {
@@ -919,7 +920,7 @@ public class RemoteInterpreterServer extends Thread
if (interpreters == null) {
return Status.UNKNOWN.name();
}
-
+ //TODO(zjffdu) inefficient: nested for loop over every interpreter and its running jobs
for (Interpreter intp : interpreters) {
for (Job job : intp.getScheduler().getJobsRunning()) {
if (jobId.equals(job.getId())) {
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5499fc4e/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java
index 79a2331..2ae1362 100644
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java
@@ -18,128 +18,266 @@
package org.apache.zeppelin.interpreter.remote;
import org.apache.thrift.TException;
-import org.junit.After;
-import org.junit.Before;
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterException;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterContext;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterResult;
import org.junit.Test;
import java.io.IOException;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicBoolean;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
public class RemoteInterpreterServerTest {
- @Before
- public void setUp() throws Exception {
- }
- @After
- public void tearDown() throws Exception {
+ @Test
+ public void testStartStop() throws InterruptedException, IOException, TException {
+ RemoteInterpreterServer server = new RemoteInterpreterServer("localhost",
+ RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(), ":", true);
+
+ startRemoteInterpreterServer(server, 10 * 1000);
+ stopRemoteInterpreterServer(server, 10 * 10000);
}
@Test
- public void testStartStop() throws InterruptedException, IOException, TException {
+ public void testStartStopWithQueuedEvents() throws InterruptedException, IOException, TException {
RemoteInterpreterServer server = new RemoteInterpreterServer("localhost",
RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(), ":", true);
- assertEquals(false, server.isRunning());
+ startRemoteInterpreterServer(server, 10 * 1000);
+ //just send an event on the client queue
+ server.eventClient.onAppStatusUpdate("", "", "", "");
+ stopRemoteInterpreterServer(server, 10 * 10000);
+ }
+
+ private void startRemoteInterpreterServer(RemoteInterpreterServer server, int timeout)
+ throws InterruptedException {
+ assertEquals(false, server.isRunning());
server.start();
long startTime = System.currentTimeMillis();
- boolean running = false;
-
- while (System.currentTimeMillis() - startTime < 10 * 1000) {
+ while (System.currentTimeMillis() - startTime < timeout) {
if (server.isRunning()) {
- running = true;
break;
- } else {
- Thread.sleep(200);
}
+ Thread.sleep(200);
}
-
- assertEquals(true, running);
+ assertEquals(true, server.isRunning());
assertEquals(true, RemoteInterpreterUtils.checkIfRemoteEndpointAccessible("localhost",
server.getPort()));
+ }
+ private void stopRemoteInterpreterServer(RemoteInterpreterServer server, int timeout)
+ throws TException, InterruptedException {
+ assertEquals(true, server.isRunning());
server.shutdown();
-
- while (System.currentTimeMillis() - startTime < 10 * 1000) {
- if (server.isRunning()) {
- Thread.sleep(200);
- } else {
- running = false;
+ long startTime = System.currentTimeMillis();
+ while (System.currentTimeMillis() - startTime < timeout) {
+ if (!server.isRunning()) {
break;
}
+ Thread.sleep(200);
}
- assertEquals(false, running);
+ assertEquals(false, server.isRunning());
+ assertEquals(false, RemoteInterpreterUtils.checkIfRemoteEndpointAccessible("localhost",
+ server.getPort()));
}
- class ShutdownRun implements Runnable {
- private RemoteInterpreterServer serv = null;
+ @Test
+ public void testInterpreter() throws IOException, TException, InterruptedException {
+ final RemoteInterpreterServer server = new RemoteInterpreterServer("localhost",
+ RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(), ":", true);
+
+ Map<String, String> intpProperties = new HashMap<>();
+ intpProperties.put("property_1", "value_1");
+ intpProperties.put("zeppelin.interpreter.localRepo", "/tmp");
+
+ // create Test1Interpreter in session_1
+ server.createInterpreter("group_1", "session_1", Test1Interpreter.class.getName(),
+ intpProperties, "user_1");
+ Test1Interpreter interpreter1 = (Test1Interpreter)
+ ((LazyOpenInterpreter) server.interpreterGroup.get("session_1").get(0))
+ .getInnerInterpreter();
+ assertEquals(1, server.interpreterGroup.getSessionNum());
+ assertEquals(1, server.interpreterGroup.get("session_1").size());
+ assertEquals(2, interpreter1.getProperties().size());
+ assertEquals("value_1", interpreter1.getProperty("property_1"));
+
+ // create a second interpreter in session_1 (the call below instantiates Test1Interpreter again, not Test2Interpreter)
+ server.createInterpreter("group_1", "session_1", Test1Interpreter.class.getName(),
+ intpProperties, "user_1");
+ assertEquals(2, server.interpreterGroup.get("session_1").size());
+
+ // create Test1Interpreter in session_2
+ server.createInterpreter("group_1", "session_2", Test1Interpreter.class.getName(),
+ intpProperties, "user_1");
+ assertEquals(2, server.interpreterGroup.getSessionNum());
+ assertEquals(2, server.interpreterGroup.get("session_1").size());
+ assertEquals(1, server.interpreterGroup.get("session_2").size());
+
+ final RemoteInterpreterContext intpContext = new RemoteInterpreterContext();
+ intpContext.setNoteId("note_1");
+ intpContext.setParagraphId("paragraph_1");
+ intpContext.setGui("{}");
+ intpContext.setNoteGui("{}");
+
+ // single output of SUCCESS
+ RemoteInterpreterResult result = server.interpret("session_1", Test1Interpreter.class.getName(),
+ "SINGLE_OUTPUT_SUCCESS", intpContext);
+ assertEquals("SUCCESS", result.code);
+ assertEquals(1, result.getMsg().size());
+ assertEquals("SINGLE_OUTPUT_SUCCESS", result.getMsg().get(0).getData());
+
+ // combo output of SUCCESS
+ result = server.interpret("session_1", Test1Interpreter.class.getName(), "COMBO_OUTPUT_SUCCESS",
+ intpContext);
+ assertEquals("SUCCESS", result.code);
+ assertEquals(2, result.getMsg().size());
+ assertEquals("INTERPRETER_OUT", result.getMsg().get(0).getData());
+ assertEquals("SINGLE_OUTPUT_SUCCESS", result.getMsg().get(1).getData());
+
+ // single output of ERROR
+ result = server.interpret("session_1", Test1Interpreter.class.getName(), "SINGLE_OUTPUT_ERROR",
+ intpContext);
+ assertEquals("ERROR", result.code);
+ assertEquals(1, result.getMsg().size());
+ assertEquals("SINGLE_OUTPUT_ERROR", result.getMsg().get(0).getData());
+
+ // getFormType
+ String formType = server.getFormType("session_1", Test1Interpreter.class.getName());
+ assertEquals("NATIVE", formType);
+
+ // cancel
+ Thread sleepThread = new Thread() {
+ @Override
+ public void run() {
+ try {
+ server.interpret("session_1", Test1Interpreter.class.getName(), "SLEEP", intpContext);
+ } catch (TException e) {
+ e.printStackTrace();
+ }
+ }
+ };
+ sleepThread.start();
+
+ Thread.sleep(1000);
+ assertFalse(interpreter1.cancelled.get());
+ server.cancel("session_1", Test1Interpreter.class.getName(), intpContext);
+ assertTrue(interpreter1.cancelled.get());
+
+ // getProgress
+ assertEquals(10, server.getProgress("session_1", Test1Interpreter.class.getName(),
+ intpContext));
+
+ // close
+ server.close("session_1", Test1Interpreter.class.getName());
+ assertTrue(interpreter1.closed.get());
+ }
+
+ public static class Test1Interpreter extends Interpreter {
+
+ AtomicBoolean cancelled = new AtomicBoolean();
+ AtomicBoolean closed = new AtomicBoolean();
+
+ public Test1Interpreter(Properties properties) {
+ super(properties);
+ }
+
+ @Override
+ public void open() {
- ShutdownRun(RemoteInterpreterServer serv) {
- this.serv = serv;
}
@Override
- public void run() {
- try {
- serv.shutdown();
- } catch (Exception ex) {
- // ignore exception
+ public InterpreterResult interpret(String st, InterpreterContext context) {
+ if (st.equals("SINGLE_OUTPUT_SUCCESS")) {
+ return new InterpreterResult(InterpreterResult.Code.SUCCESS, "SINGLE_OUTPUT_SUCCESS");
+ } else if (st.equals("SINGLE_OUTPUT_ERROR")) {
+ return new InterpreterResult(InterpreterResult.Code.ERROR, "SINGLE_OUTPUT_ERROR");
+ } else if (st.equals("COMBO_OUTPUT_SUCCESS")) {
+ try {
+ context.out.write("INTERPRETER_OUT");
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ return new InterpreterResult(InterpreterResult.Code.SUCCESS, "SINGLE_OUTPUT_SUCCESS");
+ } else if (st.equals("SLEEP")) {
+ try {
+ Thread.sleep(3 * 1000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ return new InterpreterResult(InterpreterResult.Code.SUCCESS, "SLEEP_SUCCESS");
}
+ return null;
}
- }
- ;
+ @Override
+ public void cancel(InterpreterContext context) throws InterpreterException {
+ cancelled.set(true);
+ }
- @Test
- public void testStartStopWithQueuedEvents() throws InterruptedException, IOException, TException {
- RemoteInterpreterServer server = new RemoteInterpreterServer("localhost",
- RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(), ":", true);
- assertEquals(false, server.isRunning());
+ @Override
+ public FormType getFormType() throws InterpreterException {
+ return FormType.NATIVE;
+ }
- server.start();
- long startTime = System.currentTimeMillis();
- boolean running = false;
+ @Override
+ public int getProgress(InterpreterContext context) throws InterpreterException {
+ return 10;
+ }
- while (System.currentTimeMillis() - startTime < 10 * 1000) {
- if (server.isRunning()) {
- running = true;
- break;
- } else {
- Thread.sleep(200);
- }
+ @Override
+ public void close() {
+ closed.set(true);
}
- assertEquals(true, running);
- assertEquals(true, RemoteInterpreterUtils.checkIfRemoteEndpointAccessible("localhost",
- server.getPort()));
+ }
- //just send an event on the client queue
- server.eventClient.onAppStatusUpdate("", "", "", "");
+ public static class Test2Interpreter extends Interpreter {
- ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
- Runnable task = new ShutdownRun(server);
+ public Test2Interpreter(Properties properties) {
+ super(properties);
+ }
- executor.schedule(task, 0, TimeUnit.MILLISECONDS);
+ @Override
+ public void open() {
- while (System.currentTimeMillis() - startTime < 10 * 1000) {
- if (server.isRunning()) {
- Thread.sleep(200);
- } else {
- running = false;
- break;
- }
}
- executor.shutdown();
+ @Override
+ public InterpreterResult interpret(String st, InterpreterContext context) {
+ return null;
+ }
- //cleanup environment for next tests
- server.shutdown();
+ @Override
+ public void cancel(InterpreterContext context) throws InterpreterException {
- assertEquals(false, running);
- }
+ }
+ @Override
+ public FormType getFormType() throws InterpreterException {
+ return FormType.NATIVE;
+ }
+
+ @Override
+ public int getProgress(InterpreterContext context) throws InterpreterException {
+ return 0;
+ }
+
+ @Override
+ public void close() {
+
+ }
+
+ }
}