You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2019/03/11 01:43:52 UTC

[zeppelin] branch master updated: [ZEPPELIN-3487]. Spark SQL (%sql) paragraphs for DDLs should suppress datagrid-ui

This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new a0b4f55  [ZEPPELIN-3487]. Spark SQL (%sql) paragraphs for DDLs should suppress datagrid-ui
a0b4f55 is described below

commit a0b4f55b0c38bd1b55bfb31e443895c708a75ce5
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Thu Mar 7 10:56:57 2019 +0800

    [ZEPPELIN-3487]. Spark SQL (%sql) paragraphs for DDLs should suppress datagrid-ui
    
    ### What is this PR for?
    
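    This PR returns an empty string when the DataFrame returned by a SQL statement has no columns (as is the case for DDL), so the frontend no longer shows an empty table. This does not hold for Spark 1.x, which still returns a DataFrame with columns for DDL, so Spark 1.x users will still see an empty table in the frontend.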
    
    ### What type of PR is it?
    [Bug Fix]
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-3487
    
    ### How should this be tested?
    * CI pass
    
    ### Screenshots (if appropriate)
    ![image](https://user-images.githubusercontent.com/164491/53948284-70d10f80-4102-11e9-8d0e-ea184a0baaa0.png)
    
    ### Questions:
    * Do the license files need an update? No
    * Are there breaking changes for older versions? No
    * Does this need documentation? No
    
    Author: Jeff Zhang <zj...@apache.org>
    
    Closes #3321 from zjffdu/ZEPPELIN-3487 and squashes the following commits:
    
    7800bb036 [Jeff Zhang] [ZEPPELIN-3487]. Spark SQL (%sql) paragraphs for DDLs should suppress datagrid-ui
---
 .../apache/zeppelin/spark/SparkSqlInterpreter.java |  5 +--
 .../zeppelin/spark/NewSparkSqlInterpreterTest.java | 36 +++++++++++++++++++++-
 spark/interpreter/src/test/resources/hive-site.xml |  7 +++++
 .../org/apache/zeppelin/spark/Spark1Shims.java     |  4 +++
 .../org/apache/zeppelin/spark/Spark2Shims.java     |  4 +++
 5 files changed, 53 insertions(+), 3 deletions(-)

diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
index b90b6f4..040556b 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
@@ -17,6 +17,7 @@
 
 package org.apache.zeppelin.spark;
 
+import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.spark.SparkContext;
 import org.apache.spark.sql.SQLContext;
 import org.apache.zeppelin.interpreter.AbstractInterpreter;
@@ -94,10 +95,10 @@ public class SparkSqlInterpreter extends AbstractInterpreter {
       return new InterpreterResult(Code.SUCCESS, msg);
     } catch (Exception e) {
       if (Boolean.parseBoolean(getProperty("zeppelin.spark.sql.stacktrace"))) {
-        throw new InterpreterException(e);
+        return new InterpreterResult(Code.ERROR, ExceptionUtils.getStackTrace(e));
       }
       logger.error("Invocation target exception", e);
-      String msg = e.getMessage()
+      String msg = e.getCause().getMessage()
               + "\nset zeppelin.spark.sql.stacktrace = true to see full stacktrace";
       return new InterpreterResult(Code.ERROR, msg);
     }
diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkSqlInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkSqlInterpreterTest.java
index d16a0e7..573ce51 100644
--- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkSqlInterpreterTest.java
+++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkSqlInterpreterTest.java
@@ -52,8 +52,10 @@ public class NewSparkSqlInterpreterTest {
     p.setProperty("spark.app.name", "test");
     p.setProperty("zeppelin.spark.maxResult", "10");
     p.setProperty("zeppelin.spark.concurrentSQL", "true");
-    p.setProperty("zeppelin.spark.sqlInterpreter.stacktrace", "false");
+    p.setProperty("zeppelin.spark.sql.stacktrace", "true");
     p.setProperty("zeppelin.spark.useNew", "true");
+    p.setProperty("zeppelin.spark.useHiveContext", "true");
+
     intpGroup = new InterpreterGroup();
     sparkInterpreter = new SparkInterpreter(p);
     sparkInterpreter.setInterpreterGroup(intpGroup);
@@ -212,4 +214,36 @@ public class NewSparkSqlInterpreterTest {
 
   }
 
+  @Test
+  public void testDDL() throws InterpreterException {
+    InterpreterResult ret = sqlInterpreter.interpret("create table t1(id int, name string)", context);
+    assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
+    // A successful DDL should produce no output on Spark 2.x and later. Spark 1.x still returns a
+    // DataFrame with a column (org.apache.spark.sql.DataFrame = [result: string]), so a header-only table is expected there.
+    if (!sparkInterpreter.getSparkContext().version().startsWith("1.")) {
+      assertTrue(ret.message().isEmpty());
+    } else {
+      assertEquals(Type.TABLE, ret.message().get(0).getType());
+      assertEquals("result\n", ret.message().get(0).getData());
+    }
+
+    // Creating the same table a second time must fail with a single TEXT message containing "already exists".
+    ret = sqlInterpreter.interpret("create table t1(id int, name string)", context);
+    assertEquals(InterpreterResult.Code.ERROR, ret.code());
+    assertEquals(1, ret.message().size());
+    assertEquals(Type.TEXT, ret.message().get(0).getType());
+    assertTrue(ret.message().get(0).getData().contains("already exists"));
+
+    // Invalid DDL: registering a function whose class is expected to be unloadable must yield a single TEXT error.
+    ret = sqlInterpreter.interpret("create temporary function udf1 as 'org.apache.zeppelin.UDF'", context);
+    assertEquals(InterpreterResult.Code.ERROR, ret.code());
+    assertEquals(1, ret.message().size());
+    assertEquals(Type.TEXT, ret.message().get(0).getType());
+
+    // Spark 1.x does not report the root cause correctly, so the error text is only checked on other versions.
+    if (!sparkInterpreter.getSparkContext().version().startsWith("1.")) {
+      assertTrue(ret.message().get(0).getData().contains("ClassNotFoundException") ||
+              ret.message().get(0).getData().contains("Can not load class"));
+    }
+  }
 }
diff --git a/spark/interpreter/src/test/resources/hive-site.xml b/spark/interpreter/src/test/resources/hive-site.xml
new file mode 100644
index 0000000..f26ffcd
--- /dev/null
+++ b/spark/interpreter/src/test/resources/hive-site.xml
@@ -0,0 +1,7 @@
+<configuration>
+    <property>
+        <name>hive.metastore.warehouse.dir</name>
+        <value>${user.home}/hive/warehouse</value>
+        <description>location of default database for the warehouse</description>
+    </property>
+</configuration>
\ No newline at end of file
diff --git a/spark/spark1-shims/src/main/scala/org/apache/zeppelin/spark/Spark1Shims.java b/spark/spark1-shims/src/main/scala/org/apache/zeppelin/spark/Spark1Shims.java
index 1b05daf..786d68c 100644
--- a/spark/spark1-shims/src/main/scala/org/apache/zeppelin/spark/Spark1Shims.java
+++ b/spark/spark1-shims/src/main/scala/org/apache/zeppelin/spark/Spark1Shims.java
@@ -56,6 +56,10 @@ public class Spark1Shims extends SparkShims {
     if (obj instanceof DataFrame) {
       DataFrame df = (DataFrame) obj;
       String[] columns = df.columns();
+      // A DDL statement yields a DataFrame with no columns; return an empty string so no table is rendered
+      if (columns.length == 0) {
+        return "";
+      }
       // fetch maxResult+1 rows so that we can check whether it is larger than zeppelin.spark.maxResult
       List<Row> rows = df.takeAsList(maxResult + 1);
       StringBuilder msg = new StringBuilder();
diff --git a/spark/spark2-shims/src/main/scala/org/apache/zeppelin/spark/Spark2Shims.java b/spark/spark2-shims/src/main/scala/org/apache/zeppelin/spark/Spark2Shims.java
index fc5062e..3ecadaa 100644
--- a/spark/spark2-shims/src/main/scala/org/apache/zeppelin/spark/Spark2Shims.java
+++ b/spark/spark2-shims/src/main/scala/org/apache/zeppelin/spark/Spark2Shims.java
@@ -57,6 +57,10 @@ public class Spark2Shims extends SparkShims {
     if (obj instanceof Dataset) {
       Dataset<Row> df = ((Dataset) obj).toDF();
       String[] columns = df.columns();
+      // A DDL statement yields a DataFrame with no columns; return an empty string so no table is rendered
+      if (columns.length == 0) {
+        return "";
+      }
       // fetch maxResult+1 rows so that we can check whether it is larger than zeppelin.spark.maxResult
       List<Row> rows = df.takeAsList(maxResult + 1);
       StringBuilder msg = new StringBuilder();