You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2020/05/25 03:33:15 UTC

[zeppelin] branch branch-0.9 updated: [ZEPPELIN-4832]. run note in non-blocking way could not guarantee the paragraph execution order

This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/branch-0.9 by this push:
     new f6facc8  [ZEPPELIN-4832]. run note in non-blocking way could not guarantee the paragraph execution order
f6facc8 is described below

commit f6facc83db167ce20154961cf77406e046cc920b
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Wed May 20 00:02:43 2020 +0800

    [ZEPPELIN-4832]. run note in non-blocking way could not guarantee the paragraph execution order
    
    ### What is this PR for?
    
    Before this PR, if a user ran a note in a non-blocking way via the REST API, the execution order of the paragraphs was not guaranteed: paragraph 2 would start running as soon as paragraph 1 entered the running state. Instead, paragraph 2 should only start once paragraph 1 has finished.
    
    This PR fixes the issue by making a minor change to `RemoteScheduler` and introducing the `.execution.mode` property, which tells `RemoteScheduler` whether the job is running as an individual paragraph or as part of a note execution.
    
    ### What type of PR is it?
    [Bug Fix ]
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-4832
    
    ### How should this be tested?
    * Unit test is added
    
    ### Screenshots (if appropriate)
    
    ### Questions:
    * Do the license files need an update? No
    * Are there breaking changes for older versions? No
    * Does this need documentation? No
    
    Author: Jeff Zhang <zj...@apache.org>
    
    Closes #3775 from zjffdu/ZEPPELIN-4832 and squashes the following commits:
    
    36338a83f [Jeff Zhang] [ZEPPELIN-4832]. run note in non-blocking way could not guarantee the paragraph execution order
    
    (cherry picked from commit 6d5783c694cef8a09a5e9f708f93d5a5460fbc16)
    Signed-off-by: Jeff Zhang <zj...@apache.org>
---
 .../apache/zeppelin/rest/NotebookRestApiTest.java  | 44 +++++++++++++++++++++-
 .../interpreter/remote/RemoteInterpreter.java      | 23 ++++++++---
 .../java/org/apache/zeppelin/notebook/Note.java    | 18 ++++++++-
 .../org/apache/zeppelin/notebook/Paragraph.java    |  8 ++++
 .../apache/zeppelin/scheduler/RemoteScheduler.java | 35 +++++++++++++----
 5 files changed, 113 insertions(+), 15 deletions(-)

diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/NotebookRestApiTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/NotebookRestApiTest.java
index 328a1b2..b4fbb3d 100644
--- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/NotebookRestApiTest.java
+++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/NotebookRestApiTest.java
@@ -193,7 +193,7 @@ public class NotebookRestApiTest extends AbstractTestRestApi {
   }
 
   @Test
