You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by mo...@apache.org on 2015/11/24 18:53:50 UTC
incubator-zeppelin git commit: ZEPPELIN-424 Cancel paragraph in
pending status
Repository: incubator-zeppelin
Updated Branches:
refs/heads/master fa5057cfd -> 0665cef05
ZEPPELIN-424 Cancel paragraph in pending status
https://issues.apache.org/jira/browse/ZEPPELIN-424
Cancel paragraph in pending status.
By removing job from waiting queue and set status ABORT.
Author: Lee moon soo <mo...@apache.org>
Closes #454 from Leemoonsoo/ZEPPELIN-424 and squashes the following commits:
b005129 [Lee moon soo] Keep previous result on ABORT in PENDING status
9d5056a [Lee moon soo] Allow job abort in PENDING status
ad26ac4 [Lee moon soo] Add unittest for abort in PENDING status
Project: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/commit/0665cef0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/tree/0665cef0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/diff/0665cef0
Branch: refs/heads/master
Commit: 0665cef059b6444381eefc6a5870ca510607ce11
Parents: fa5057c
Author: Lee moon soo <mo...@apache.org>
Authored: Sun Nov 22 09:20:47 2015 +0900
Committer: Lee moon soo <mo...@apache.org>
Committed: Wed Nov 25 02:54:29 2015 +0900
----------------------------------------------------------------------
.../zeppelin/interpreter/Interpreter.java | 5 +-
.../zeppelin/interpreter/InterpreterResult.java | 18 +--
.../interpreter/remote/RemoteInterpreter.java | 10 +-
.../remote/RemoteInterpreterServer.java | 19 ++-
.../zeppelin/scheduler/FIFOScheduler.java | 38 ++++--
.../zeppelin/scheduler/ParallelScheduler.java | 35 +++--
.../zeppelin/scheduler/RemoteScheduler.java | 18 ++-
.../apache/zeppelin/scheduler/Scheduler.java | 7 +-
.../zeppelin/scheduler/FIFOSchedulerTest.java | 25 +++-
.../zeppelin/scheduler/RemoteSchedulerTest.java | 134 ++++++++++++++++++-
.../org/apache/zeppelin/notebook/Paragraph.java | 13 +-
11 files changed, 271 insertions(+), 51 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/0665cef0/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/Interpreter.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/Interpreter.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/Interpreter.java
index 3f3503c..d9bb0bf 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/Interpreter.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/Interpreter.java
@@ -121,7 +121,10 @@ public abstract class Interpreter {
* Called when interpreter is no longer used.
*/
public void destroy() {
- getScheduler().stop();
+ Scheduler scheduler = getScheduler();
+ if (scheduler != null) {
+ scheduler.stop();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/0665cef0/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterResult.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterResult.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterResult.java
index 20317eb..593cfc7 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterResult.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterResult.java
@@ -23,29 +23,21 @@ import java.util.*;
/**
* Interpreter result template.
- *
- * @author Leemoonsoo
- *
*/
public class InterpreterResult implements Serializable {
/**
* Type of result after code execution.
- *
- * @author Leemoonsoo
- *
*/
public static enum Code {
SUCCESS,
INCOMPLETE,
- ERROR
+ ERROR,
+ KEEP_PREVIOUS_RESULT
}
/**
* Type of Data.
- *
- * @author Leemoonsoo
- *
*/
public static enum Type {
TEXT,
@@ -99,7 +91,7 @@ public class InterpreterResult implements Serializable {
int magicLength = lastType.getValue().name().length() + 1;
// 1 for the last \n or space after magic
int subStringPos = magicLength + lastType.getKey() + 1;
- return msg.substring(subStringPos);
+ return msg.substring(subStringPos);
}
}
@@ -116,7 +108,7 @@ public class InterpreterResult implements Serializable {
return lastType.getValue();
}
}
-
+
private int getIndexOfType(String msg, Type t) {
if (msg == null) {
return 0;
@@ -124,7 +116,7 @@ public class InterpreterResult implements Serializable {
String typeString = "%" + t.name().toLowerCase();
return StringUtils.indexOf(msg, typeString );
}
-
+
private TreeMap<Integer, Type> buildIndexMap(String msg) {
int lastIndexOftypes = 0;
TreeMap<Integer, Type> typesLastIndexInMsg = new TreeMap<Integer, Type>();
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/0665cef0/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
index 9d01561..ef1f115 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
@@ -329,9 +329,13 @@ public class RemoteInterpreter extends Interpreter {
public Scheduler getScheduler() {
int maxConcurrency = 10;
RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
- return SchedulerFactory.singleton().createOrGetRemoteScheduler(
- "remoteinterpreter_" + interpreterProcess.hashCode(), getInterpreterProcess(),
- maxConcurrency);
+ if (interpreterProcess == null) {
+ return null;
+ } else {
+ return SchedulerFactory.singleton().createOrGetRemoteScheduler(
+ "remoteinterpreter_" + interpreterProcess.hashCode(), getInterpreterProcess(),
+ maxConcurrency);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/0665cef0/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
index 7405a66..d6768c9 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
@@ -37,7 +37,6 @@ import org.apache.zeppelin.display.AngularObjectRegistryListener;
import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.interpreter.ClassloaderInterpreter;
import org.apache.zeppelin.interpreter.Interpreter;
-import org.apache.zeppelin.interpreter.Interpreter.FormType;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
import org.apache.zeppelin.interpreter.InterpreterException;
@@ -62,7 +61,8 @@ import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
/**
- *
+ * Entry point for Interpreter process.
+ * Accepting thrift connections from ZeppelinServer.
*/
public class RemoteInterpreterServer
extends Thread
@@ -233,6 +233,11 @@ public class RemoteInterpreterServer
result = new InterpreterResult(Code.ERROR, Job.getStack(job.getException()));
} else {
result = (InterpreterResult) job.getReturn();
+
+ // in case of job abort in PENDING status, result can be null
+ if (result == null) {
+ result = new InterpreterResult(Code.KEEP_PREVIOUS_RESULT);
+ }
}
return convert(result,
context.getConfig(),
@@ -303,8 +308,16 @@ public class RemoteInterpreterServer
@Override
public void cancel(String className, RemoteInterpreterContext interpreterContext)
throws TException {
+ logger.info("cancel {} {}", className, interpreterContext.getParagraphId());
Interpreter intp = getInterpreter(className);
- intp.cancel(convert(interpreterContext));
+ String jobId = interpreterContext.getParagraphId();
+ Job job = intp.getScheduler().removeFromWaitingQueue(jobId);
+
+ if (job != null) {
+ job.setStatus(Status.ABORT);
+ } else {
+ intp.cancel(convert(interpreterContext));
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/0665cef0/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/FIFOScheduler.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/FIFOScheduler.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/FIFOScheduler.java
index e7f950a..11b5618 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/FIFOScheduler.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/FIFOScheduler.java
@@ -18,6 +18,7 @@
package org.apache.zeppelin.scheduler;
import java.util.Collection;
+import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutorService;
@@ -25,10 +26,7 @@ import java.util.concurrent.ExecutorService;
import org.apache.zeppelin.scheduler.Job.Status;
/**
- * TODO(moon) : add description.
- *
- * @author Leemoonsoo
- *
+ * FIFOScheduler runs submitted job sequentially
*/
public class FIFOScheduler implements Scheduler {
List<Job> queue = new LinkedList<Job>();
@@ -83,20 +81,38 @@ public class FIFOScheduler implements Scheduler {
}
}
+
+ @Override
+ public Job removeFromWaitingQueue(String jobId) {
+ synchronized (queue) {
+ Iterator<Job> it = queue.iterator();
+ while (it.hasNext()) {
+ Job job = it.next();
+ if (job.getId().equals(jobId)) {
+ it.remove();
+ return job;
+ }
+ }
+ }
+ return null;
+ }
+
@Override
public void run() {
synchronized (queue) {
while (terminate == false) {
- if (runningJob != null || queue.isEmpty() == true) {
- try {
- queue.wait(500);
- } catch (InterruptedException e) {
+ synchronized (queue) {
+ if (runningJob != null || queue.isEmpty() == true) {
+ try {
+ queue.wait(500);
+ } catch (InterruptedException e) {
+ }
+ continue;
}
- continue;
- }
- runningJob = queue.remove(0);
+ runningJob = queue.remove(0);
+ }
final Scheduler scheduler = this;
this.executor.execute(new Runnable() {
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/0665cef0/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/ParallelScheduler.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/ParallelScheduler.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/ParallelScheduler.java
index c8e8e04..8507861 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/ParallelScheduler.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/ParallelScheduler.java
@@ -18,6 +18,7 @@
package org.apache.zeppelin.scheduler;
import java.util.Collection;
+import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutorService;
@@ -25,10 +26,7 @@ import java.util.concurrent.ExecutorService;
import org.apache.zeppelin.scheduler.Job.Status;
/**
- * TODO(moon) : add description.
- *
- * @author Leemoonsoo
- *
+ * Parallel scheduler runs submitted job concurrently.
*/
public class ParallelScheduler implements Scheduler {
List<Job> queue = new LinkedList<Job>();
@@ -64,6 +62,21 @@ public class ParallelScheduler implements Scheduler {
}
@Override
+ public Job removeFromWaitingQueue(String jobId) {
+ synchronized (queue) {
+ Iterator<Job> it = queue.iterator();
+ while (it.hasNext()) {
+ Job job = it.next();
+ if (job.getId().equals(jobId)) {
+ it.remove();
+ return job;
+ }
+ }
+ }
+ return null;
+ }
+
+ @Override
public Collection<Job> getJobsRunning() {
List<Job> ret = new LinkedList<Job>();
synchronized (queue) {
@@ -87,9 +100,9 @@ public class ParallelScheduler implements Scheduler {
@Override
public void run() {
-
- synchronized (queue) {
- while (terminate == false) {
+ while (terminate == false) {
+ Job job = null;
+ synchronized (queue) {
if (running.size() >= maxConcurrency || queue.isEmpty() == true) {
try {
queue.wait(500);
@@ -98,14 +111,12 @@ public class ParallelScheduler implements Scheduler {
continue;
}
- Job job = queue.remove(0);
+ job = queue.remove(0);
running.add(job);
- Scheduler scheduler = this;
-
- executor.execute(new JobRunner(scheduler, job));
}
+ Scheduler scheduler = this;
-
+ executor.execute(new JobRunner(scheduler, job));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/0665cef0/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java
index ec5fcd4..51dab12 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java
@@ -18,6 +18,7 @@
package org.apache.zeppelin.scheduler;
import java.util.Collection;
+import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutorService;
@@ -32,7 +33,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- *
+ * RemoteScheduler runs in ZeppelinServer and proxies Scheduler running on RemoteInterpreter
*/
public class RemoteScheduler implements Scheduler {
Logger logger = LoggerFactory.getLogger(RemoteScheduler.class);
@@ -108,6 +109,21 @@ public class RemoteScheduler implements Scheduler {
}
@Override
+ public Job removeFromWaitingQueue(String jobId) {
+ synchronized (queue) {
+ Iterator<Job> it = queue.iterator();
+ while (it.hasNext()) {
+ Job job = it.next();
+ if (job.getId().equals(jobId)) {
+ it.remove();
+ return job;
+ }
+ }
+ }
+ return null;
+ }
+
+ @Override
public Collection<Job> getJobsRunning() {
List<Job> ret = new LinkedList<Job>();
synchronized (queue) {
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/0665cef0/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Scheduler.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Scheduler.java
index a886c22..90d4397 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Scheduler.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Scheduler.java
@@ -20,10 +20,7 @@ package org.apache.zeppelin.scheduler;
import java.util.Collection;
/**
- * TODO(moon) : add description.
- *
- * @author Leemoonsoo
- *
+ * Interface for scheduler
*/
public interface Scheduler extends Runnable {
public String getName();
@@ -34,5 +31,7 @@ public interface Scheduler extends Runnable {
public void submit(Job job);
+ public Job removeFromWaitingQueue(String jobId);
+
public void stop();
}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/0665cef0/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/FIFOSchedulerTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/FIFOSchedulerTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/FIFOSchedulerTest.java
index 3d8495c..7288b67 100644
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/FIFOSchedulerTest.java
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/FIFOSchedulerTest.java
@@ -88,7 +88,30 @@ public class FIFOSchedulerTest extends TestCase {
assertTrue((500 > (Long)job1.getReturn()));
assertEquals(null, job2.getReturn());
+ }
+ public void testRemoveFromWaitingQueue() throws InterruptedException{
+ Scheduler s = schedulerSvc.createOrGetFIFOScheduler("test");
+ assertEquals(0, s.getJobsRunning().size());
+ assertEquals(0, s.getJobsWaiting().size());
- }
+ Job job1 = new SleepingJob("job1", null, 500);
+ Job job2 = new SleepingJob("job2", null, 500);
+
+ s.submit(job1);
+ s.submit(job2);
+
+ Thread.sleep(200);
+
+ job1.abort();
+ job2.abort();
+
+ Thread.sleep(200);
+
+ assertEquals(Status.ABORT, job1.getStatus());
+ assertEquals(Status.ABORT, job2.getStatus());
+
+ assertTrue((500 > (Long)job1.getReturn()));
+ assertEquals(null, job2.getReturn());
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/0665cef0/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java
index 08fe190..d17df4f 100644
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java
@@ -18,6 +18,8 @@
package org.apache.zeppelin.scheduler;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.io.File;
@@ -33,6 +35,7 @@ import org.apache.zeppelin.interpreter.InterpreterContextRunner;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreter;
import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterA;
+import org.apache.zeppelin.scheduler.Job.Status;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -127,7 +130,7 @@ public class RemoteSchedulerTest {
Thread.sleep(TICK_WAIT);
cycles++;
}
-
+
assertTrue(job.isTerminated());
assertEquals(0, scheduler.getJobsWaiting().size());
assertEquals(0, scheduler.getJobsRunning().size());
@@ -136,4 +139,133 @@ public class RemoteSchedulerTest {
schedulerSvc.removeScheduler("test");
}
+ @Test
+ public void testAbortOnPending() throws Exception {
+ Properties p = new Properties();
+ final InterpreterGroup intpGroup = new InterpreterGroup();
+ Map<String, String> env = new HashMap<String, String>();
+ env.put("ZEPPELIN_CLASSPATH", new File("./target/test-classes").getAbsolutePath());
+
+ final RemoteInterpreter intpA = new RemoteInterpreter(
+ p,
+ MockInterpreterA.class.getName(),
+ new File("../bin/interpreter.sh").getAbsolutePath(),
+ "fake",
+ env,
+ 10 * 1000
+ );
+
+ intpGroup.add(intpA);
+ intpA.setInterpreterGroup(intpGroup);
+
+ intpA.open();
+
+ Scheduler scheduler = schedulerSvc.createOrGetRemoteScheduler("test",
+ intpA.getInterpreterProcess(),
+ 10);
+
+ Job job1 = new Job("jobId1", "jobName1", null, 200) {
+ InterpreterContext context = new InterpreterContext(
+ "note",
+ "jobId1",
+ "title",
+ "text",
+ new HashMap<String, Object>(),
+ new GUI(),
+ new AngularObjectRegistry(intpGroup.getId(), null),
+ new LinkedList<InterpreterContextRunner>());
+
+ @Override
+ public int progress() {
+ return 0;
+ }
+
+ @Override
+ public Map<String, Object> info() {
+ return null;
+ }
+
+ @Override
+ protected Object jobRun() throws Throwable {
+ intpA.interpret("1000", context);
+ return "1000";
+ }
+
+ @Override
+ protected boolean jobAbort() {
+ if (isRunning()) {
+ intpA.cancel(context);
+ }
+ return true;
+ }
+ };
+
+ Job job2 = new Job("jobId2", "jobName2", null, 200) {
+ InterpreterContext context = new InterpreterContext(
+ "note",
+ "jobId2",
+ "title",
+ "text",
+ new HashMap<String, Object>(),
+ new GUI(),
+ new AngularObjectRegistry(intpGroup.getId(), null),
+ new LinkedList<InterpreterContextRunner>());
+
+ @Override
+ public int progress() {
+ return 0;
+ }
+
+ @Override
+ public Map<String, Object> info() {
+ return null;
+ }
+
+ @Override
+ protected Object jobRun() throws Throwable {
+ intpA.interpret("1000", context);
+ return "1000";
+ }
+
+ @Override
+ protected boolean jobAbort() {
+ if (isRunning()) {
+ intpA.cancel(context);
+ }
+ return true;
+ }
+ };
+
+ job2.setResult("result2");
+
+ scheduler.submit(job1);
+ scheduler.submit(job2);
+
+
+ int cycles = 0;
+ while (!job1.isRunning() && cycles < MAX_WAIT_CYCLES) {
+ Thread.sleep(TICK_WAIT);
+ cycles++;
+ }
+ assertTrue(job1.isRunning());
+ assertTrue(job2.getStatus() == Status.PENDING);
+
+ job2.abort();
+
+ cycles = 0;
+ while (!job1.isTerminated() && cycles < MAX_WAIT_CYCLES) {
+ Thread.sleep(TICK_WAIT);
+ cycles++;
+ }
+
+ assertNotNull(job1.getDateFinished());
+ assertTrue(job1.isTerminated());
+ assertNull(job2.getDateFinished());
+ assertTrue(job2.isTerminated());
+ assertEquals("result2", job2.getReturn());
+
+ intpA.close();
+ schedulerSvc.removeScheduler("test");
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/0665cef0/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
index 1332f16..28c49c6 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
@@ -22,6 +22,8 @@ import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.display.Input;
import org.apache.zeppelin.interpreter.*;
import org.apache.zeppelin.interpreter.Interpreter.FormType;
+import org.apache.zeppelin.interpreter.InterpreterResult.Code;
+import org.apache.zeppelin.interpreter.InterpreterResult.Type;
import org.apache.zeppelin.scheduler.Job;
import org.apache.zeppelin.scheduler.JobListener;
import org.slf4j.Logger;
@@ -205,13 +207,22 @@ public class Paragraph extends Job implements Serializable, Cloneable {
}
logger().debug("RUN : " + script);
InterpreterResult ret = repl.interpret(script, getInterpreterContext());
+
+ if (Code.KEEP_PREVIOUS_RESULT == ret.code()) {
+ return getReturn();
+ }
return ret;
}
@Override
protected boolean jobAbort() {
Interpreter repl = getRepl(getRequiredReplName());
- repl.cancel(getInterpreterContext());
+ Job job = repl.getScheduler().removeFromWaitingQueue(getId());
+ if (job != null) {
+ job.setStatus(Status.ABORT);
+ } else {
+ repl.cancel(getInterpreterContext());
+ }
return true;
}