Posted to commits@zeppelin.apache.org by zj...@apache.org on 2019/03/11 03:25:45 UTC

[zeppelin] branch branch-0.8 updated: [ZEPPELIN-3487]. Spark SQL (%sql) paragraphs for DDLs should suppress datagrid-ui

This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch branch-0.8
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/branch-0.8 by this push:
     new 51b615f  [ZEPPELIN-3487]. Spark SQL (%sql) paragraphs for DDLs should suppress datagrid-ui
51b615f is described below

commit 51b615f705a0dbe1111c2937ccd596d585b5a4ab
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Thu Mar 7 10:56:57 2019 +0800

    [ZEPPELIN-3487]. Spark SQL (%sql) paragraphs for DDLs should suppress datagrid-ui
    
    This PR returns an empty string when the DataFrame returned by the sql call is empty, so DDL paragraphs no longer render an empty data grid. This does not hold for Spark 1.x, where a DDL still yields a non-empty DataFrame, so Spark 1.x users will continue to see an empty table in the frontend.
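    
    As a minimal sketch of the Spark behaviour this relies on (not part of the patch; the
    table name and the USING parquet clause are illustrative so the snippet runs without
    Hive support), a DDL statement on Spark 2.x yields a DataFrame with an empty schema,
    which is what allows the interpreter to suppress the table output:
    
        import org.apache.spark.sql.Dataset;
        import org.apache.spark.sql.Row;
        import org.apache.spark.sql.SparkSession;
    
        public class DdlResultSketch {
          public static void main(String[] args) {
            SparkSession spark = SparkSession.builder()
                .master("local[1]").appName("ddl-result-sketch").getOrCreate();
            // On Spark 2.x a DDL returns a DataFrame with no columns; on Spark 1.x the
            // same statement comes back with a single "result" column instead.
            Dataset<Row> result = spark.sql("create table t1(id int, name string) using parquet");
            System.out.println(result.columns().length == 0);  // true on Spark 2.x
            spark.stop();
          }
        }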
    
    [Bug Fix]
    
    * [ ] - Task
    
    * https://issues.apache.org/jira/browse/ZEPPELIN-3487
    
    * CI pass
    
    ![image](https://user-images.githubusercontent.com/164491/53948284-70d10f80-4102-11e9-8d0e-ea184a0baaa0.png)
    
    * Do the license files need an update? No
    * Are there breaking changes for older versions? No
    * Does this need documentation? No
    
    Author: Jeff Zhang <zj...@apache.org>
    
    Closes #3321 from zjffdu/ZEPPELIN-3487 and squashes the following commits:
    
    7800bb036 [Jeff Zhang] [ZEPPELIN-3487]. Spark SQL (%sql) paragraphs for DDLs should suppress datagrid-ui
    
    (cherry picked from commit a0b4f55b0c38bd1b55bfb31e443895c708a75ce5)
    Signed-off-by: Jeff Zhang <zj...@apache.org>
---
 .../apache/zeppelin/spark/SparkSqlInterpreter.java | 11 +--
 .../zeppelin/spark/SparkZeppelinContext.java       |  4 ++
 .../zeppelin/spark/NewSparkSqlInterpreterTest.java | 83 +++++++++++++++-------
 spark/interpreter/src/test/resources/hive-site.xml |  7 ++
 4 files changed, 76 insertions(+), 29 deletions(-)

diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
index 2892373..cedb353 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
@@ -17,12 +17,13 @@
 
 package org.apache.zeppelin.spark;
 
+
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.atomic.AtomicInteger;
-
+import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.spark.SparkContext;
 import org.apache.spark.sql.SQLContext;
 import org.apache.zeppelin.interpreter.Interpreter;
@@ -119,12 +120,12 @@ public class SparkSqlInterpreter extends Interpreter {
           Boolean.parseBoolean(getProperty("zeppelin.spark.sql.interpolation")) ?
               interpolate(st, context.getResourcePool()) : st;
       rdd = sqlMethod.invoke(sqlc, effectiveString);
-    } catch (InvocationTargetException ite) {
+    } catch (InvocationTargetException e) {
       if (Boolean.parseBoolean(getProperty("zeppelin.spark.sql.stacktrace"))) {
-        throw new InterpreterException(ite);
+        return new InterpreterResult(Code.ERROR, ExceptionUtils.getStackTrace(e));
       }
-      logger.error("Invocation target exception", ite);
-      String msg = ite.getTargetException().getMessage()
+      logger.error("Invocation target exception", e);
+      String msg = e.getCause().getMessage()
               + "\nset zeppelin.spark.sql.stacktrace = true to see full stacktrace";
       return new InterpreterResult(Code.ERROR, msg);
     } catch (NoSuchMethodException | SecurityException | IllegalAccessException
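
For reference, a minimal sketch of the two error paths in the hunk above, assuming
commons-lang on the classpath (the class and method names SqlErrorSketch and
formatSqlError are illustrative; the property lookup itself is omitted):

    import java.lang.reflect.InvocationTargetException;
    import org.apache.commons.lang.exception.ExceptionUtils;

    public class SqlErrorSketch {
      static String formatSqlError(InvocationTargetException e, boolean showStacktrace) {
        if (showStacktrace) {
          // Surface the full stack trace of the wrapped failure in the ERROR result.
          return ExceptionUtils.getStackTrace(e);
        }
        // Short message only, with a hint on how to enable the full trace.
        return e.getCause().getMessage()
            + "\nset zeppelin.spark.sql.stacktrace = true to see full stacktrace";
      }
    }
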
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkZeppelinContext.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkZeppelinContext.java
index 2a2b7e3..979a140 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkZeppelinContext.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkZeppelinContext.java
@@ -128,6 +128,10 @@ public class SparkZeppelinContext extends BaseZeppelinContext {
         | IllegalArgumentException | InvocationTargetException e) {
       throw new RuntimeException(e);
     }
+    // DDL returns an empty DataFrame
+    if (columns.isEmpty()) {
+      return "";
+    }
 
     StringBuilder msg = new StringBuilder();
     msg.append("%table ");
diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkSqlInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkSqlInterpreterTest.java
index 0dede8a..04813fc 100644
--- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkSqlInterpreterTest.java
+++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkSqlInterpreterTest.java
@@ -47,8 +47,10 @@ public class NewSparkSqlInterpreterTest {
     p.setProperty("spark.app.name", "test");
     p.setProperty("zeppelin.spark.maxResult", "10");
     p.setProperty("zeppelin.spark.concurrentSQL", "false");
-    p.setProperty("zeppelin.spark.sqlInterpreter.stacktrace", "false");
-    p.setProperty("zeppelin.spark.useNew", "true");
+    p.setProperty("zeppelin.spark.sql.stacktrace", "true");
+    p.setProperty("zeppelin.spark.useNew", "true");
+    p.setProperty("zeppelin.spark.useHiveContext", "true");
+
     intpGroup = new InterpreterGroup();
     sparkInterpreter = new SparkInterpreter(p);
     sparkInterpreter.setInterpreterGroup(intpGroup);
@@ -60,10 +62,10 @@ public class NewSparkSqlInterpreterTest {
     intpGroup.get("session_1").add(sqlInterpreter);
 
     context = new InterpreterContext("note", "id", null, "title", "text", new AuthenticationInfo(),
-        new HashMap<String, Object>(), new GUI(), new GUI(),
-        new AngularObjectRegistry(intpGroup.getId(), null),
-        new LocalResourcePool("id"),
-        new LinkedList<InterpreterContextRunner>(), new InterpreterOutput(null));
+            new HashMap<String, Object>(), new GUI(), new GUI(),
+            new AngularObjectRegistry(intpGroup.getId(), null),
+            new LocalResourcePool("id"),
+            new LinkedList<InterpreterContextRunner>(), new InterpreterOutput(null));
 
     InterpreterContext.set(context);
 
@@ -109,8 +111,8 @@ public class NewSparkSqlInterpreterTest {
     sparkInterpreter.interpret("case class Person(name:String, age:Int)", context);
     sparkInterpreter.interpret("case class People(group:String, person:Person)", context);
     sparkInterpreter.interpret(
-        "val gr = sc.parallelize(Seq(People(\"g1\", Person(\"moon\",33)), People(\"g2\", Person(\"sun\",11))))",
-        context);
+            "val gr = sc.parallelize(Seq(People(\"g1\", Person(\"moon\",33)), People(\"g2\", Person(\"sun\",11))))",
+            context);
     if (isDataFrameSupported()) {
       sparkInterpreter.interpret("gr.toDF.registerTempTable(\"gr\")", context);
     } else {
@@ -125,33 +127,33 @@ public class NewSparkSqlInterpreterTest {
     sparkInterpreter.interpret("import org.apache.spark.sql._", context);
     if (isDataFrameSupported()) {
       sparkInterpreter.interpret(
-          "import org.apache.spark.sql.types.{StructType,StructField,StringType,IntegerType}",
-          context);
+              "import org.apache.spark.sql.types.{StructType,StructField,StringType,IntegerType}",
+              context);
     }
     sparkInterpreter.interpret(
-        "def toInt(s:String): Any = {try { s.trim().toInt} catch {case e:Exception => null}}",
-        context);
+            "def toInt(s:String): Any = {try { s.trim().toInt} catch {case e:Exception => null}}",
+            context);
     sparkInterpreter.interpret(
-        "val schema = StructType(Seq(StructField(\"name\", StringType, false),StructField(\"age\" , IntegerType, true),StructField(\"other\" , StringType, false)))",
-        context);
+            "val schema = StructType(Seq(StructField(\"name\", StringType, false),StructField(\"age\" , IntegerType, true),StructField(\"other\" , StringType, false)))",
+            context);
     sparkInterpreter.interpret(
-        "val csv = sc.parallelize(Seq((\"jobs, 51, apple\"), (\"gates, , microsoft\")))",
-        context);
+            "val csv = sc.parallelize(Seq((\"jobs, 51, apple\"), (\"gates, , microsoft\")))",
+            context);
     sparkInterpreter.interpret(
-        "val raw = csv.map(_.split(\",\")).map(p => Row(p(0),toInt(p(1)),p(2)))",
-        context);
+            "val raw = csv.map(_.split(\",\")).map(p => Row(p(0),toInt(p(1)),p(2)))",
+            context);
     if (isDataFrameSupported()) {
       sparkInterpreter.interpret("val people = sqlContext.createDataFrame(raw, schema)",
-          context);
+              context);
       sparkInterpreter.interpret("people.toDF.registerTempTable(\"people\")", context);
     } else {
       sparkInterpreter.interpret("val people = sqlContext.applySchema(raw, schema)",
-          context);
+              context);
       sparkInterpreter.interpret("people.registerTempTable(\"people\")", context);
     }
 
     InterpreterResult ret = sqlInterpreter.interpret(
-        "select name, age from people where name = 'gates'", context);
+            "select name, age from people where name = 'gates'", context);
     assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
     assertEquals(Type.TABLE, ret.message().get(0).getType());
     assertEquals("name\tage\ngates\tnull\n", ret.message().get(0).getData());
@@ -161,8 +163,8 @@ public class NewSparkSqlInterpreterTest {
   public void testMaxResults() throws InterpreterException {
     sparkInterpreter.interpret("case class P(age:Int)", context);
     sparkInterpreter.interpret(
-        "val gr = sc.parallelize(Seq(P(1),P(2),P(3),P(4),P(5),P(6),P(7),P(8),P(9),P(10),P(11)))",
-        context);
+            "val gr = sc.parallelize(Seq(P(1),P(2),P(3),P(4),P(5),P(6),P(7),P(8),P(9),P(10),P(11)))",
+            context);
     if (isDataFrameSupported()) {
       sparkInterpreter.interpret("gr.toDF.registerTempTable(\"gr\")", context);
     } else {
@@ -173,4 +175,37 @@ public class NewSparkSqlInterpreterTest {
     assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
     assertTrue(ret.message().get(1).getData().contains("alert-warning"));
   }
-}
+
+  @Test
+  public void testDDL() throws InterpreterException {
+    InterpreterResult ret = sqlInterpreter.interpret("create table t1(id int, name string)", context);
+    assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
+    // spark 1.x will still return DataFrame with non-empty columns.
+    // org.apache.spark.sql.DataFrame = [result: string]
+    if (!sparkInterpreter.getSparkContext().version().startsWith("1.")) {
+      assertTrue(ret.message().isEmpty());
+    } else {
+      assertEquals(Type.TABLE, ret.message().get(0).getType());
+      assertEquals("result\n", ret.message().get(0).getData());
+    }
+
+    // create the same table again
+    ret = sqlInterpreter.interpret("create table t1(id int, name string)", context);
+    assertEquals(InterpreterResult.Code.ERROR, ret.code());
+    assertEquals(1, ret.message().size());
+    assertEquals(Type.TEXT, ret.message().get(0).getType());
+    assertTrue(ret.message().get(0).getData().contains("already exists"));
+
+    // invalid DDL
+    ret = sqlInterpreter.interpret("create temporary function udf1 as 'org.apache.zeppelin.UDF'", context);
+    assertEquals(InterpreterResult.Code.ERROR, ret.code());
+    assertEquals(1, ret.message().size());
+    assertEquals(Type.TEXT, ret.message().get(0).getType());
+
+    // spark 1.x could not detect the root cause correctly
+    if (!sparkInterpreter.getSparkContext().version().startsWith("1.")) {
+      assertTrue(ret.message().get(0).getData().contains("ClassNotFoundException") ||
+              ret.message().get(0).getData().contains("Can not load class"));
+    }
+  }
+}
\ No newline at end of file
diff --git a/spark/interpreter/src/test/resources/hive-site.xml b/spark/interpreter/src/test/resources/hive-site.xml
new file mode 100644
index 0000000..f26ffcd
--- /dev/null
+++ b/spark/interpreter/src/test/resources/hive-site.xml
@@ -0,0 +1,7 @@
+<configuration>
+    <property>
+        <name>hive.metastore.warehouse.dir</name>
+        <value>${user.home}/hive/warehouse</value>
+        <description>location of default database for the warehouse</description>
+    </property>
+</configuration>
\ No newline at end of file
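
The hive-site.xml above is picked up from the test classpath when the interpreter is
started with zeppelin.spark.useHiveContext=true. A minimal sketch of the equivalent
setup in plain Spark (the class name HiveWarehouseSketch and the explicit config call
are illustrative; with the XML on the classpath, Spark's Hive support reads the
metastore settings from it directly):

    import org.apache.spark.sql.SparkSession;

    public class HiveWarehouseSketch {
      public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
            .master("local[1]")
            .appName("hive-warehouse-sketch")
            // Mirrors hive.metastore.warehouse.dir from the XML above.
            .config("spark.sql.warehouse.dir",
                System.getProperty("user.home") + "/hive/warehouse")
            .enableHiveSupport()
            .getOrCreate();
        System.out.println(spark.conf().get("spark.sql.warehouse.dir"));
        spark.stop();
      }
    }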