You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2017/04/17 01:16:47 UTC

zeppelin git commit: [ZEPPELIN-2355] Livy cancel enhancements

Repository: zeppelin
Updated Branches:
  refs/heads/master e5922b6bb -> 861f1d88f


[ZEPPELIN-2355] Livy cancel enhancements

### What is this PR for?
The Cancel functionality for the Livy interpreter has few issues. One issue is because a variable is not published correctly. Second issue is observed when  there is a delay in launching the application. Any cancel before application launch is ignored.  The third issue is that Cancel is not correctly implemented for SparkSQLInterpreter.

### What type of PR is it?
Bug Fix

### What is the Jira issue?
https://issues.apache.org/jira/browse/ZEPPELIN-2355

### How should this be tested?
The test cases are modified to test the changes.

### Screenshots (if appropriate)

### Questions:
* Does the licenses files need update?  No
* Is there breaking changes for older versions? No
* Does this needs documentation? No

Author: Benoy Antony <be...@apache.org>

Closes #2223 from benoyantony/livy-cancel-enhancement and squashes the following commits:

244e6d3 [Benoy Antony] clear the cancel requests if livy doesnt't support cancellation and modified testcase
75fe574 [Benoy Antony] added testcase for cancellation support on LivySparkSQLInterpreter and moved the removal to finally block
9fc6dbf [Benoy Antony] remove unrelated changes in imports
8673acf [Benoy Antony] ZEPPELIN-2355 Fix race conditions while cancelling a paragraph


Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/861f1d88
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/861f1d88
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/861f1d88

Branch: refs/heads/master
Commit: 861f1d88fe2ea105df6892abef14142327c49f6f
Parents: e5922b6
Author: Benoy Antony <be...@apache.org>
Authored: Tue Apr 11 20:55:36 2017 -0700
Committer: Jeff Zhang <zj...@apache.org>
Committed: Mon Apr 17 09:16:37 2017 +0800

----------------------------------------------------------------------
 .../zeppelin/livy/BaseLivyInterprereter.java    | 52 ++++++-----
 .../zeppelin/livy/LivySparkSQLInterpreter.java  |  5 ++
 .../apache/zeppelin/livy/LivyInterpreterIT.java | 94 ++++++++++++++++++--
 3 files changed, 123 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/861f1d88/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterprereter.java
----------------------------------------------------------------------
diff --git a/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterprereter.java b/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterprereter.java
index 8fd0648..43cd507 100644
--- a/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterprereter.java
+++ b/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterprereter.java
@@ -43,13 +43,17 @@ import javax.net.ssl.SSLContext;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.security.KeyStore;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.ConcurrentHashMap;
 
+
+
 /**
  * Base class for livy interpreters.
  */
