You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by mo...@apache.org on 2015/11/15 12:06:12 UTC
incubator-zeppelin git commit: ZEPPELIN - 285 Abort "Pending" or
"Running" paragraphs on Interpreter restart
Repository: incubator-zeppelin
Updated Branches:
refs/heads/master 66e5433d2 -> 070e08158
ZEPPELIN - 285 Abort "Pending" or "Running" paragraphs on Interpreter restart
This PR address the following issue https://issues.apache.org/jira/browse/ZEPPELIN-285 . The idea is to abort all the jobs which are either running or waiting to run on the interpreter that is going to be restarted.
TODO
- [x] initial implementation
- [x] tests
Author: Khalid Huseynov <kh...@nflabs.com>
Closes #306 from khalidhuseynov/fix-pending-issue and squashes the following commits:
45c15b3 [Khalid Huseynov] fix test, first paragraph may start running
e719cca [Khalid Huseynov] add test
5287bbb [Khalid Huseynov] initial commit
Project: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/commit/070e0815
Tree: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/tree/070e0815
Diff: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/diff/070e0815
Branch: refs/heads/master
Commit: 070e0815854d314d0d12eff8db77242d11fa8a0c
Parents: 66e5433
Author: Khalid Huseynov <kh...@nflabs.com>
Authored: Wed Nov 4 14:05:45 2015 +0900
Committer: Lee moon soo <mo...@apache.org>
Committed: Sun Nov 15 20:06:35 2015 +0900
----------------------------------------------------------------------
.../interpreter/InterpreterFactory.java | 18 +++++++
.../apache/zeppelin/notebook/NotebookTest.java | 51 +++++++++++++++++++-
2 files changed, 68 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/070e0815/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
index 8a1d6ff..1beebde 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
@@ -49,6 +49,8 @@ import org.apache.zeppelin.display.AngularObjectRegistryListener;
import org.apache.zeppelin.interpreter.Interpreter.RegisteredInterpreter;
import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreter;
+import org.apache.zeppelin.scheduler.Job;
+import org.apache.zeppelin.scheduler.Job.Status;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -538,6 +540,22 @@ public class InterpreterFactory {
synchronized (interpreterSettings) {
InterpreterSetting intpsetting = interpreterSettings.get(id);
if (intpsetting != null) {
+
+ for (Interpreter intp : intpsetting.getInterpreterGroup()) {
+ for (Job job : intp.getScheduler().getJobsRunning()) {
+ job.abort();
+ job.setStatus(Status.ABORT);
+ logger.info("Job " + job.getJobName() + " aborted ");
+ }
+
+ for (Job job : intp.getScheduler().getJobsWaiting()) {
+ job.abort();
+ job.setStatus(Status.ABORT);
+ logger.info("Job " + job.getJobName() + " aborted ");
+ }
+ }
+
+
intpsetting.getInterpreterGroup().close();
intpsetting.getInterpreterGroup().destroy();
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/070e0815/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java
index 31f18cf..faad058 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java
@@ -20,6 +20,7 @@ package org.apache.zeppelin.notebook;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.IOException;
@@ -273,6 +274,54 @@ public class NotebookTest implements JobListenerFactory{
notebook.removeNote(note.id());
}
+ @Test
+ public void testAbortParagraphStatusOnInterpreterRestart() throws InterruptedException,
+ IOException {
+ Note note = notebook.createNote();
+ note.getNoteReplLoader().setInterpreters(factory.getDefaultInterpreterSettingList());
+
+ Paragraph p1 = note.addParagraph();
+ p1.setText("p1");
+ Paragraph p2 = note.addParagraph();
+ p2.setText("p2");
+ Paragraph p3 = note.addParagraph();
+ p3.setText("p3");
+ Paragraph p4 = note.addParagraph();
+ p4.setText("p4");
+
+ /* all jobs are ready to run */
+ assertEquals(Job.Status.READY, p1.getStatus());
+ assertEquals(Job.Status.READY, p2.getStatus());
+ assertEquals(Job.Status.READY, p3.getStatus());
+ assertEquals(Job.Status.READY, p4.getStatus());
+
+ /* run all */
+ note.runAll();
+
+ /* all are pending in the beginning (first one possibly started)*/
+ assertTrue(p1.getStatus() == Job.Status.PENDING || p1.getStatus() == Job.Status.RUNNING);
+ assertEquals(Job.Status.PENDING, p2.getStatus());
+ assertEquals(Job.Status.PENDING, p3.getStatus());
+ assertEquals(Job.Status.PENDING, p4.getStatus());
+
+ /* wait till first job is terminated and second starts running */
+ while(p1.isTerminated() == false || (p2.getStatus() == Job.Status.PENDING)) Thread.yield();
+
+ assertEquals(Job.Status.FINISHED, p1.getStatus());
+ assertEquals(Job.Status.RUNNING, p2.getStatus());
+ assertEquals(Job.Status.PENDING, p3.getStatus());
+ assertEquals(Job.Status.PENDING, p4.getStatus());
+
+ /* restart interpreter */
+ factory.restart(note.getNoteReplLoader().getInterpreterSettings().get(0).id());
+
+ /* pending and running jobs have been aborted */
+ assertEquals(Job.Status.FINISHED, p1.getStatus());
+ assertEquals(Job.Status.ABORT, p2.getStatus());
+ assertEquals(Job.Status.ABORT, p3.getStatus());
+ assertEquals(Job.Status.ABORT, p4.getStatus());
+ }
+
private void delete(File file){
if(file.isFile()) file.delete();
else if(file.isDirectory()){
@@ -285,7 +334,7 @@ public class NotebookTest implements JobListenerFactory{
file.delete();
}
}
-
+
@Override
public JobListener getParagraphJobListener(Note note) {
return new JobListener(){