You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2020/10/29 01:58:28 UTC
[zeppelin] branch branch-0.9 updated: [ZEPPELIN-5102]. Simplify
spark sql error message
This is an automated email from the ASF dual-hosted git repository.
zjffdu pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/branch-0.9 by this push:
new 2b8341e [ZEPPELIN-5102]. Simplify spark sql error message
2b8341e is described below
commit 2b8341e8a8585fbe68d252140ab1f90251cd7d17
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Fri Oct 23 10:35:09 2020 +0800
[ZEPPELIN-5102]. Simplify spark sql error message
### What is this PR for?
In this PR, we print only the error message (without the stack trace) when the failure is an `AnalysisException`; this makes the error message clearer for users.
### What type of PR is it?
[Improvement]
### Todos
* [ ] - Task
### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-5102
### How should this be tested?
* Manually tested
### Screenshots (if appropriate)
![image](https://user-images.githubusercontent.com/164491/96950181-4f61a900-151c-11eb-9601-a15b971bd13d.png)
### Questions:
* Do the license files need an update? No
* Are there breaking changes for older versions? No
* Does this need documentation? No
Author: Jeff Zhang <zj...@apache.org>
Closes #3951 from zjffdu/ZEPPELIN-5102 and squashes the following commits:
a912b219e [Jeff Zhang] [ZEPPELIN-5102]. Simplify spark sql error message
(cherry picked from commit fa1469927f59cc0ec7ead1b07502348a8d28bee3)
Signed-off-by: Jeff Zhang <zj...@apache.org>
---
.../apache/zeppelin/spark/SparkSqlInterpreter.java | 39 +++++++++-----
.../zeppelin/spark/SparkSqlInterpreterTest.java | 61 +++++++++++-----------
.../integration/ZSessionIntegrationTest.java | 4 +-
3 files changed, 60 insertions(+), 44 deletions(-)
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
index d440da1..4161f1a 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
@@ -19,6 +19,7 @@ package org.apache.zeppelin.spark;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.spark.SparkContext;
+import org.apache.spark.sql.AnalysisException;
import org.apache.zeppelin.interpreter.AbstractInterpreter;
import org.apache.zeppelin.interpreter.ZeppelinContext;
import org.apache.zeppelin.interpreter.InterpreterContext;
@@ -32,6 +33,7 @@ import org.apache.zeppelin.scheduler.SchedulerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
import java.lang.reflect.Method;
import java.util.List;
import java.util.Properties;
@@ -84,7 +86,6 @@ public class SparkSqlInterpreter extends AbstractInterpreter {
Object sqlContext = sparkInterpreter.getSQLContext();
SparkContext sc = sparkInterpreter.getSparkContext();
- StringBuilder builder = new StringBuilder();
List<String> sqls = sqlSplitter.splitSql(st);
int maxResult = Integer.parseInt(context.getLocalProperties().getOrDefault("limit",
"" + sparkInterpreter.getZeppelinContext().getMaxResult()));
@@ -103,19 +104,33 @@ public class SparkSqlInterpreter extends AbstractInterpreter {
curSql = sql;
String result = sparkInterpreter.getZeppelinContext()
.showData(method.invoke(sqlContext, sql), maxResult);
- builder.append(result);
+ context.out.write(result);
}
+ context.out.flush();
} catch (Exception e) {
- builder.append("\n%text Error happens in sql: " + curSql + "\n");
- if (Boolean.parseBoolean(getProperty("zeppelin.spark.sql.stacktrace", "false"))) {
- builder.append(ExceptionUtils.getStackTrace(e));
- } else {
- LOGGER.error("Invocation target exception", e);
- String msg = e.getMessage()
- + "\nset zeppelin.spark.sql.stacktrace = true to see full stacktrace";
- builder.append(msg);
+ try {
+ if (e.getCause() instanceof AnalysisException) {
+ // just return the error message from spark if it is AnalysisException
+ context.out.write(e.getCause().getMessage());
+ context.out.flush();
+ return new InterpreterResult(Code.ERROR);
+ } else {
+ context.out.write("\nError happens in sql: " + curSql + "\n");
+ if (Boolean.parseBoolean(getProperty("zeppelin.spark.sql.stacktrace", "false"))) {
+ context.out.write(ExceptionUtils.getStackTrace(e.getCause()));
+ } else {
+ LOGGER.error("Invocation target exception", e);
+ String msg = e.getCause().getMessage()
+ + "\nset zeppelin.spark.sql.stacktrace = true to see full stacktrace";
+ context.out.write(msg);
+ }
+ context.out.flush();
+ return new InterpreterResult(Code.ERROR);
+ }
+ } catch (IOException ex) {
+ LOGGER.error("Fail to write output", ex);
+ return new InterpreterResult(Code.ERROR);
}
- return new InterpreterResult(Code.ERROR, builder.toString());
} finally {
sc.clearJobGroup();
if (!sparkInterpreter.isScala212()) {
@@ -123,7 +138,7 @@ public class SparkSqlInterpreter extends AbstractInterpreter {
}
}
- return new InterpreterResult(Code.SUCCESS, builder.toString());
+ return new InterpreterResult(Code.SUCCESS);
}
@Override
diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java
index d32964f..843ee77 100644
--- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java
+++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java
@@ -31,6 +31,7 @@ import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
+import java.io.IOException;
import java.util.LinkedList;
import java.util.Properties;
@@ -88,7 +89,7 @@ public class SparkSqlInterpreterTest {
}
@Test
- public void test() throws InterpreterException {
+ public void test() throws InterpreterException, IOException {
InterpreterResult result = sparkInterpreter.interpret("case class Test(name:String, age:Int)", context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
result = sparkInterpreter.interpret("val test = sc.parallelize(Seq(Test(\"moon\\t1\", 33), Test(\"jobs\", 51), Test(\"gates\", 51), Test(\"park\\n1\", 34)))", context);
@@ -98,12 +99,12 @@ public class SparkSqlInterpreterTest {
InterpreterResult ret = sqlInterpreter.interpret("select name, age from test where age < 40", context);
assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
- assertEquals(Type.TABLE, ret.message().get(0).getType());
- assertEquals("name\tage\nmoon 1\t33\npark 1\t34\n", ret.message().get(0).getData());
+ assertEquals(Type.TABLE, context.out.toInterpreterResultMessage().get(0).getType());
+ assertEquals("name\tage\nmoon 1\t33\npark 1\t34\n", context.out.toInterpreterResultMessage().get(0).getData());
ret = sqlInterpreter.interpret("select wrong syntax", context);
assertEquals(InterpreterResult.Code.ERROR, ret.code());
- assertTrue(ret.message().get(0).getData().length() > 0);
+ assertTrue(context.out.toInterpreterResultMessage().get(0).getData().length() > 0);
assertEquals(InterpreterResult.Code.SUCCESS, sqlInterpreter.interpret("select case when name='aa' then name else name end from test", context).code());
}
@@ -152,7 +153,7 @@ public class SparkSqlInterpreterTest {
}
@Test
- public void testMaxResults() throws InterpreterException {
+ public void testMaxResults() throws InterpreterException, IOException {
sparkInterpreter.interpret("case class P(age:Int)", context);
sparkInterpreter.interpret(
"val gr = sc.parallelize(Seq(P(1),P(2),P(3),P(4),P(5),P(6),P(7),P(8),P(9),P(10),P(11)))",
@@ -162,19 +163,19 @@ public class SparkSqlInterpreterTest {
InterpreterResult ret = sqlInterpreter.interpret("select * from gr", context);
assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
// the number of rows is 10+1, 1 is the head of table
- assertEquals(11, ret.message().get(0).getData().split("\n").length);
- assertTrue(ret.message().get(1).getData().contains("alert-warning"));
+ assertEquals(11, context.out.toInterpreterResultMessage().get(0).getData().split("\n").length);
+ assertTrue(context.out.toInterpreterResultMessage().get(1).getData().contains("alert-warning"));
// test limit local property
context.getLocalProperties().put("limit", "5");
ret = sqlInterpreter.interpret("select * from gr", context);
assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
// the number of rows is 5+1, 1 is the head of table
- assertEquals(6, ret.message().get(0).getData().split("\n").length);
+ assertEquals(6, context.out.toInterpreterResultMessage().get(0).getData().split("\n").length);
}
@Test
- public void testSingleRowResult() throws InterpreterException {
+ public void testSingleRowResult() throws InterpreterException, IOException {
sparkInterpreter.interpret("case class P(age:Int)", context);
sparkInterpreter.interpret(
"val gr = sc.parallelize(Seq(P(1),P(2),P(3),P(4),P(5),P(6),P(7),P(8),P(9),P(10)))",
@@ -195,12 +196,12 @@ public class SparkSqlInterpreterTest {
InterpreterResult ret = sqlInterpreter.interpret("select count(1), sum(age) from gr", context);
context.getLocalProperties().remove("template");
assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
- assertEquals(Type.HTML, ret.message().get(0).getType());
- assertEquals("Total count: <h1>10</h1>, Total age: <h1>55</h1>", ret.message().get(0).getData());
+ assertEquals(Type.HTML, context.out.toInterpreterResultMessage().get(0).getType());
+ assertEquals("Total count: <h1>10</h1>, Total age: <h1>55</h1>", context.out.toInterpreterResultMessage().get(0).getData());
}
@Test
- public void testMultipleStatements() throws InterpreterException {
+ public void testMultipleStatements() throws InterpreterException, IOException {
sparkInterpreter.interpret("case class P(age:Int)", context);
sparkInterpreter.interpret(
"val gr = sc.parallelize(Seq(P(1),P(2),P(3),P(4)))",
@@ -211,33 +212,33 @@ public class SparkSqlInterpreterTest {
InterpreterResult ret = sqlInterpreter.interpret(
"select * --comment_1\nfrom gr;select count(1) from gr", context);
assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
- assertEquals(ret.message().toString(), 2, ret.message().size());
- assertEquals(ret.message().toString(), Type.TABLE, ret.message().get(0).getType());
- assertEquals(ret.message().toString(), Type.TABLE, ret.message().get(1).getType());
+ assertEquals(context.out.toString(), 2, context.out.toInterpreterResultMessage().size());
+ assertEquals(context.out.toString(), Type.TABLE, context.out.toInterpreterResultMessage().get(0).getType());
+ assertEquals(context.out.toString(), Type.TABLE, context.out.toInterpreterResultMessage().get(1).getType());
// One correct sql + One invalid sql
ret = sqlInterpreter.interpret("select * from gr;invalid_sql", context);
assertEquals(InterpreterResult.Code.ERROR, ret.code());
- assertEquals(ret.message().toString(), 2, ret.message().size());
- assertEquals(ret.message().toString(), Type.TABLE, ret.message().get(0).getType());
+ assertEquals(context.out.toString(), 2, context.out.toInterpreterResultMessage().size());
+ assertEquals(context.out.toString(), Type.TABLE, context.out.toInterpreterResultMessage().get(0).getType());
if (!sparkInterpreter.getSparkVersion().isSpark1()) {
- assertTrue(ret.message().toString(), ret.message().get(1).getData().contains("ParseException"));
+ assertTrue(context.out.toString(), context.out.toInterpreterResultMessage().get(1).getData().contains("mismatched input"));
}
// One correct sql + One invalid sql + One valid sql (skipped)
ret = sqlInterpreter.interpret("select * from gr;invalid_sql; select count(1) from gr", context);
assertEquals(InterpreterResult.Code.ERROR, ret.code());
- assertEquals(ret.message().toString(), 2, ret.message().size());
- assertEquals(ret.message().toString(), Type.TABLE, ret.message().get(0).getType());
+ assertEquals(context.out.toString(), 2, context.out.toInterpreterResultMessage().size());
+ assertEquals(context.out.toString(), Type.TABLE, context.out.toInterpreterResultMessage().get(0).getType());
if (!sparkInterpreter.getSparkVersion().isSpark1()) {
- assertTrue(ret.message().toString(), ret.message().get(1).getData().contains("ParseException"));
+ assertTrue(context.out.toString(), context.out.toInterpreterResultMessage().get(1).getData().contains("mismatched input"));
}
// Two 2 comments
ret = sqlInterpreter.interpret(
"--comment_1\n--comment_2", context);
assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
- assertEquals(ret.message().toString(), 0, ret.message().size());
+ assertEquals(context.out.toString(), 0, context.out.toInterpreterResultMessage().size());
}
@Test
@@ -285,7 +286,7 @@ public class SparkSqlInterpreterTest {
}
@Test
- public void testDDL() throws InterpreterException {
+ public void testDDL() throws InterpreterException, IOException {
InterpreterResult ret = sqlInterpreter.interpret("create table t1(id int, name string)", context);
assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
// spark 1.x will still return DataFrame with non-empty columns.
@@ -300,20 +301,20 @@ public class SparkSqlInterpreterTest {
// create the same table again
ret = sqlInterpreter.interpret("create table t1(id int, name string)", context);
assertEquals(InterpreterResult.Code.ERROR, ret.code());
- assertEquals(1, ret.message().size());
- assertEquals(Type.TEXT, ret.message().get(0).getType());
- assertTrue(ret.message().get(0).getData().contains("already exists"));
+ assertEquals(1, context.out.toInterpreterResultMessage().size());
+ assertEquals(Type.TEXT, context.out.toInterpreterResultMessage().get(0).getType());
+ assertTrue(context.out.toInterpreterResultMessage().get(0).getData().contains("already exists"));
// invalid DDL
ret = sqlInterpreter.interpret("create temporary function udf1 as 'org.apache.zeppelin.UDF'", context);
assertEquals(InterpreterResult.Code.ERROR, ret.code());
- assertEquals(1, ret.message().size());
- assertEquals(Type.TEXT, ret.message().get(0).getType());
+ assertEquals(1, context.out.toInterpreterResultMessage().size());
+ assertEquals(Type.TEXT, context.out.toInterpreterResultMessage().get(0).getType());
// spark 1.x could not detect the root cause correctly
if (!sparkInterpreter.getSparkContext().version().startsWith("1.")) {
- assertTrue(ret.message().get(0).getData().contains("ClassNotFoundException") ||
- ret.message().get(0).getData().contains("Can not load class"));
+ assertTrue(context.out.toInterpreterResultMessage().get(0).getData().contains("ClassNotFoundException") ||
+ context.out.toInterpreterResultMessage().get(0).getData().contains("Can not load class"));
}
}
}
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZSessionIntegrationTest.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZSessionIntegrationTest.java
index 12042c8..2ec61bd 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZSessionIntegrationTest.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZSessionIntegrationTest.java
@@ -206,7 +206,7 @@ public class ZSessionIntegrationTest extends AbstractTestRestApi {
assertEquals(Status.ERROR, result.getStatus());
assertEquals(1, result.getResults().size());
assertEquals("TEXT", result.getResults().get(0).getType());
- assertTrue(result.getResults().get(0).getData(), result.getResults().get(0).getData().contains("NoSuchTableException"));
+ assertTrue(result.getResults().get(0).getData(), result.getResults().get(0).getData().contains("Table or view not found"));
assertEquals(0, result.getJobUrls().size());
} finally {
@@ -279,7 +279,7 @@ public class ZSessionIntegrationTest extends AbstractTestRestApi {
assertEquals(Status.ERROR, result.getStatus());
assertEquals(1, result.getResults().size());
assertEquals("TEXT", result.getResults().get(0).getType());
- assertTrue(result.getResults().get(0).getData(), result.getResults().get(0).getData().contains("NoSuchTableException"));
+ assertTrue(result.getResults().get(0).getData(), result.getResults().get(0).getData().contains("Table or view not found"));
assertEquals(0, result.getJobUrls().size());
// cancel