You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2017/04/28 00:49:00 UTC

zeppelin git commit: ZEPPELIN-2151. Add integration test for livy cancel api

Repository: zeppelin
Updated Branches:
  refs/heads/branch-0.7 b9bc2b810 -> 59c1e1b0a


ZEPPELIN-2151. Add integration test for livy cancel api

### What is this PR for?

This PR adds an integration test for the Livy cancel API. It is only enabled for Livy 0.3, as the cancel API is first available in Livy 0.3.

### What type of PR is it?
[Improvement]

### Todos
* [ ] - Task

### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-2151

### Questions:
* Does the licenses files need update? No
* Is there breaking changes for older versions? No
* Does this needs documentation? No

Author: Jeff Zhang <zj...@apache.org>

Closes #2056 from zjffdu/ZEPPELIN-2151 and squashes the following commits:

b7ca7b3 [Jeff Zhang] ZEPPELIN-2151. Add integration test for livy cancel api

(cherry picked from commit 152147122b9797baef20a382eb880eadcf7cdc0f)
Signed-off-by: Jeff Zhang <zj...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/59c1e1b0
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/59c1e1b0
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/59c1e1b0

Branch: refs/heads/branch-0.7
Commit: 59c1e1b0abc68ac978fc2c4a0284e852a93f4fc7
Parents: b9bc2b8
Author: Jeff Zhang <zj...@apache.org>
Authored: Wed Feb 22 19:59:47 2017 +0800
Committer: Jeff Zhang <zj...@apache.org>
Committed: Fri Apr 28 08:48:51 2017 +0800

----------------------------------------------------------------------
 .../zeppelin/livy/BaseLivyInterprereter.java    |  2 +-
 .../apache/zeppelin/livy/LivyInterpreterIT.java | 85 ++++++++++++++++++--
 2 files changed, 80 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/59c1e1b0/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterprereter.java
