You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by mo...@apache.org on 2015/11/24 18:53:50 UTC

incubator-zeppelin git commit: ZEPPELIN-424 Cancel paragraph in pending status

Repository: incubator-zeppelin
Updated Branches:
  refs/heads/master fa5057cfd -> 0665cef05


ZEPPELIN-424 Cancel paragraph in pending status

https://issues.apache.org/jira/browse/ZEPPELIN-424

Cancel a paragraph that is in PENDING status
by removing its job from the waiting queue and setting its status to ABORT.

Author: Lee moon soo <mo...@apache.org>

Closes #454 from Leemoonsoo/ZEPPELIN-424 and squashes the following commits:

b005129 [Lee moon soo] Keep previous result on ABORT in PENDING status
9d5056a [Lee moon soo] Allow job abort in PENDING status
ad26ac4 [Lee moon soo] Add unittest for abort in PENDING status


Project: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/commit/0665cef0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/tree/0665cef0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/diff/0665cef0

Branch: refs/heads/master
Commit: 0665cef059b6444381eefc6a5870ca510607ce11
Parents: fa5057c
Author: Lee moon soo <mo...@apache.org>
Authored: Sun Nov 22 09:20:47 2015 +0900
Committer: Lee moon soo <mo...@apache.org>
Committed: Wed Nov 25 02:54:29 2015 +0900

----------------------------------------------------------------------
 .../zeppelin/interpreter/Interpreter.java       |   5 +-
 .../zeppelin/interpreter/InterpreterResult.java |  18 +--
 .../interpreter/remote/RemoteInterpreter.java   |  10 +-
 .../remote/RemoteInterpreterServer.java         |  19 ++-
 .../zeppelin/scheduler/FIFOScheduler.java       |  38 ++++--
 .../zeppelin/scheduler/ParallelScheduler.java   |  35 +++--
 .../zeppelin/scheduler/RemoteScheduler.java     |  18 ++-
 .../apache/zeppelin/scheduler/Scheduler.java    |   7 +-
 .../zeppelin/scheduler/FIFOSchedulerTest.java   |  25 +++-
 .../zeppelin/scheduler/RemoteSchedulerTest.java | 134 ++++++++++++++++++-
 .../org/apache/zeppelin/notebook/Paragraph.java |  13 +-
 11 files changed, 271 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/0665cef0/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/Interpreter.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/Interpreter.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/Interpreter.java
