You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2020/05/25 03:33:15 UTC
[zeppelin] branch branch-0.9 updated: [ZEPPELIN-4832]. run note in
non-blocking way could not guarantee the paragraph execution order
This is an automated email from the ASF dual-hosted git repository.
zjffdu pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/branch-0.9 by this push:
new f6facc8 [ZEPPELIN-4832]. run note in non-blocking way could not guarantee the paragraph execution order
f6facc8 is described below
commit f6facc83db167ce20154961cf77406e046cc920b
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Wed May 20 00:02:43 2020 +0800
[ZEPPELIN-4832]. run note in non-blocking way could not guarantee the paragraph execution order
### What is this PR for?
Before this PR, if a user ran a note in non-blocking mode via the REST API, the execution order of its paragraphs was not guaranteed: paragraph 2 would start as soon as paragraph 1 entered the RUNNING state. Instead, paragraph 2 should only start after paragraph 1 has finished.
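This PR fixes the issue with a minor change to `RemoteScheduler`. It also introduces an `.execution.mode` interpreter property that tells `RemoteScheduler` whether a job is running as an individual paragraph (`paragraph`, the default) or as part of a note execution (`note`). In `note` mode, the scheduler waits until each job has finished executing before accepting the next one, so paragraphs run strictly in order.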
### What type of PR is it?
[Bug Fix]
### Todos
* [ ] - Task
### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-4832
### How should this be tested?
* A unit test is added: `NotebookRestApiTest#testRunNoteNonBlocking`.
### Screenshots (if appropriate)
### Questions:
* Do the license files need an update? No
* Are there breaking changes for older versions? No
* Does this need documentation? No
Author: Jeff Zhang <zj...@apache.org>
Closes #3775 from zjffdu/ZEPPELIN-4832 and squashes the following commits:
36338a83f [Jeff Zhang] [ZEPPELIN-4832]. run note in non-blocking way could not guarantee the paragraph execution order
(cherry picked from commit 6d5783c694cef8a09a5e9f708f93d5a5460fbc16)
Signed-off-by: Jeff Zhang <zj...@apache.org>
---
.../apache/zeppelin/rest/NotebookRestApiTest.java | 44 +++++++++++++++++++++-
.../interpreter/remote/RemoteInterpreter.java | 23 ++++++++---
.../java/org/apache/zeppelin/notebook/Note.java | 18 ++++++++-
.../org/apache/zeppelin/notebook/Paragraph.java | 8 ++++
.../apache/zeppelin/scheduler/RemoteScheduler.java | 35 +++++++++++++----
5 files changed, 113 insertions(+), 15 deletions(-)
diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/NotebookRestApiTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/NotebookRestApiTest.java
index 328a1b2..b4fbb3d 100644
--- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/NotebookRestApiTest.java
+++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/NotebookRestApiTest.java
@@ -193,7 +193,7 @@ public class NotebookRestApiTest extends AbstractTestRestApi {
}
@Test
- public void testRunAllParagraph_AllSuccess() throws IOException {
+ public void testRunNoteBlocking() throws IOException {
Note note1 = null;
try {
note1 = TestUtils.getInstance(Notebook.class).createNote("note1", anonymous);
@@ -232,6 +232,48 @@ public class NotebookRestApiTest extends AbstractTestRestApi {
}
@Test
+ public void testRunNoteNonBlocking() throws Exception {
+ Note note1 = null;
+ try {
+ note1 = TestUtils.getInstance(Notebook.class).createNote("note1", anonymous);
+ // 2 paragraphs
+ // P1:
+ // %python
+ // import time
+ // time.sleep(5)
+ // name='hello'
+ // z.put('name', name)
+ // P2:
+ // %%sh(interpolate=true)
+ // echo '{name}'
+ //
+ Paragraph p1 = note1.addNewParagraph(AuthenticationInfo.ANONYMOUS);
+ Paragraph p2 = note1.addNewParagraph(AuthenticationInfo.ANONYMOUS);
+ p1.setText("%python import time\ntime.sleep(5)\nname='hello'\nz.put('name', name)");
+ p2.setText("%sh(interpolate=true) echo '{name}'");
+
+ PostMethod post = httpPost("/notebook/job/" + note1.getId() + "?waitToFinish=false", "");
+ assertThat(post, isAllowed());
+ Map<String, Object> resp = gson.fromJson(post.getResponseBodyAsString(),
+ new TypeToken<Map<String, Object>>() {}.getType());
+ assertEquals(resp.get("status"), "OK");
+ post.releaseConnection();
+
+ p1.waitUntilFinished();
+ p2.waitUntilFinished();
+
+ assertEquals(Job.Status.FINISHED, p1.getStatus());
+ assertEquals(Job.Status.FINISHED, p2.getStatus());
+ assertEquals("hello\n", p2.getReturn().message().get(0).getData());
+ } finally {
+ // cleanup
+ if (null != note1) {
+ TestUtils.getInstance(Notebook.class).removeNote(note1.getId(), anonymous);
+ }
+ }
+ }
+
+ @Test
public void testRunAllParagraph_FirstFailed() throws IOException {
Note note1 = null;
try {
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
index 692224b..d1604c1 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
@@ -391,11 +391,24 @@ public class RemoteInterpreter extends Interpreter {
public Scheduler getScheduler() {
// one session own one Scheduler, so that when one session is closed, all the jobs/paragraphs
// running under the scheduler of this session will be aborted.
- Scheduler s = new RemoteScheduler(
- RemoteInterpreter.class.getSimpleName() + "-" + getInterpreterGroup().getId() + "-" + sessionId,
- SchedulerFactory.singleton().getExecutor(),
- this);
- return SchedulerFactory.singleton().createOrGetScheduler(s);
+ String executionMode = getProperty(".execution.mode", "paragraph");
+ if (executionMode.equals("paragraph")) {
+ Scheduler s = new RemoteScheduler(
+ RemoteInterpreter.class.getSimpleName() + "-" + getInterpreterGroup().getId() + "-" + sessionId,
+ SchedulerFactory.singleton().getExecutor(),
+ this);
+ return SchedulerFactory.singleton().createOrGetScheduler(s);
+ } else if (executionMode.equals("note")) {
+ String noteId = getProperty(".noteId");
+ Scheduler s = new RemoteScheduler(
+ RemoteInterpreter.class.getSimpleName() + "-" + noteId,
+ SchedulerFactory.singleton().getExecutor(),
+ this);
+ return SchedulerFactory.singleton().createOrGetScheduler(s);
+ } else {
+ throw new RuntimeException("Invalid execution mode: " + executionMode);
+ }
+
}
private RemoteInterpreterContext convert(InterpreterContext ic) {
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
index ce70491..55cb4f4 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
@@ -747,7 +747,8 @@ public class Note implements JsonSerializable {
}
}
- public void runAll(AuthenticationInfo authenticationInfo, boolean blocking) throws Exception {
+ public void runAll(AuthenticationInfo authenticationInfo,
+ boolean blocking) throws Exception {
setRunning(true);
try {
for (Paragraph p : getParagraphs()) {
@@ -755,6 +756,17 @@ public class Note implements JsonSerializable {
continue;
}
p.setAuthenticationInfo(authenticationInfo);
+ try {
+ Interpreter interpreter = p.getBindedInterpreter();
+ if (interpreter != null) {
+ // set interpreter property to execution.mode to be note
+ // so that it could use the correct scheduler. see ZEPPELIN-4832
+ interpreter.setProperty(".execution.mode", "note");
+ interpreter.setProperty(".noteId", id);
+ }
+ } catch (InterpreterNotFoundException e) {
+ // ignore, because the following run method will fail if interpreter not found.
+ }
if (!run(p.getId(), blocking)) {
logger.warn("Skip running the remain notes because paragraph {} fails", p.getId());
throw new Exception("Fail to run note because paragraph " + p.getId() + " is failed, " +
@@ -787,7 +799,9 @@ public class Note implements JsonSerializable {
* @param ctxUser
* @return
*/
- public boolean run(String paragraphId, boolean blocking, String ctxUser) {
+ public boolean run(String paragraphId,
+ boolean blocking,
+ String ctxUser) {
Paragraph p = getParagraph(paragraphId);
if (isPersonalizedMode() && ctxUser != null)
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
index 0ff95c3..0086a47 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
@@ -707,6 +707,14 @@ public class Paragraph extends JobWithProgressPoller<InterpreterResult> implemen
}
}
+ @VisibleForTesting
+ public void waitUntilFinished() throws Exception {
+ while(!isTerminated()) {
+ LOGGER.debug("Wait for paragraph to be finished");
+ Thread.sleep(1000);
+ }
+ }
+
private GUI getNoteGui() {
GUI gui = new GUI();
gui.setParams(this.note.getNoteParams());
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java
index 5f19df1..3797c8b 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java
@@ -47,14 +47,31 @@ public class RemoteScheduler extends AbstractScheduler {
public void runJobInScheduler(Job job) {
JobRunner jobRunner = new JobRunner(this, job);
executor.execute(jobRunner);
- // wait until it is submitted to the remote
- while (!jobRunner.isJobSubmittedInRemote()) {
- try {
- Thread.sleep(100);
- } catch (InterruptedException e) {
- LOGGER.error("Exception in RemoteScheduler while jobRunner.isJobSubmittedInRemote " +
- "queue.wait", e);
+ String executionMode =
+ remoteInterpreter.getProperty(".execution.mode", "paragraph");
+ if (executionMode.equals("paragraph")) {
+ // wait until it is submitted to the remote
+ while (!jobRunner.isJobSubmittedInRemote()) {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ LOGGER.error("Exception in RemoteScheduler while jobRunner.isJobSubmittedInRemote " +
+ "queue.wait", e);
+ }
}
+ } else if (executionMode.equals("note")){
+ // wait until it is finished
+ while (!jobRunner.isJobExecuted()) {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ LOGGER.error("Exception in RemoteScheduler while jobRunner.isJobExecuted " +
+ "queue.wait", e);
+ }
+ }
+ } else {
+ throw new RuntimeException("Invalid job execution.mode: " + executionMode +
+ ", only 'note' and 'paragraph' are valid");
}
}
@@ -152,6 +169,10 @@ public class RemoteScheduler extends AbstractScheduler {
return jobSubmittedRemotely;
}
+ public boolean isJobExecuted() {
+ return jobExecuted;
+ }
+
@Override
public void run() {
JobStatusPoller jobStatusPoller = new JobStatusPoller(job, this, 100);