----------------------------------------------------------------------
diff --git a/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterprereter.java b/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterprereter.java
index 7f92127..ecb5d77 100644
--- a/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterprereter.java
+++ b/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterprereter.java
@@ -55,7 +55,7 @@ public abstract class BaseLivyInterprereter extends Interpreter {
   private int pullStatusInterval;
   protected boolean displayAppInfo;
   private AtomicBoolean sessionExpired = new AtomicBoolean(false);
-  private LivyVersion livyVersion;
+  protected LivyVersion livyVersion;
 
   // keep tracking the mapping between paragraphId and statementId, so that we can cancel the
   // statement after we execute it.

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/59c1e1b0/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java
----------------------------------------------------------------------
diff --git a/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java b/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java
index aec2742..7df9b20 100644
--- a/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java
+++ b/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java
@@ -82,13 +82,13 @@ public class LivyInterpreterIT {
     }
     InterpreterGroup interpreterGroup = new InterpreterGroup("group_1");
     interpreterGroup.put("session_1", new ArrayList<Interpreter>());
-    LivySparkInterpreter sparkInterpreter = new LivySparkInterpreter(properties);
+    final LivySparkInterpreter sparkInterpreter = new LivySparkInterpreter(properties);
     sparkInterpreter.setInterpreterGroup(interpreterGroup);
     interpreterGroup.get("session_1").add(sparkInterpreter);
     AuthenticationInfo authInfo = new AuthenticationInfo("user1");
     MyInterpreterOutputListener outputListener = new MyInterpreterOutputListener();
     InterpreterOutput output = new InterpreterOutput(outputListener);
-    InterpreterContext context = new InterpreterContext("noteId", "paragraphId", "livy.spark",
+    final InterpreterContext context = new InterpreterContext("noteId", "paragraphId", "livy.spark",
         "title", "text", authInfo, null, null, null, null, null, output);
     sparkInterpreter.open();
 
@@ -158,6 +158,31 @@ public class LivyInterpreterIT {
       assertEquals(InterpreterResult.Code.ERROR, result.code());
       assertEquals(InterpreterResult.Type.TEXT, result.message().get(0).getType());
       assertTrue(result.message().get(0).getData().contains("incomplete statement"));
+
+      // cancel
+      if (sparkInterpreter.livyVersion.newerThanEquals(LivyVersion.LIVY_0_3_0)) {
+        Thread cancelThread = new Thread() {
+          @Override
+          public void run() {
+            // invoke cancel after 3 seconds to wait job starting
+            try {
+              Thread.sleep(3000);
+            } catch (InterruptedException e) {
+              e.printStackTrace();
+            }
+            sparkInterpreter.cancel(context);
+          }
+        };
+        cancelThread.start();
+        result = sparkInterpreter
+            .interpret("sc.parallelize(1 to 10).foreach(e=>Thread.sleep(10*1000))", context);
+        assertEquals(InterpreterResult.Code.ERROR, result.code());
+        String message = result.message().get(0).getData();
+        // 2 possibilities, sometimes livy doesn't return the real cancel exception
+        assertTrue(message.contains("cancelled part of cancelled job group") ||
+            message.contains("Job is cancelled"));
+      }
+
     } finally {
       sparkInterpreter.close();
     }
@@ -289,11 +314,11 @@ public class LivyInterpreterIT {
       return;
     }
 
-    LivyPySparkInterpreter pysparkInterpreter = new LivyPySparkInterpreter(properties);
+    final LivyPySparkInterpreter pysparkInterpreter = new LivyPySparkInterpreter(properties);
     AuthenticationInfo authInfo = new AuthenticationInfo("user1");
     MyInterpreterOutputListener outputListener = new MyInterpreterOutputListener();
     InterpreterOutput output = new InterpreterOutput(outputListener);
-    InterpreterContext context = new InterpreterContext("noteId", "paragraphId", "livy.pyspark",
+    final InterpreterContext context = new InterpreterContext("noteId", "paragraphId", "livy.pyspark",
         "title", "text", authInfo, null, null, null, null, null, output);
     pysparkInterpreter.open();
 
@@ -345,6 +370,31 @@ public class LivyInterpreterIT {
       assertEquals(InterpreterResult.Code.ERROR, result.code());
       assertEquals(InterpreterResult.Type.TEXT, result.message().get(0).getType());
       assertTrue(result.message().get(0).getData().contains("name 'a' is not defined"));
+
+      // cancel
+      if (pysparkInterpreter.livyVersion.newerThanEquals(LivyVersion.LIVY_0_3_0)) {
+        Thread cancelThread = new Thread() {
+          @Override
+          public void run() {
+            // invoke cancel after 3 seconds to wait job starting
+            try {
+              Thread.sleep(3000);
+            } catch (InterruptedException e) {
+              e.printStackTrace();
+            }
+            pysparkInterpreter.cancel(context);
+          }
+        };
+        cancelThread.start();
+        result = pysparkInterpreter
+            .interpret("import time\n" +
+                "sc.range(1, 10).foreach(lambda a: time.sleep(10))", context);
+        assertEquals(InterpreterResult.Code.ERROR, result.code());
+        String message = result.message().get(0).getData();
+        // 2 possibilities, sometimes livy doesn't return the real cancel exception
+        assertTrue(message.contains("cancelled part of cancelled job group") ||
+            message.contains("Job is cancelled"));
+      }
     } finally {
       pysparkInterpreter.close();
     }
@@ -388,7 +438,7 @@ public class LivyInterpreterIT {
       return;
     }
 
-    LivySparkRInterpreter sparkRInterpreter = new LivySparkRInterpreter(properties);
+    final LivySparkRInterpreter sparkRInterpreter = new LivySparkRInterpreter(properties);
     try {
       sparkRInterpreter.getLivyVersion();
     } catch (APINotFoundException e) {
@@ -398,7 +448,7 @@ public class LivyInterpreterIT {
     AuthenticationInfo authInfo = new AuthenticationInfo("user1");
     MyInterpreterOutputListener outputListener = new MyInterpreterOutputListener();
     InterpreterOutput output = new InterpreterOutput(outputListener);
-    InterpreterContext context = new InterpreterContext("noteId", "paragraphId", "livy.sparkr",
+    final InterpreterContext context = new InterpreterContext("noteId", "paragraphId", "livy.sparkr",
         "title", "text", authInfo, null, null, null, null, null, output);
     sparkRInterpreter.open();
 
@@ -412,6 +462,29 @@ public class LivyInterpreterIT {
         assertEquals(InterpreterResult.Code.SUCCESS, result.code());
         assertEquals(1, result.message().size());
         assertTrue(result.message().get(0).getData().contains("eruptions waiting"));
+
+        // cancel
+        Thread cancelThread = new Thread() {
+          @Override
+          public void run() {
+            // invoke cancel after 3 seconds to wait job starting
+            try {
+              Thread.sleep(3000);
+            } catch (InterruptedException e) {
+              e.printStackTrace();
+            }
+            sparkRInterpreter.cancel(context);
+          }
+        };
+        cancelThread.start();
+        result = sparkRInterpreter.interpret("df <- as.DataFrame(faithful)\n" +
+            "df1 <- dapplyCollect(df, function(x) " +
+            "{ Sys.sleep(10); x <- cbind(x, x$waiting * 60) })", context);
+        assertEquals(InterpreterResult.Code.ERROR, result.code());
+        String message = result.message().get(0).getData();
+        // 2 possibilities, sometimes livy doesn't return the real cancel exception
+        assertTrue(message.contains("cancelled part of cancelled job group") ||
+            message.contains("Job is cancelled"));
       } else {
         result = sparkRInterpreter.interpret("df <- createDataFrame(sqlContext, faithful)" +
             "\nhead(df)", context);