index 3f3503c..d9bb0bf 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/Interpreter.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/Interpreter.java
@@ -121,7 +121,10 @@ public abstract class Interpreter {
    * Called when interpreter is no longer used.
    */
   public void destroy() {
-    getScheduler().stop();
+    Scheduler scheduler = getScheduler();
+    if (scheduler != null) {
+      scheduler.stop();
+    }
   }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/0665cef0/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterResult.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterResult.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterResult.java
index 20317eb..593cfc7 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterResult.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterResult.java
@@ -23,29 +23,21 @@ import java.util.*;
 
 /**
  * Interpreter result template.
- *
- * @author Leemoonsoo
- *
  */
 public class InterpreterResult implements Serializable {
 
   /**
    *  Type of result after code execution.
-   *
-   * @author Leemoonsoo
-   *
    */
   public static enum Code {
     SUCCESS,
     INCOMPLETE,
-    ERROR
+    ERROR,
+    KEEP_PREVIOUS_RESULT
   }
 
   /**
    * Type of Data.
-   *
-   * @author Leemoonsoo
-   *
    */
   public static enum Type {
     TEXT,
@@ -99,7 +91,7 @@ public class InterpreterResult implements Serializable {
       int magicLength = lastType.getValue().name().length() + 1;
       // 1 for the last \n or space after magic
       int subStringPos = magicLength + lastType.getKey() + 1;
-      return msg.substring(subStringPos); 
+      return msg.substring(subStringPos);
     }
   }
 
@@ -116,7 +108,7 @@ public class InterpreterResult implements Serializable {
       return lastType.getValue();
     }
   }
-  
+
   private int getIndexOfType(String msg, Type t) {
     if (msg == null) {
       return 0;
@@ -124,7 +116,7 @@ public class InterpreterResult implements Serializable {
     String typeString = "%" + t.name().toLowerCase();
     return StringUtils.indexOf(msg, typeString );
   }
-  
+
   private TreeMap<Integer, Type> buildIndexMap(String msg) {
     int lastIndexOftypes = 0;
     TreeMap<Integer, Type> typesLastIndexInMsg = new TreeMap<Integer, Type>();

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/0665cef0/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
index 9d01561..ef1f115 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
@@ -329,9 +329,13 @@ public class RemoteInterpreter extends Interpreter {
   public Scheduler getScheduler() {
     int maxConcurrency = 10;
     RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
-    return SchedulerFactory.singleton().createOrGetRemoteScheduler(
-        "remoteinterpreter_" + interpreterProcess.hashCode(), getInterpreterProcess(),
-        maxConcurrency);
+    if (interpreterProcess == null) {
+      return null;
+    } else {
+      return SchedulerFactory.singleton().createOrGetRemoteScheduler(
+          "remoteinterpreter_" + interpreterProcess.hashCode(), getInterpreterProcess(),
+          maxConcurrency);
+    }
   }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/0665cef0/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
index 7405a66..d6768c9 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
@@ -37,7 +37,6 @@ import org.apache.zeppelin.display.AngularObjectRegistryListener;
 import org.apache.zeppelin.display.GUI;
 import org.apache.zeppelin.interpreter.ClassloaderInterpreter;
 import org.apache.zeppelin.interpreter.Interpreter;
-import org.apache.zeppelin.interpreter.Interpreter.FormType;
 import org.apache.zeppelin.interpreter.InterpreterContext;
 import org.apache.zeppelin.interpreter.InterpreterContextRunner;
 import org.apache.zeppelin.interpreter.InterpreterException;
@@ -62,7 +61,8 @@ import com.google.gson.Gson;
 import com.google.gson.reflect.TypeToken;
 
 /**
- *
+ * Entry point for Interpreter process.
+ * Accepting thrift connections from ZeppelinServer.
  */
 public class RemoteInterpreterServer
   extends Thread
@@ -233,6 +233,11 @@ public class RemoteInterpreterServer
       result = new InterpreterResult(Code.ERROR, Job.getStack(job.getException()));
     } else {
       result = (InterpreterResult) job.getReturn();
+
+      // in case of job abort in PENDING status, result can be null
+      if (result == null) {
+        result = new InterpreterResult(Code.KEEP_PREVIOUS_RESULT);
+      }
     }
     return convert(result,
         context.getConfig(),
@@ -303,8 +308,16 @@ public class RemoteInterpreterServer
   @Override
   public void cancel(String className, RemoteInterpreterContext interpreterContext)
       throws TException {
+    logger.info("cancel {} {}", className, interpreterContext.getParagraphId());
     Interpreter intp = getInterpreter(className);
-    intp.cancel(convert(interpreterContext));
+    String jobId = interpreterContext.getParagraphId();
+    Job job = intp.getScheduler().removeFromWaitingQueue(jobId);
+
+    if (job != null) {
+      job.setStatus(Status.ABORT);
+    } else {
+      intp.cancel(convert(interpreterContext));
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/0665cef0/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/FIFOScheduler.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/FIFOScheduler.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/FIFOScheduler.java
index e7f950a..11b5618 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/FIFOScheduler.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/FIFOScheduler.java
@@ -18,6 +18,7 @@
 package org.apache.zeppelin.scheduler;
 
 import java.util.Collection;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
@@ -25,10 +26,7 @@ import java.util.concurrent.ExecutorService;
 import org.apache.zeppelin.scheduler.Job.Status;
 
 /**
- * TODO(moon) : add description.
- *
- * @author Leemoonsoo
- *
+ * FIFOScheduler runs submitted job sequentially
  */
 public class FIFOScheduler implements Scheduler {
   List<Job> queue = new LinkedList<Job>();
@@ -83,20 +81,38 @@ public class FIFOScheduler implements Scheduler {
     }
   }
 
+
+  @Override
+  public Job removeFromWaitingQueue(String jobId) {
+    synchronized (queue) {
+      Iterator<Job> it = queue.iterator();
+      while (it.hasNext()) {
+        Job job = it.next();
+        if (job.getId().equals(jobId)) {
+          it.remove();
+          return job;
+        }
+      }
+    }
+    return null;
+  }
+
   @Override
   public void run() {
 
     synchronized (queue) {
       while (terminate == false) {
-        if (runningJob != null || queue.isEmpty() == true) {
-          try {
-            queue.wait(500);
-          } catch (InterruptedException e) {
+        synchronized (queue) {
+          if (runningJob != null || queue.isEmpty() == true) {
+            try {
+              queue.wait(500);
+            } catch (InterruptedException e) {
+            }
+            continue;
           }
-          continue;
-        }
 
-        runningJob = queue.remove(0);
+          runningJob = queue.remove(0);
+        }
 
         final Scheduler scheduler = this;
         this.executor.execute(new Runnable() {

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/0665cef0/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/ParallelScheduler.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/ParallelScheduler.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/ParallelScheduler.java
index c8e8e04..8507861 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/ParallelScheduler.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/ParallelScheduler.java
@@ -18,6 +18,7 @@
 package org.apache.zeppelin.scheduler;
 
 import java.util.Collection;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
@@ -25,10 +26,7 @@ import java.util.concurrent.ExecutorService;
 import org.apache.zeppelin.scheduler.Job.Status;
 
 /**
- * TODO(moon) : add description.
- *
- * @author Leemoonsoo
- *
+ * Parallel scheduler runs submitted job concurrently.
  */
 public class ParallelScheduler implements Scheduler {
   List<Job> queue = new LinkedList<Job>();
@@ -64,6 +62,21 @@ public class ParallelScheduler implements Scheduler {
   }
 
   @Override
+  public Job removeFromWaitingQueue(String jobId) {
+    synchronized (queue) {
+      Iterator<Job> it = queue.iterator();
+      while (it.hasNext()) {
+        Job job = it.next();
+        if (job.getId().equals(jobId)) {
+          it.remove();
+          return job;
+        }
+      }
+    }
+    return null;
+  }
+
+  @Override
   public Collection<Job> getJobsRunning() {
     List<Job> ret = new LinkedList<Job>();
     synchronized (queue) {
@@ -87,9 +100,9 @@ public class ParallelScheduler implements Scheduler {
 
   @Override
   public void run() {
-
-    synchronized (queue) {
-      while (terminate == false) {
+    while (terminate == false) {
+      Job job = null;
+      synchronized (queue) {
         if (running.size() >= maxConcurrency || queue.isEmpty() == true) {
           try {
             queue.wait(500);
@@ -98,14 +111,12 @@ public class ParallelScheduler implements Scheduler {
           continue;
         }
 
-        Job job = queue.remove(0);
+        job = queue.remove(0);
         running.add(job);
-        Scheduler scheduler = this;
-
-        executor.execute(new JobRunner(scheduler, job));
       }
+      Scheduler scheduler = this;
 
-
+      executor.execute(new JobRunner(scheduler, job));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/0665cef0/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java
index ec5fcd4..51dab12 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java
@@ -18,6 +18,7 @@
 package org.apache.zeppelin.scheduler;
 
 import java.util.Collection;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
@@ -32,7 +33,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- *
+ * RemoteScheduler runs in ZeppelinServer and proxies Scheduler running on RemoteInterpreter
  */
 public class RemoteScheduler implements Scheduler {
   Logger logger = LoggerFactory.getLogger(RemoteScheduler.class);
@@ -108,6 +109,21 @@ public class RemoteScheduler implements Scheduler {
   }
 
   @Override
+  public Job removeFromWaitingQueue(String jobId) {
+    synchronized (queue) {
+      Iterator<Job> it = queue.iterator();
+      while (it.hasNext()) {
+        Job job = it.next();
+        if (job.getId().equals(jobId)) {
+          it.remove();
+          return job;
+        }
+      }
+    }
+    return null;
+  }
+
+  @Override
   public Collection<Job> getJobsRunning() {
     List<Job> ret = new LinkedList<Job>();
     synchronized (queue) {

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/0665cef0/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Scheduler.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Scheduler.java
index a886c22..90d4397 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Scheduler.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Scheduler.java
@@ -20,10 +20,7 @@ package org.apache.zeppelin.scheduler;
 import java.util.Collection;
 
 /**
- * TODO(moon) : add description.
- *
- * @author Leemoonsoo
- *
+ * Interface for scheduler
  */
 public interface Scheduler extends Runnable {
   public String getName();
@@ -34,5 +31,7 @@ public interface Scheduler extends Runnable {
 
   public void submit(Job job);
 
+  public Job removeFromWaitingQueue(String jobId);
+
   public void stop();
 }

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/0665cef0/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/FIFOSchedulerTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/FIFOSchedulerTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/FIFOSchedulerTest.java
index 3d8495c..7288b67 100644
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/FIFOSchedulerTest.java
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/FIFOSchedulerTest.java
@@ -88,7 +88,30 @@ public class FIFOSchedulerTest extends TestCase {
 
 		assertTrue((500 > (Long)job1.getReturn()));
 		assertEquals(null, job2.getReturn());
+	}
 
+	 public void testRemoveFromWaitingQueue() throws InterruptedException{
+	    Scheduler s = schedulerSvc.createOrGetFIFOScheduler("test");
+	    assertEquals(0, s.getJobsRunning().size());
+	    assertEquals(0, s.getJobsWaiting().size());
 
-	}
+	    Job job1 = new SleepingJob("job1", null, 500);
+	    Job job2 = new SleepingJob("job2", null, 500);
+
+	    s.submit(job1);
+	    s.submit(job2);
+
+	    Thread.sleep(200);
+
+	    job1.abort();
+	    job2.abort();
+
+	    Thread.sleep(200);
+
+	    assertEquals(Status.ABORT, job1.getStatus());
+	    assertEquals(Status.ABORT, job2.getStatus());
+
+	    assertTrue((500 > (Long)job1.getReturn()));
+	    assertEquals(null, job2.getReturn());
+	  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/0665cef0/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java
index 08fe190..d17df4f 100644
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java
@@ -18,6 +18,8 @@
 package org.apache.zeppelin.scheduler;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 import java.io.File;
@@ -33,6 +35,7 @@ import org.apache.zeppelin.interpreter.InterpreterContextRunner;
 import org.apache.zeppelin.interpreter.InterpreterGroup;
 import org.apache.zeppelin.interpreter.remote.RemoteInterpreter;
 import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterA;
+import org.apache.zeppelin.scheduler.Job.Status;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -127,7 +130,7 @@ public class RemoteSchedulerTest {
       Thread.sleep(TICK_WAIT);
       cycles++;
     }
-    
+
     assertTrue(job.isTerminated());
     assertEquals(0, scheduler.getJobsWaiting().size());
     assertEquals(0, scheduler.getJobsRunning().size());
@@ -136,4 +139,133 @@ public class RemoteSchedulerTest {
     schedulerSvc.removeScheduler("test");
   }
 
+  @Test
+  public void testAbortOnPending() throws Exception {
+    Properties p = new Properties();
+    final InterpreterGroup intpGroup = new InterpreterGroup();
+    Map<String, String> env = new HashMap<String, String>();
+    env.put("ZEPPELIN_CLASSPATH", new File("./target/test-classes").getAbsolutePath());
+
+    final RemoteInterpreter intpA = new RemoteInterpreter(
+        p,
+        MockInterpreterA.class.getName(),
+        new File("../bin/interpreter.sh").getAbsolutePath(),
+        "fake",
+        env,
+        10 * 1000
+        );
+
+    intpGroup.add(intpA);
+    intpA.setInterpreterGroup(intpGroup);
+
+    intpA.open();
+
+    Scheduler scheduler = schedulerSvc.createOrGetRemoteScheduler("test",
+        intpA.getInterpreterProcess(),
+        10);
+
+    Job job1 = new Job("jobId1", "jobName1", null, 200) {
+      InterpreterContext context = new InterpreterContext(
+          "note",
+          "jobId1",
+          "title",
+          "text",
+          new HashMap<String, Object>(),
+          new GUI(),
+          new AngularObjectRegistry(intpGroup.getId(), null),
+          new LinkedList<InterpreterContextRunner>());
+
+      @Override
+      public int progress() {
+        return 0;
+      }
+
+      @Override
+      public Map<String, Object> info() {
+        return null;
+      }
+
+      @Override
+      protected Object jobRun() throws Throwable {
+        intpA.interpret("1000", context);
+        return "1000";
+      }
+
+      @Override
+      protected boolean jobAbort() {
+        if (isRunning()) {
+          intpA.cancel(context);
+        }
+        return true;
+      }
+    };
+
+    Job job2 = new Job("jobId2", "jobName2", null, 200) {
+      InterpreterContext context = new InterpreterContext(
+          "note",
+          "jobId2",
+          "title",
+          "text",
+          new HashMap<String, Object>(),
+          new GUI(),
+          new AngularObjectRegistry(intpGroup.getId(), null),
+          new LinkedList<InterpreterContextRunner>());
+
+      @Override
+      public int progress() {
+        return 0;
+      }
+
+      @Override
+      public Map<String, Object> info() {
+        return null;
+      }
+
+      @Override
+      protected Object jobRun() throws Throwable {
+        intpA.interpret("1000", context);
+        return "1000";
+      }
+
+      @Override
+      protected boolean jobAbort() {
+        if (isRunning()) {
+          intpA.cancel(context);
+        }
+        return true;
+      }
+    };
+
+    job2.setResult("result2");
+
+    scheduler.submit(job1);
+    scheduler.submit(job2);
+
+
+    int cycles = 0;
+    while (!job1.isRunning() && cycles < MAX_WAIT_CYCLES) {
+      Thread.sleep(TICK_WAIT);
+      cycles++;
+    }
+    assertTrue(job1.isRunning());
+    assertTrue(job2.getStatus() == Status.PENDING);
+
+    job2.abort();
+
+    cycles = 0;
+    while (!job1.isTerminated() && cycles < MAX_WAIT_CYCLES) {
+      Thread.sleep(TICK_WAIT);
+      cycles++;
+    }
+
+    assertNotNull(job1.getDateFinished());
+    assertTrue(job1.isTerminated());
+    assertNull(job2.getDateFinished());
+    assertTrue(job2.isTerminated());
+    assertEquals("result2", job2.getReturn());
+
+    intpA.close();
+    schedulerSvc.removeScheduler("test");
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/0665cef0/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
index 1332f16..28c49c6 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
@@ -22,6 +22,8 @@ import org.apache.zeppelin.display.GUI;
 import org.apache.zeppelin.display.Input;
 import org.apache.zeppelin.interpreter.*;
 import org.apache.zeppelin.interpreter.Interpreter.FormType;
+import org.apache.zeppelin.interpreter.InterpreterResult.Code;
+import org.apache.zeppelin.interpreter.InterpreterResult.Type;
 import org.apache.zeppelin.scheduler.Job;
 import org.apache.zeppelin.scheduler.JobListener;
 import org.slf4j.Logger;
@@ -205,13 +207,22 @@ public class Paragraph extends Job implements Serializable, Cloneable {
     }
     logger().debug("RUN : " + script);
     InterpreterResult ret = repl.interpret(script, getInterpreterContext());
+
+    if (Code.KEEP_PREVIOUS_RESULT == ret.code()) {
+      return getReturn();
+    }
     return ret;
   }
 
   @Override
   protected boolean jobAbort() {
     Interpreter repl = getRepl(getRequiredReplName());
-    repl.cancel(getInterpreterContext());
+    Job job = repl.getScheduler().removeFromWaitingQueue(getId());
+    if (job != null) {
+      job.setStatus(Status.ABORT);
+    } else {
+      repl.cancel(getInterpreterContext());
+    }
     return true;
   }