You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2017/04/17 01:16:47 UTC
zeppelin git commit: [ZEPPELIN-2355] Livy cancel enhancements
Repository: zeppelin
Updated Branches:
refs/heads/master e5922b6bb -> 861f1d88f
[ZEPPELIN-2355] Livy cancel enhancements
### What is this PR for?
The Cancel functionality for the Livy interpreter has a few issues. One issue is that a variable is not published correctly. The second issue is observed when there is a delay in launching the application: any cancel before the application launches is ignored. The third issue is that Cancel is not correctly implemented for SparkSQLInterpreter.
### What type of PR is it?
Bug Fix
### What is the Jira issue?
https://issues.apache.org/jira/browse/ZEPPELIN-2355
### How should this be tested?
The test cases are modified to test the changes.
### Screenshots (if appropriate)
### Questions:
* Do the license files need updating? No
* Are there breaking changes for older versions? No
* Does this need documentation? No
Author: Benoy Antony <be...@apache.org>
Closes #2223 from benoyantony/livy-cancel-enhancement and squashes the following commits:
244e6d3 [Benoy Antony] clear the cancel requests if livy doesn't support cancellation and modified testcase
75fe574 [Benoy Antony] added testcase for cancellation support on LivySparkSQLInterpreter and moved the removal to finally block
9fc6dbf [Benoy Antony] remove unrelated changes in imports
8673acf [Benoy Antony] ZEPPELIN-2355 Fix race conditions while cancelling a paragraph
Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/861f1d88
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/861f1d88
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/861f1d88
Branch: refs/heads/master
Commit: 861f1d88fe2ea105df6892abef14142327c49f6f
Parents: e5922b6
Author: Benoy Antony <be...@apache.org>
Authored: Tue Apr 11 20:55:36 2017 -0700
Committer: Jeff Zhang <zj...@apache.org>
Committed: Mon Apr 17 09:16:37 2017 +0800
----------------------------------------------------------------------
.../zeppelin/livy/BaseLivyInterprereter.java | 52 ++++++-----
.../zeppelin/livy/LivySparkSQLInterpreter.java | 5 ++
.../apache/zeppelin/livy/LivyInterpreterIT.java | 94 ++++++++++++++++++--
3 files changed, 123 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/861f1d88/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterprereter.java
----------------------------------------------------------------------
diff --git a/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterprereter.java b/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterprereter.java
index 8fd0648..43cd507 100644
--- a/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterprereter.java
+++ b/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterprereter.java
@@ -43,13 +43,17 @@ import javax.net.ssl.SSLContext;
import java.io.FileInputStream;
import java.io.IOException;
import java.security.KeyStore;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.ConcurrentHashMap;
+
+
/**
* Base class for livy interpreters.
*/
@@ -68,9 +72,8 @@ public abstract class BaseLivyInterprereter extends Interpreter {
protected LivyVersion livyVersion;
private RestTemplate restTemplate;
- // keep tracking the mapping between paragraphId and statementId, so that we can cancel the
- // statement after we execute it.
- private ConcurrentHashMap<String, Integer> paragraphId2StmtIdMap = new ConcurrentHashMap<>();
+ Set<Object> paragraphsToCancel = Collections.newSetFromMap(
+ new ConcurrentHashMap<Object, Boolean>());
private ConcurrentHashMap<String, Integer> paragraphId2StmtProgressMap =
new ConcurrentHashMap<>();
@@ -163,21 +166,8 @@ public abstract class BaseLivyInterprereter extends Interpreter {
@Override
public void cancel(InterpreterContext context) {
- if (livyVersion.isCancelSupported()) {
- String paraId = context.getParagraphId();
- Integer stmtId = paragraphId2StmtIdMap.get(paraId);
- try {
- if (stmtId != null) {
- cancelStatement(stmtId);
- }
- } catch (LivyException e) {
- LOGGER.error("Fail to cancel statement " + stmtId + " for paragraph " + paraId, e);
- } finally {
- paragraphId2StmtIdMap.remove(paraId);
- }
- } else {
- LOGGER.warn("cancel is not supported for this version of livy: " + livyVersion);
- }
+ paragraphsToCancel.add(context.getParagraphId());
+ LOGGER.info("Added paragraph " + context.getParagraphId() + " for cancellation.");
}
@Override
@@ -261,11 +251,12 @@ public abstract class BaseLivyInterprereter extends Interpreter {
}
stmtInfo = executeStatement(new ExecuteRequest(code));
}
- if (paragraphId != null) {
- paragraphId2StmtIdMap.put(paragraphId, stmtInfo.id);
- }
// pull the statement status
while (!stmtInfo.isAvailable()) {
+ if (paragraphId != null && paragraphsToCancel.contains(paragraphId)) {
+ cancel(stmtInfo.id, paragraphId);
+ return new InterpreterResult(InterpreterResult.Code.ERROR, "Job is cancelled");
+ }
try {
Thread.sleep(pullStatusInterval);
} catch (InterruptedException e) {
@@ -285,9 +276,26 @@ public abstract class BaseLivyInterprereter extends Interpreter {
}
} finally {
if (paragraphId != null) {
- paragraphId2StmtIdMap.remove(paragraphId);
paragraphId2StmtProgressMap.remove(paragraphId);
+ paragraphsToCancel.remove(paragraphId);
+ }
+ }
+ }
+
+ private void cancel(int id, String paragraphId) {
+ if (livyVersion.isCancelSupported()) {
+ try {
+ LOGGER.info("Cancelling statement " + id);
+ cancelStatement(id);
+ } catch (LivyException e) {
+ LOGGER.error("Fail to cancel statement " + id + " for paragraph " + paragraphId, e);
}
+ finally {
+ paragraphsToCancel.remove(paragraphId);
+ }
+ } else {
+ LOGGER.warn("cancel is not supported for this version of livy: " + livyVersion);
+ paragraphsToCancel.clear();
}
}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/861f1d88/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java
----------------------------------------------------------------------
diff --git a/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java b/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java
index cdd4eac..9c0d359 100644
--- a/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java
+++ b/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java
@@ -230,6 +230,11 @@ public class LivySparkSQLInterpreter extends BaseLivyInterprereter {
}
@Override
+ public void cancel(InterpreterContext context) {
+ sparkInterpreter.cancel(context);
+ }
+
+ @Override
public void close() {
this.sparkInterpreter.close();
}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/861f1d88/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java
----------------------------------------------------------------------
diff --git a/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java b/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java
index 3da908c..6537125 100644
--- a/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java
+++ b/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java
@@ -162,9 +162,9 @@ public class LivyInterpreterIT {
Thread cancelThread = new Thread() {
@Override
public void run() {
- // invoke cancel after 3 seconds to wait job starting
+ // invoke cancel after 1 millisecond to wait job starting
try {
- Thread.sleep(3000);
+ Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
@@ -306,6 +306,88 @@ public class LivyInterpreterIT {
}
}
+
+ @Test
+ public void testSparkSQLCancellation() {
+ if (!checkPreCondition()) {
+ return;
+ }
+ InterpreterGroup interpreterGroup = new InterpreterGroup("group_1");
+ interpreterGroup.put("session_1", new ArrayList<Interpreter>());
+ LivySparkInterpreter sparkInterpreter = new LivySparkInterpreter(properties);
+ sparkInterpreter.setInterpreterGroup(interpreterGroup);
+ interpreterGroup.get("session_1").add(sparkInterpreter);
+ AuthenticationInfo authInfo = new AuthenticationInfo("user1");
+ MyInterpreterOutputListener outputListener = new MyInterpreterOutputListener();
+ InterpreterOutput output = new InterpreterOutput(outputListener);
+ final InterpreterContext context = new InterpreterContext("noteId", "paragraphId", "livy.spark",
+ "title", "text", authInfo, null, null, null, null, null, output);
+ sparkInterpreter.open();
+
+ final LivySparkSQLInterpreter sqlInterpreter = new LivySparkSQLInterpreter(properties);
+ interpreterGroup.get("session_1").add(sqlInterpreter);
+ sqlInterpreter.setInterpreterGroup(interpreterGroup);
+ sqlInterpreter.open();
+
+ try {
+ // detect spark version
+ InterpreterResult result = sparkInterpreter.interpret("sc.version", context);
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+ assertEquals(1, result.message().size());
+
+ boolean isSpark2 = isSpark2(sparkInterpreter, context);
+
+ // test DataFrame api
+ if (!isSpark2) {
+ result = sparkInterpreter.interpret(
+ "val df=sqlContext.createDataFrame(Seq((\"hello\",20))).toDF(\"col_1\", \"col_2\")\n"
+ + "df.collect()", context);
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+ assertEquals(1, result.message().size());
+ assertTrue(result.message().get(0).getData()
+ .contains("Array[org.apache.spark.sql.Row] = Array([hello,20])"));
+ } else {
+ result = sparkInterpreter.interpret(
+ "val df=spark.createDataFrame(Seq((\"hello\",20))).toDF(\"col_1\", \"col_2\")\n"
+ + "df.collect()", context);
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+ assertEquals(1, result.message().size());
+ assertTrue(result.message().get(0).getData()
+ .contains("Array[org.apache.spark.sql.Row] = Array([hello,20])"));
+ }
+ sparkInterpreter.interpret("df.registerTempTable(\"df\")", context);
+
+ // cancel
+ if (sqlInterpreter.getLivyVersion().newerThanEquals(LivyVersion.LIVY_0_3_0)) {
+ Thread cancelThread = new Thread() {
+ @Override
+ public void run() {
+ sqlInterpreter.cancel(context);
+ }
+ };
+ cancelThread.start();
+ //sleep so that cancelThread performs a cancel.
+ try {
+ Thread.sleep(1);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ result = sqlInterpreter
+ .interpret("select count(1) from df", context);
+ if (result.code().equals(InterpreterResult.Code.ERROR)) {
+ String message = result.message().get(0).getData();
+ // 2 possibilities, sometimes livy doesn't return the real cancel exception
+ assertTrue(message.contains("cancelled part of cancelled job group") ||
+ message.contains("Job is cancelled"));
+ }
+ }
+ } catch (LivyException e) {
+ } finally {
+ sparkInterpreter.close();
+ sqlInterpreter.close();
+ }
+ }
+
@Test
public void testStringWithTruncation() {
if (!checkPreCondition()) {
@@ -495,9 +577,9 @@ public class LivyInterpreterIT {
Thread cancelThread = new Thread() {
@Override
public void run() {
- // invoke cancel after 3 seconds to wait job starting
+ // invoke cancel after 1 millisecond to wait job starting
try {
- Thread.sleep(3000);
+ Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
@@ -586,9 +668,9 @@ public class LivyInterpreterIT {
Thread cancelThread = new Thread() {
@Override
public void run() {
- // invoke cancel after 3 seconds to wait job starting
+ // invoke cancel after 1 millisecond to wait job starting
try {
- Thread.sleep(3000);
+ Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}