@@ -68,9 +72,8 @@ public abstract class BaseLivyInterprereter extends Interpreter {
   protected LivyVersion livyVersion;
   private RestTemplate restTemplate;
 
-  // keep tracking the mapping between paragraphId and statementId, so that we can cancel the
-  // statement after we execute it.
-  private ConcurrentHashMap<String, Integer> paragraphId2StmtIdMap = new ConcurrentHashMap<>();
+  Set<Object> paragraphsToCancel = Collections.newSetFromMap(
+      new ConcurrentHashMap<Object, Boolean>());
   private ConcurrentHashMap<String, Integer> paragraphId2StmtProgressMap =
       new ConcurrentHashMap<>();
 
@@ -163,21 +166,8 @@ public abstract class BaseLivyInterprereter extends Interpreter {
 
   @Override
   public void cancel(InterpreterContext context) {
-    if (livyVersion.isCancelSupported()) {
-      String paraId = context.getParagraphId();
-      Integer stmtId = paragraphId2StmtIdMap.get(paraId);
-      try {
-        if (stmtId != null) {
-          cancelStatement(stmtId);
-        }
-      } catch (LivyException e) {
-        LOGGER.error("Fail to cancel statement " + stmtId + " for paragraph " + paraId, e);
-      } finally {
-        paragraphId2StmtIdMap.remove(paraId);
-      }
-    } else {
-      LOGGER.warn("cancel is not supported for this version of livy: " + livyVersion);
-    }
+    paragraphsToCancel.add(context.getParagraphId());
+    LOGGER.info("Added paragraph " + context.getParagraphId() + " for cancellation.");
   }
 
   @Override
@@ -261,11 +251,12 @@ public abstract class BaseLivyInterprereter extends Interpreter {
         }
         stmtInfo = executeStatement(new ExecuteRequest(code));
       }
-      if (paragraphId != null) {
-        paragraphId2StmtIdMap.put(paragraphId, stmtInfo.id);
-      }
       // pull the statement status
       while (!stmtInfo.isAvailable()) {
+        if (paragraphId != null && paragraphsToCancel.contains(paragraphId)) {
+          cancel(stmtInfo.id, paragraphId);
+          return new InterpreterResult(InterpreterResult.Code.ERROR, "Job is cancelled");
+        }
         try {
           Thread.sleep(pullStatusInterval);
         } catch (InterruptedException e) {
@@ -285,9 +276,26 @@ public abstract class BaseLivyInterprereter extends Interpreter {
       }
     } finally {
       if (paragraphId != null) {
-        paragraphId2StmtIdMap.remove(paragraphId);
         paragraphId2StmtProgressMap.remove(paragraphId);
+        paragraphsToCancel.remove(paragraphId);
+      }
+    }
+  }
+
+  private void cancel(int id, String paragraphId) {
+    if (livyVersion.isCancelSupported()) {
+      try {
+        LOGGER.info("Cancelling statement " + id);
+        cancelStatement(id);
+      } catch (LivyException e) {
+        LOGGER.error("Fail to cancel statement " + id + " for paragraph " + paragraphId, e);
       }
+      finally {
+        paragraphsToCancel.remove(paragraphId);
+      }
+    } else {
+      LOGGER.warn("cancel is not supported for this version of livy: " + livyVersion);
+      paragraphsToCancel.clear();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/861f1d88/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java
----------------------------------------------------------------------
diff --git a/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java b/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java
index cdd4eac..9c0d359 100644
--- a/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java
+++ b/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java
@@ -230,6 +230,11 @@ public class LivySparkSQLInterpreter extends BaseLivyInterprereter {
   }
 
   @Override
+  public void cancel(InterpreterContext context) {
+    sparkInterpreter.cancel(context);
+  }
+
+  @Override
   public void close() {
     this.sparkInterpreter.close();
   }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/861f1d88/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java
----------------------------------------------------------------------
diff --git a/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java b/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java
index 3da908c..6537125 100644
--- a/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java
+++ b/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java
@@ -162,9 +162,9 @@ public class LivyInterpreterIT {
         Thread cancelThread = new Thread() {
           @Override
           public void run() {
-            // invoke cancel after 3 seconds to wait job starting
+            // invoke cancel after 1 millisecond to wait job starting
             try {
-              Thread.sleep(3000);
+              Thread.sleep(1);
             } catch (InterruptedException e) {
               e.printStackTrace();
             }
@@ -306,6 +306,88 @@ public class LivyInterpreterIT {
     }
   }
 
+
+  @Test
+  public void testSparkSQLCancellation() {
+    if (!checkPreCondition()) {
+      return;
+    }
+    InterpreterGroup interpreterGroup = new InterpreterGroup("group_1");
+    interpreterGroup.put("session_1", new ArrayList<Interpreter>());
+    LivySparkInterpreter sparkInterpreter = new LivySparkInterpreter(properties);
+    sparkInterpreter.setInterpreterGroup(interpreterGroup);
+    interpreterGroup.get("session_1").add(sparkInterpreter);
+    AuthenticationInfo authInfo = new AuthenticationInfo("user1");
+    MyInterpreterOutputListener outputListener = new MyInterpreterOutputListener();
+    InterpreterOutput output = new InterpreterOutput(outputListener);
+    final InterpreterContext context = new InterpreterContext("noteId", "paragraphId", "livy.spark",
+        "title", "text", authInfo, null, null, null, null, null, output);
+    sparkInterpreter.open();
+
+    final LivySparkSQLInterpreter sqlInterpreter = new LivySparkSQLInterpreter(properties);
+    interpreterGroup.get("session_1").add(sqlInterpreter);
+    sqlInterpreter.setInterpreterGroup(interpreterGroup);
+    sqlInterpreter.open();
+
+    try {
+      // detect spark version
+      InterpreterResult result = sparkInterpreter.interpret("sc.version", context);
+      assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+      assertEquals(1, result.message().size());
+
+      boolean isSpark2 = isSpark2(sparkInterpreter, context);
+
+      // test DataFrame api
+      if (!isSpark2) {
+        result = sparkInterpreter.interpret(
+            "val df=sqlContext.createDataFrame(Seq((\"hello\",20))).toDF(\"col_1\", \"col_2\")\n"
+                + "df.collect()", context);
+        assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+        assertEquals(1, result.message().size());
+        assertTrue(result.message().get(0).getData()
+            .contains("Array[org.apache.spark.sql.Row] = Array([hello,20])"));
+      } else {
+        result = sparkInterpreter.interpret(
+            "val df=spark.createDataFrame(Seq((\"hello\",20))).toDF(\"col_1\", \"col_2\")\n"
+                + "df.collect()", context);
+        assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+        assertEquals(1, result.message().size());
+        assertTrue(result.message().get(0).getData()
+            .contains("Array[org.apache.spark.sql.Row] = Array([hello,20])"));
+      }
+      sparkInterpreter.interpret("df.registerTempTable(\"df\")", context);
+
+      // cancel
+      if (sqlInterpreter.getLivyVersion().newerThanEquals(LivyVersion.LIVY_0_3_0)) {
+        Thread cancelThread = new Thread() {
+          @Override
+          public void run() {
+            sqlInterpreter.cancel(context);
+          }
+        };
+        cancelThread.start();
+        //sleep so that cancelThread performs a cancel.
+        try {
+          Thread.sleep(1);
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+        }
+        result = sqlInterpreter
+            .interpret("select count(1) from df", context);
+        if (result.code().equals(InterpreterResult.Code.ERROR)) {
+          String message = result.message().get(0).getData();
+          // 2 possibilities, sometimes livy doesn't return the real cancel exception
+          assertTrue(message.contains("cancelled part of cancelled job group") ||
+              message.contains("Job is cancelled"));
+        }
+      }
+    } catch (LivyException e) {
+    } finally {
+      sparkInterpreter.close();
+      sqlInterpreter.close();
+    }
+  }
+
   @Test
   public void testStringWithTruncation() {
     if (!checkPreCondition()) {
@@ -495,9 +577,9 @@ public class LivyInterpreterIT {
         Thread cancelThread = new Thread() {
           @Override
           public void run() {
-            // invoke cancel after 3 seconds to wait job starting
+            // invoke cancel after 1 millisecond to wait job starting
             try {
-              Thread.sleep(3000);
+              Thread.sleep(1);
             } catch (InterruptedException e) {
               e.printStackTrace();
             }
@@ -586,9 +668,9 @@ public class LivyInterpreterIT {
         Thread cancelThread = new Thread() {
           @Override
           public void run() {
-            // invoke cancel after 3 seconds to wait job starting
+            // invoke cancel after 1 millisecond to wait job starting
             try {
-              Thread.sleep(3000);
+              Thread.sleep(1);
             } catch (InterruptedException e) {
               e.printStackTrace();
             }