You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2021/02/23 02:56:53 UTC

[zeppelin] branch branch-0.9 updated: [ZEPPELIN-5078] Make it possible to cancel "run all paragraphs" operation

This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/branch-0.9 by this push:
     new 80da790  [ZEPPELIN-5078] Make it possible to cancel "run all paragraphs" operation
80da790 is described below

commit 80da790f73ba2059f3c440e7d201c0d11b485f25
Author: Timo Olkkonen <ti...@gmail.com>
AuthorDate: Tue Feb 16 14:18:04 2021 +0200

    [ZEPPELIN-5078] Make it possible to cancel "run all paragraphs" operation
    
    ### What is this PR for?
    
    Make it possible to cancel the "run all paragraphs" operation. Currently, the cancel action is disabled in the UI while the whole note is running, and the "run all" operation blocks the websocket handler on the server. This PR enables cancel in the UI and performs the "run all" operation in a separate thread, so that the websocket is no longer blocked.
    
    ### What type of PR is it?
    Bug Fix
    
    ### What is the Jira issue?
    https://issues.apache.org/jira/browse/ZEPPELIN-5078
    
    ### How should this be tested?
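    Two automated tests were added to the zeppelin-integration package (ParagraphActionsIT.testRunAllCancel and SparkParagraphIT.testCancelPyspark).
    I am having problems with CI, but I am able to run the tests successfully locally.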
    
    Author: Timo Olkkonen <ti...@gmail.com>
    
    Closes #4055 from olkkoti/ZEPPELIN-5078 and squashes the following commits:
    
    09aeb9ef9 [Timo Olkkonen] Add tests for canceling run all operation.
    07fb93437 [Timo Olkkonen] Allow canceling of run all paragraphs operation.
    515d5bf5b [Timo Olkkonen] Stop running paragraphs if one is aborted.
    
    (cherry picked from commit aa99a44c9b131317aa55eac1f6c20694d2deddfd)
    Signed-off-by: Jeff Zhang <zj...@apache.org>
---
 .../org/apache/zeppelin/AbstractZeppelinIT.java    |  4 +++
 .../zeppelin/integration/ParagraphActionsIT.java   | 40 ++++++++++++++++++++++
 .../zeppelin/integration/SparkParagraphIT.java     | 39 +++++++++++++++++++++
 .../apache/zeppelin/service/NotebookService.java   |  4 +++
 .../org/apache/zeppelin/socket/NotebookServer.java | 24 ++++++++-----
 .../src/app/core/paragraph-base/paragraph-base.ts  |  4 +--
 .../app/notebook/paragraph/paragraph-control.html  |  2 +-
 .../app/notebook/paragraph/paragraph.controller.js |  3 --
 8 files changed, 104 insertions(+), 16 deletions(-)

diff --git a/zeppelin-integration/src/test/java/org/apache/zeppelin/AbstractZeppelinIT.java b/zeppelin-integration/src/test/java/org/apache/zeppelin/AbstractZeppelinIT.java
index 3a8bf58..4cccb78 100644
--- a/zeppelin-integration/src/test/java/org/apache/zeppelin/AbstractZeppelinIT.java
+++ b/zeppelin-integration/src/test/java/org/apache/zeppelin/AbstractZeppelinIT.java
@@ -60,6 +60,10 @@ abstract public class AbstractZeppelinIT {
     driver.findElement(by).click();
   }
 
+  protected void cancelParagraph(int paragraphNo) {
+    By by = By.xpath(getParagraphXPath(paragraphNo) + "//span[@class='icon-control-pause']");
+    clickAndWait(by);
+  }
 
   protected String getParagraphXPath(int paragraphNo) {
     return "(//div[@ng-controller=\"ParagraphCtrl\"])[" + paragraphNo + "]";
diff --git a/zeppelin-integration/src/test/java/org/apache/zeppelin/integration/ParagraphActionsIT.java b/zeppelin-integration/src/test/java/org/apache/zeppelin/integration/ParagraphActionsIT.java
index b7e6cd2..28f8dfe 100644
--- a/zeppelin-integration/src/test/java/org/apache/zeppelin/integration/ParagraphActionsIT.java
+++ b/zeppelin-integration/src/test/java/org/apache/zeppelin/integration/ParagraphActionsIT.java
@@ -279,6 +279,46 @@ public class ParagraphActionsIT extends AbstractZeppelinIT {
     }
   }
 
+  @Test
+  public void testRunAllCancel() throws Exception {
+    try {
+      createNewNote();
+      waitForParagraph(1, "READY");
+      setTextOfParagraph(1, "%sh\\n\\nfor i in {1..30}; do\\n  sleep 1\\ndone");
+      driver.findElement(By.xpath(getParagraphXPath(1) + "//span[@class='icon-settings']")).click();
+      driver.findElement(By.xpath(getParagraphXPath(1) + "//ul/li/a[@ng-click=\"insertNew('below')\"]"))
+              .click();
+      waitForParagraph(2, "READY");
+      setTextOfParagraph(2, "%sh\\n echo \"Hello World!\"");
+
+
+      driver.findElement(By.xpath(".//*[@id='main']//button[contains(@ng-click, 'runAllParagraphs')]")).sendKeys(Keys.ENTER);
+      ZeppelinITUtils.sleep(1000, false);
+      driver.findElement(By.xpath("//div[@class='modal-dialog'][contains(.,'Run all paragraphs?')]" +
+              "//div[@class='modal-footer']//button[contains(.,'OK')]")).click();
+      waitForParagraph(1, "RUNNING");
+
+      ZeppelinITUtils.sleep(2000, false);
+      cancelParagraph(1);
+      waitForParagraph(1, "ABORT");
+      
+      collector.checkThat("First paragraph status is ",
+              getParagraphStatus(1), CoreMatchers.equalTo("ABORT")
+      );
+      collector.checkThat("Second paragraph status is ",
+              getParagraphStatus(2), CoreMatchers.equalTo("READY")
+      );
+
+
+      driver.navigate().refresh();
+      ZeppelinITUtils.sleep(3000, false);
+      deleteTestNotebook(driver);
+      
+    } catch (Exception e) {
+      handleException("Exception in ParagraphActionsIT while testRunAllCancel", e);
+    }
+  }
+
 //  @Test
   public void testRunOnSelectionChange() throws Exception {
     try {
diff --git a/zeppelin-integration/src/test/java/org/apache/zeppelin/integration/SparkParagraphIT.java b/zeppelin-integration/src/test/java/org/apache/zeppelin/integration/SparkParagraphIT.java
index 90ebec5..5154126 100644
--- a/zeppelin-integration/src/test/java/org/apache/zeppelin/integration/SparkParagraphIT.java
+++ b/zeppelin-integration/src/test/java/org/apache/zeppelin/integration/SparkParagraphIT.java
@@ -19,6 +19,7 @@ package org.apache.zeppelin.integration;
 
 
 import org.apache.zeppelin.AbstractZeppelinIT;
+import org.apache.zeppelin.ZeppelinITUtils;
 import org.apache.zeppelin.WebDriverManager;
 import org.hamcrest.CoreMatchers;
 import org.junit.After;
@@ -29,6 +30,7 @@ import org.junit.rules.ErrorCollector;
 import org.openqa.selenium.By;
 import org.openqa.selenium.TimeoutException;
 import org.openqa.selenium.WebElement;
+import org.openqa.selenium.Keys;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -154,6 +156,43 @@ public class SparkParagraphIT extends AbstractZeppelinIT {
   }
 
   @Test
+  public void testCancelPyspark() throws Exception {
+    try {
+      setTextOfParagraph(1, "%pyspark\\nimport time\\nfor i in range(0, 30):\\n\\ttime.sleep(1)");
+      driver.findElement(By.xpath(getParagraphXPath(1) + "//span[@class='icon-settings']")).click();
+      driver.findElement(By.xpath(getParagraphXPath(1) + "//ul/li/a[@ng-click=\"insertNew('below')\"]"))
+              .click();
+      waitForParagraph(2, "READY");
+      setTextOfParagraph(2, "%pyspark\\nprint(\"Hello World!\")");
+
+
+      driver.findElement(By.xpath(".//*[@id='main']//button[contains(@ng-click, 'runAllParagraphs')]")).sendKeys(Keys.ENTER);
+      ZeppelinITUtils.sleep(1000, false);
+      driver.findElement(By.xpath("//div[@class='modal-dialog'][contains(.,'Run all paragraphs?')]" +
+              "//div[@class='modal-footer']//button[contains(.,'OK')]")).click();
+      waitForParagraph(1, "RUNNING");
+
+      ZeppelinITUtils.sleep(2000, false);
+      cancelParagraph(1);
+      waitForParagraph(1, "ABORT");
+
+      collector.checkThat("First paragraph status is ",
+              getParagraphStatus(1), CoreMatchers.equalTo("ABORT")
+      );
+      collector.checkThat("Second paragraph status is ",
+              getParagraphStatus(2), CoreMatchers.equalTo("READY")
+      );
+
+
+      driver.navigate().refresh();
+      ZeppelinITUtils.sleep(3000, false);
+
+    } catch (Exception e) {
+      handleException("Exception in SparkParagraphIT while testCancelPyspark", e);
+    }
+  }
+
+  @Test
   public void testSqlSpark() throws Exception {
     try {
       setTextOfParagraph(1,"%sql\\n" +
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/service/NotebookService.java b/zeppelin-server/src/main/java/org/apache/zeppelin/service/NotebookService.java
index 8cd8ba7..1de3de2 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/service/NotebookService.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/service/NotebookService.java
@@ -21,6 +21,7 @@ package org.apache.zeppelin.service;
 
 import static org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars.ZEPPELIN_NOTEBOOK_HOMESCREEN;
 import static org.apache.zeppelin.interpreter.InterpreterResult.Code.ERROR;
+import static org.apache.zeppelin.scheduler.Job.Status.ABORT;
 
 import com.google.common.base.Strings;
 import java.io.IOException;
@@ -432,6 +433,9 @@ public class NotebookService {
             if (result != null && result.code() == ERROR) {
               return false;
             }
+            if (p.getStatus() == ABORT || p.isAborted()) {
+              return false;
+            }
           } catch (Exception e) {
             throw new IOException("Fail to run paragraph json: " + raw, e);
           }
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
index 224cbc5..146e8e9 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
@@ -1486,17 +1486,23 @@ public class NotebookServer extends WebSocketServlet
             new TypeToken<List<Map<String, Object>>>() {
             }.getType());
 
-    if (!getNotebookService().runAllParagraphs(noteId, paragraphs, getServiceContext(fromMessage),
-        new WebSocketServiceCallback<Paragraph>(conn))) {
-      // If one paragraph fails, we need to broadcast paragraph states to the client,
-      // or paragraphs not run will stay in PENDING state.
-      Note note = getNotebookService().getNote(noteId, getServiceContext(fromMessage), new SimpleServiceCallback());
-      if (note != null) {
-        for (Paragraph p : note.getParagraphs()) {
-          broadcastParagraph(note, p, null);
+    executorService.submit(() -> {
+      try {
+        if (!getNotebookService().runAllParagraphs(noteId, paragraphs, getServiceContext(fromMessage),
+                new WebSocketServiceCallback<Paragraph>(conn))) {
+          // If one paragraph fails, we need to broadcast paragraph states to the client,
+          // or paragraphs not run will stay in PENDING state.
+          Note note = getNotebookService().getNote(noteId, getServiceContext(fromMessage), new SimpleServiceCallback());
+          if (note != null) {
+            for (Paragraph p : note.getParagraphs()) {
+              broadcastParagraph(note, p, null);
+            }
+          }
         }
+      } catch (Throwable t) {
+        NotebookServer.LOG.error("Error in running all paragraphs", t);
       }
-    }
+    });
   }
 
   private void broadcastSpellExecution(NotebookSocket conn,
diff --git a/zeppelin-web-angular/src/app/core/paragraph-base/paragraph-base.ts b/zeppelin-web-angular/src/app/core/paragraph-base/paragraph-base.ts
index 7fcb35c..26cffcc 100644
--- a/zeppelin-web-angular/src/app/core/paragraph-base/paragraph-base.ts
+++ b/zeppelin-web-angular/src/app/core/paragraph-base/paragraph-base.ts
@@ -316,8 +316,6 @@ export abstract class ParagraphBase extends MessageListenersManager {
   }
 
   cancelParagraph() {
-    if (!this.isEntireNoteRunning) {
-      this.messageService.cancelParagraph(this.paragraph.id);
-    }
+    this.messageService.cancelParagraph(this.paragraph.id);
   }
 }
diff --git a/zeppelin-web/src/app/notebook/paragraph/paragraph-control.html b/zeppelin-web/src/app/notebook/paragraph/paragraph-control.html
index 841ca5f..588e898 100644
--- a/zeppelin-web/src/app/notebook/paragraph/paragraph-control.html
+++ b/zeppelin-web/src/app/notebook/paragraph/paragraph-control.html
@@ -53,7 +53,7 @@ limitations under the License.
     <span class="icon-control-pause" style="cursor:pointer;color:#CD5C5C"
           tooltip-placement="top" uib-tooltip="Cancel (Ctrl+{{ (isMac ? 'Option' : 'Alt') }}+C)"
           ng-click="cancelParagraph(paragraph)"
-          ng-class="{'item-disable' : isNoteRunning}"
+          ng-class="{'item-disable' : paragraph.status!='RUNNING'}"
           ng-show="paragraph.status=='RUNNING' || paragraph.status=='PENDING'"></span>
     <span class="{{paragraph.config.editorHide ? 'icon-size-fullscreen' : 'icon-size-actual'}}" style="cursor:pointer" tooltip-placement="top"
           uib-tooltip="{{(paragraph.config.editorHide ? 'Show' : 'Hide')}} editor (Ctrl+{{ (isMac ? 'Option' : 'Alt') }}+E)"
diff --git a/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js b/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js
index 21efb20..0858519 100644
--- a/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js
+++ b/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js
@@ -276,9 +276,6 @@ function ParagraphCtrl($scope, $rootScope, $route, $window, $routeParams, $locat
   };
 
   $scope.cancelParagraph = function(paragraph) {
-    if ($scope.isNoteRunning) {
-      return;
-    }
     console.log('Cancel %o', paragraph.id);
     websocketMsgSrv.cancelParagraphRun(paragraph.id);
   };