You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by mo...@apache.org on 2015/11/15 12:06:12 UTC

incubator-zeppelin git commit: ZEPPELIN - 285 Abort "Pending" or "Running" paragraphs on Interpreter restart

Repository: incubator-zeppelin
Updated Branches:
  refs/heads/master 66e5433d2 -> 070e08158


ZEPPELIN - 285 Abort "Pending" or "Running" paragraphs on Interpreter restart

This PR addresses ZEPPELIN-285 (https://issues.apache.org/jira/browse/ZEPPELIN-285). The idea is to abort all jobs that are either running or waiting to run on the interpreter that is about to be restarted.

TODO
- [x] initial implementation
- [x] tests

Author: Khalid Huseynov <kh...@nflabs.com>

Closes #306 from khalidhuseynov/fix-pending-issue and squashes the following commits:

45c15b3 [Khalid Huseynov] fix test, first paragraph may start running
e719cca [Khalid Huseynov] add test
5287bbb [Khalid Huseynov] initial commit


Project: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/commit/070e0815
Tree: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/tree/070e0815
Diff: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/diff/070e0815

Branch: refs/heads/master
Commit: 070e0815854d314d0d12eff8db77242d11fa8a0c
Parents: 66e5433
Author: Khalid Huseynov <kh...@nflabs.com>
Authored: Wed Nov 4 14:05:45 2015 +0900
Committer: Lee moon soo <mo...@apache.org>
Committed: Sun Nov 15 20:06:35 2015 +0900

----------------------------------------------------------------------
 .../interpreter/InterpreterFactory.java         | 18 +++++++
 .../apache/zeppelin/notebook/NotebookTest.java  | 51 +++++++++++++++++++-
 2 files changed, 68 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/070e0815/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
index 8a1d6ff..1beebde 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
@@ -49,6 +49,8 @@ import org.apache.zeppelin.display.AngularObjectRegistryListener;
 import org.apache.zeppelin.interpreter.Interpreter.RegisteredInterpreter;
 import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry;
 import org.apache.zeppelin.interpreter.remote.RemoteInterpreter;
+import org.apache.zeppelin.scheduler.Job;
+import org.apache.zeppelin.scheduler.Job.Status;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -538,6 +540,22 @@ public class InterpreterFactory {
     synchronized (interpreterSettings) {
       InterpreterSetting intpsetting = interpreterSettings.get(id);
       if (intpsetting != null) {
+
+        for (Interpreter intp : intpsetting.getInterpreterGroup()) {
+          for (Job job : intp.getScheduler().getJobsRunning()) {
+            job.abort();
+            job.setStatus(Status.ABORT);
+            logger.info("Job " + job.getJobName() + " aborted ");
+          }
+              
+          for (Job job : intp.getScheduler().getJobsWaiting()) {
+            job.abort();
+            job.setStatus(Status.ABORT);
+            logger.info("Job " + job.getJobName() + " aborted ");
+          }
+        }
+
+
         intpsetting.getInterpreterGroup().close();
         intpsetting.getInterpreterGroup().destroy();
 

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/070e0815/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java
index 31f18cf..faad058 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java
@@ -20,6 +20,7 @@ package org.apache.zeppelin.notebook;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 
 import java.io.File;
 import java.io.IOException;
@@ -273,6 +274,54 @@ public class NotebookTest implements JobListenerFactory{
     notebook.removeNote(note.id());
   }
 
+  @Test
+  public void testAbortParagraphStatusOnInterpreterRestart() throws InterruptedException,
+      IOException {
+    Note note = notebook.createNote();
+    note.getNoteReplLoader().setInterpreters(factory.getDefaultInterpreterSettingList());
+
+    Paragraph p1 = note.addParagraph();
+    p1.setText("p1");
+    Paragraph p2 = note.addParagraph();
+    p2.setText("p2");
+    Paragraph p3 = note.addParagraph();
+    p3.setText("p3");
+    Paragraph p4 = note.addParagraph();
+    p4.setText("p4");
+
+    /* all jobs are ready to run */
+    assertEquals(Job.Status.READY, p1.getStatus());
+    assertEquals(Job.Status.READY, p2.getStatus());
+    assertEquals(Job.Status.READY, p3.getStatus());
+    assertEquals(Job.Status.READY, p4.getStatus());
+
+	/* run all */
+    note.runAll();
+
+    /* all are pending in the beginning (first one possibly started)*/
+    assertTrue(p1.getStatus() == Job.Status.PENDING || p1.getStatus() == Job.Status.RUNNING);
+    assertEquals(Job.Status.PENDING, p2.getStatus());
+    assertEquals(Job.Status.PENDING, p3.getStatus());
+    assertEquals(Job.Status.PENDING, p4.getStatus());
+
+    /* wait till first job is terminated and second starts running */
+    while(p1.isTerminated() == false || (p2.getStatus() == Job.Status.PENDING)) Thread.yield();
+
+    assertEquals(Job.Status.FINISHED, p1.getStatus());
+    assertEquals(Job.Status.RUNNING, p2.getStatus());
+    assertEquals(Job.Status.PENDING, p3.getStatus());
+    assertEquals(Job.Status.PENDING, p4.getStatus());
+
+    /* restart interpreter */
+    factory.restart(note.getNoteReplLoader().getInterpreterSettings().get(0).id());
+
+    /* pending and running jobs have been aborted */
+    assertEquals(Job.Status.FINISHED, p1.getStatus());
+    assertEquals(Job.Status.ABORT, p2.getStatus());
+    assertEquals(Job.Status.ABORT, p3.getStatus());
+    assertEquals(Job.Status.ABORT, p4.getStatus());
+  }
+
   private void delete(File file){
     if(file.isFile()) file.delete();
     else if(file.isDirectory()){
@@ -285,7 +334,7 @@ public class NotebookTest implements JobListenerFactory{
       file.delete();
     }
   }
-
+  
   @Override
   public JobListener getParagraphJobListener(Note note) {
     return new JobListener(){