-  public void testRunAllParagraph_AllSuccess() throws IOException {
+  public void testRunNoteBlocking() throws IOException {
     Note note1 = null;
     try {
       note1 = TestUtils.getInstance(Notebook.class).createNote("note1", anonymous);
@@ -232,6 +232,48 @@ public class NotebookRestApiTest extends AbstractTestRestApi {
   }
 
   @Test
+  public void testRunNoteNonBlocking() throws Exception {
+    Note note1 = null;
+    try {
+      note1 = TestUtils.getInstance(Notebook.class).createNote("note1", anonymous);
+      // 2 paragraphs
+      // P1:
+      //    %python
+      //    import time
+      //    time.sleep(5)
+      //    name='hello'
+      //    z.put('name', name)
+      // P2:
+      //    %%sh(interpolate=true)
+      //    echo '{name}'
+      //
+      Paragraph p1 = note1.addNewParagraph(AuthenticationInfo.ANONYMOUS);
+      Paragraph p2 = note1.addNewParagraph(AuthenticationInfo.ANONYMOUS);
+      p1.setText("%python import time\ntime.sleep(5)\nname='hello'\nz.put('name', name)");
+      p2.setText("%sh(interpolate=true) echo '{name}'");
+
+      PostMethod post = httpPost("/notebook/job/" + note1.getId() + "?waitToFinish=false", "");
+      assertThat(post, isAllowed());
+      Map<String, Object> resp = gson.fromJson(post.getResponseBodyAsString(),
+              new TypeToken<Map<String, Object>>() {}.getType());
+      assertEquals(resp.get("status"), "OK");
+      post.releaseConnection();
+
+      p1.waitUntilFinished();
+      p2.waitUntilFinished();
+
+      assertEquals(Job.Status.FINISHED, p1.getStatus());
+      assertEquals(Job.Status.FINISHED, p2.getStatus());
+      assertEquals("hello\n", p2.getReturn().message().get(0).getData());
+    } finally {
+      // cleanup
+      if (null != note1) {
+        TestUtils.getInstance(Notebook.class).removeNote(note1.getId(), anonymous);
+      }
+    }
+  }
+
+  @Test
   public void testRunAllParagraph_FirstFailed() throws IOException {
     Note note1 = null;
     try {
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
index 692224b..d1604c1 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
@@ -391,11 +391,24 @@ public class RemoteInterpreter extends Interpreter {
   public Scheduler getScheduler() {
     // one session own one Scheduler, so that when one session is closed, all the jobs/paragraphs
     // running under the scheduler of this session will be aborted.
-    Scheduler s = new RemoteScheduler(
-        RemoteInterpreter.class.getSimpleName() + "-" + getInterpreterGroup().getId() + "-" + sessionId,
-        SchedulerFactory.singleton().getExecutor(),
-        this);
-    return SchedulerFactory.singleton().createOrGetScheduler(s);
+    String executionMode = getProperty(".execution.mode", "paragraph");
+    if (executionMode.equals("paragraph")) {
+      Scheduler s = new RemoteScheduler(
+              RemoteInterpreter.class.getSimpleName() + "-" + getInterpreterGroup().getId() + "-" + sessionId,
+              SchedulerFactory.singleton().getExecutor(),
+              this);
+      return SchedulerFactory.singleton().createOrGetScheduler(s);
+    } else if (executionMode.equals("note")) {
+      String noteId = getProperty(".noteId");
+      Scheduler s = new RemoteScheduler(
+              RemoteInterpreter.class.getSimpleName() + "-" + noteId,
+              SchedulerFactory.singleton().getExecutor(),
+              this);
+      return SchedulerFactory.singleton().createOrGetScheduler(s);
+    } else {
+      throw new RuntimeException("Invalid execution mode: " + executionMode);
+    }
+
   }
 
   private RemoteInterpreterContext convert(InterpreterContext ic) {
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
index ce70491..55cb4f4 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
@@ -747,7 +747,8 @@ public class Note implements JsonSerializable {
     }
   }
 
-  public void runAll(AuthenticationInfo authenticationInfo, boolean blocking) throws Exception {
+  public void runAll(AuthenticationInfo authenticationInfo,
+                     boolean blocking) throws Exception {
     setRunning(true);
     try {
       for (Paragraph p : getParagraphs()) {
@@ -755,6 +756,17 @@ public class Note implements JsonSerializable {
           continue;
         }
         p.setAuthenticationInfo(authenticationInfo);
+        try {
+          Interpreter interpreter = p.getBindedInterpreter();
+          if (interpreter != null) {
+            // set interpreter property to execution.mode to be note
+            // so that it could use the correct scheduler. see ZEPPELIN-4832
+            interpreter.setProperty(".execution.mode", "note");
+            interpreter.setProperty(".noteId", id);
+          }
+        } catch (InterpreterNotFoundException e) {
+          // ignore, because the following run method will fail if interpreter not found.
+        }
         if (!run(p.getId(), blocking)) {
           logger.warn("Skip running the remain notes because paragraph {} fails", p.getId());
           throw new Exception("Fail to run note because paragraph " + p.getId() + " is failed, " +
@@ -787,7 +799,9 @@ public class Note implements JsonSerializable {
    * @param ctxUser
    * @return
    */
-  public boolean run(String paragraphId, boolean blocking, String ctxUser) {
+  public boolean run(String paragraphId,
+                     boolean blocking,
+                     String ctxUser) {
     Paragraph p = getParagraph(paragraphId);
 
     if (isPersonalizedMode() && ctxUser != null)
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
index 0ff95c3..0086a47 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
@@ -707,6 +707,14 @@ public class Paragraph extends JobWithProgressPoller<InterpreterResult> implemen
     }
   }
 
+  @VisibleForTesting
+  public void waitUntilFinished() throws Exception {
+    while(!isTerminated()) {
+      LOGGER.debug("Wait for paragraph to be finished");
+      Thread.sleep(1000);
+    }
+  }
+
   private GUI getNoteGui() {
     GUI gui = new GUI();
     gui.setParams(this.note.getNoteParams());
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java
index 5f19df1..3797c8b 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java
@@ -47,14 +47,31 @@ public class RemoteScheduler extends AbstractScheduler {
   public void runJobInScheduler(Job job) {
     JobRunner jobRunner = new JobRunner(this, job);
     executor.execute(jobRunner);
-    // wait until it is submitted to the remote
-    while (!jobRunner.isJobSubmittedInRemote()) {
-      try {
-        Thread.sleep(100);
-      } catch (InterruptedException e) {
-        LOGGER.error("Exception in RemoteScheduler while jobRunner.isJobSubmittedInRemote " +
-            "queue.wait", e);
+    String executionMode =
+            remoteInterpreter.getProperty(".execution.mode", "paragraph");
+    if (executionMode.equals("paragraph")) {
+      // wait until it is submitted to the remote
+      while (!jobRunner.isJobSubmittedInRemote()) {
+        try {
+          Thread.sleep(100);
+        } catch (InterruptedException e) {
+          LOGGER.error("Exception in RemoteScheduler while jobRunner.isJobSubmittedInRemote " +
+                  "queue.wait", e);
+        }
       }
+    } else if (executionMode.equals("note")){
+      // wait until it is finished
+      while (!jobRunner.isJobExecuted()) {
+        try {
+          Thread.sleep(100);
+        } catch (InterruptedException e) {
+          LOGGER.error("Exception in RemoteScheduler while jobRunner.isJobExecuted " +
+                  "queue.wait", e);
+        }
+      }
+    } else {
+      throw new RuntimeException("Invalid job execution.mode: " + executionMode +
+              ", only 'note' and 'paragraph' are valid");
     }
   }
 
@@ -152,6 +169,10 @@ public class RemoteScheduler extends AbstractScheduler {
       return jobSubmittedRemotely;
     }
 
+    public boolean isJobExecuted() {
+      return jobExecuted;
+    }
+
     @Override
     public void run() {
       JobStatusPoller jobStatusPoller = new JobStatusPoller(job, this, 100);