You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by fe...@apache.org on 2017/04/02 18:53:53 UTC

zeppelin git commit: [ZEPPELIN-1965] Livy SQL Interpreter: Should use df.show(1000, false)…

Repository: zeppelin
Updated Branches:
  refs/heads/master 9e8d7eb90 -> 1135fb61d


[ZEPPELIN-1965] Livy SQL Interpreter: Should use df.show(1000, false)…

… to display results

### What is this PR for?
The Livy SQL interpreter truncates result strings longer than 20 characters. In some cases, we would like to see the full string. We are adding an interpreter property **zeppelin.livy.spark.sql.field.truncate** to control whether strings are truncated. By default, **zeppelin.livy.spark.sql.field.truncate** is set to **true**.

### What type of PR is it?
Improvement

### What is the Jira issue?
https://issues.apache.org/jira/browse/ZEPPELIN-1965

### How should this be tested?
Set zeppelin.livy.spark.sql.field.truncate to true or false
Run a SQL query which produces string values of length greater than 20.
Depending on the value of zeppelin.livy.spark.sql.field.truncate, the strings will either get truncated or not.

### Questions:
* Do the license files need an update? No
* Are there breaking changes for older versions? No
* Does this need documentation? No

Author: Benoy Antony <be...@apache.org>

Closes #2201 from benoyantony/master and squashes the following commits:

bb006c0 [Benoy Antony] changed field name and description
9eae68b [Benoy Antony] added a null check to avoid testcase failures, another nullcheck for backward compatibility and added two new testcases
ab1ead2 [Benoy Antony] documented zeppelin.livy.spark.sql.truncate
b6252be [Benoy Antony] [ZEPPELIN-1965] Livy SQL Interpreter: Should use df.show(1000, false) to display results


Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/1135fb61
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/1135fb61
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/1135fb61

Branch: refs/heads/master
Commit: 1135fb61d24e4a7baa82d1442e683eecb5de33d2
Parents: 9e8d7eb
Author: Benoy Antony <be...@apache.org>
Authored: Fri Mar 31 01:04:08 2017 -0700
Committer: Felix Cheung <fe...@apache.org>
Committed: Sun Apr 2 11:53:49 2017 -0700

----------------------------------------------------------------------
 docs/interpreter/livy.md                        |   7 +-
 .../zeppelin/livy/BaseLivyInterprereter.java    |   4 +-
 .../zeppelin/livy/LivySparkSQLInterpreter.java  |  19 ++-
 .../src/main/resources/interpreter-setting.json |   5 +
 .../apache/zeppelin/livy/LivyInterpreterIT.java | 127 ++++++++++++++++++-
 5 files changed, 155 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1135fb61/docs/interpreter/livy.md
----------------------------------------------------------------------
diff --git a/docs/interpreter/livy.md b/docs/interpreter/livy.md
index ce1c34a..a7b776c 100644
--- a/docs/interpreter/livy.md
+++ b/docs/interpreter/livy.md
@@ -56,11 +56,16 @@ Example: `spark.driver.memory` to `livy.spark.driver.memory`
     <td>URL where livy server is running</td>
   </tr>
   <tr>
-    <td>zeppelin.livy.spark.maxResult</td>
+    <td>zeppelin.livy.spark.sql.maxResult</td>
     <td>1000</td>
     <td>Max number of Spark SQL result to display.</td>
   </tr>
   <tr>
+    <td>zeppelin.livy.spark.sql.field.truncate</td>
+    <td>true</td>
+    <td>Whether to truncate field values longer than 20 characters or not</td>
+  </tr>
+  <tr>
     <td>zeppelin.livy.session.create_timeout</td>
     <td>120</td>
     <td>Timeout in seconds for session creation</td>

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1135fb61/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterprereter.java
----------------------------------------------------------------------
diff --git a/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterprereter.java b/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterprereter.java
index d29d20c..26342a2 100644
--- a/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterprereter.java
+++ b/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterprereter.java
@@ -272,7 +272,9 @@ public abstract class BaseLivyInterprereter extends Interpreter {
           throw new LivyException(e);
         }
         stmtInfo = getStatementInfo(stmtInfo.id);
-        paragraphId2StmtProgressMap.put(paragraphId, (int) (stmtInfo.progress * 100));
+        if (paragraphId != null) {
+          paragraphId2StmtProgressMap.put(paragraphId, (int) (stmtInfo.progress * 100));
+        }
       }
       if (appendSessionExpired) {
         return appendSessionExpire(getResultFromStatementInfo(stmtInfo, displayAppInfo),

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1135fb61/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java
----------------------------------------------------------------------
diff --git a/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java b/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java
index 48e4967..2eaf79c 100644
--- a/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java
+++ b/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java
@@ -32,14 +32,25 @@ import java.util.Properties;
  */
 public class LivySparkSQLInterpreter extends BaseLivyInterprereter {
 
+  public static final String ZEPPELIN_LIVY_SPARK_SQL_FIELD_TRUNCATE =
+      "zeppelin.livy.spark.sql.field.truncate";
+
+  public static final String ZEPPELIN_LIVY_SPARK_SQL_MAX_RESULT =
+      "zeppelin.livy.spark.sql.maxResult";
+
   private LivySparkInterpreter sparkInterpreter;
 
   private boolean isSpark2 = false;
   private int maxResult = 1000;
+  private boolean truncate = true;
 
   public LivySparkSQLInterpreter(Properties property) {
     super(property);
-    this.maxResult = Integer.parseInt(property.getProperty("zeppelin.livy.spark.sql.maxResult"));
+    this.maxResult = Integer.parseInt(property.getProperty(ZEPPELIN_LIVY_SPARK_SQL_MAX_RESULT));
+    if (property.getProperty(ZEPPELIN_LIVY_SPARK_SQL_FIELD_TRUNCATE) != null) {
+      this.truncate =
+          Boolean.parseBoolean(property.getProperty(ZEPPELIN_LIVY_SPARK_SQL_FIELD_TRUNCATE));
+    }
   }
 
   @Override
@@ -111,9 +122,11 @@ public class LivySparkSQLInterpreter extends BaseLivyInterprereter {
       // use triple quote so that we don't need to do string escape.
       String sqlQuery = null;
       if (isSpark2) {
-        sqlQuery = "spark.sql(\"\"\"" + line + "\"\"\").show(" + maxResult + ")";
+        sqlQuery = "spark.sql(\"\"\"" + line + "\"\"\").show(" + maxResult + ", " +
+            truncate + ")";
       } else {
-        sqlQuery = "sqlContext.sql(\"\"\"" + line + "\"\"\").show(" + maxResult + ")";
+        sqlQuery = "sqlContext.sql(\"\"\"" + line + "\"\"\").show(" + maxResult + ", " +
+            truncate + ")";
       }
       InterpreterResult result = sparkInterpreter.interpret(sqlQuery, context.getParagraphId(),
           this.displayAppInfo, true);

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1135fb61/livy/src/main/resources/interpreter-setting.json
----------------------------------------------------------------------
diff --git a/livy/src/main/resources/interpreter-setting.json b/livy/src/main/resources/interpreter-setting.json
index 42f64cf..8d3dea0 100644
--- a/livy/src/main/resources/interpreter-setting.json
+++ b/livy/src/main/resources/interpreter-setting.json
@@ -118,6 +118,11 @@
         "defaultValue": "1000",
         "description": "Max number of Spark SQL result to display."
       },
+      "zeppelin.livy.spark.sql.field.truncate": {
+        "propertyName": "zeppelin.livy.spark.sql.field.truncate",
+        "defaultValue": "true",
+        "description": "If true, truncate field values longer than 20 characters."
+      },
       "zeppelin.livy.concurrentSQL": {
         "propertyName": "zeppelin.livy.concurrentSQL",
         "defaultValue": "false",

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1135fb61/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java
----------------------------------------------------------------------
diff --git a/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java b/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java
index 1a8e8df..3da908c 100644
--- a/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java
+++ b/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java
@@ -20,7 +20,6 @@ package org.apache.zeppelin.livy;
 
 import com.cloudera.livy.test.framework.Cluster;
 import com.cloudera.livy.test.framework.Cluster$;
-import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
 import org.apache.zeppelin.interpreter.*;
 import org.apache.zeppelin.user.AuthenticationInfo;
@@ -33,7 +32,6 @@ import java.util.ArrayList;
 import java.util.Properties;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 public class LivyInterpreterIT {
@@ -309,6 +307,131 @@ public class LivyInterpreterIT {
   }
 
   @Test
+  public void testStringWithTruncation() {
+    if (!checkPreCondition()) {
+      return;
+    }
+    InterpreterGroup interpreterGroup = new InterpreterGroup("group_1");
+    interpreterGroup.put("session_1", new ArrayList<Interpreter>());
+    LivySparkInterpreter sparkInterpreter = new LivySparkInterpreter(properties);
+    sparkInterpreter.setInterpreterGroup(interpreterGroup);
+    interpreterGroup.get("session_1").add(sparkInterpreter);
+    AuthenticationInfo authInfo = new AuthenticationInfo("user1");
+    MyInterpreterOutputListener outputListener = new MyInterpreterOutputListener();
+    InterpreterOutput output = new InterpreterOutput(outputListener);
+    InterpreterContext context = new InterpreterContext("noteId", "paragraphId", "livy.spark",
+        "title", "text", authInfo, null, null, null, null, null, output);
+    sparkInterpreter.open();
+
+    LivySparkSQLInterpreter sqlInterpreter = new LivySparkSQLInterpreter(properties);
+    interpreterGroup.get("session_1").add(sqlInterpreter);
+    sqlInterpreter.setInterpreterGroup(interpreterGroup);
+    sqlInterpreter.open();
+
+    try {
+      // detect spark version
+      InterpreterResult result = sparkInterpreter.interpret("sc.version", context);
+      assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+      assertEquals(1, result.message().size());
+
+      boolean isSpark2 = isSpark2(sparkInterpreter, context);
+
+      // test DataFrame api
+      if (!isSpark2) {
+        result = sparkInterpreter.interpret(
+            "val df=sqlContext.createDataFrame(Seq((\"12characters12characters\",20))).toDF(\"col_1\", \"col_2\")\n"
+                + "df.collect()", context);
+        assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+        assertEquals(1, result.message().size());
+        assertTrue(result.message().get(0).getData()
+            .contains("Array[org.apache.spark.sql.Row] = Array([12characters12characters,20])"));
+      } else {
+        result = sparkInterpreter.interpret(
+            "val df=spark.createDataFrame(Seq((\"12characters12characters\",20))).toDF(\"col_1\", \"col_2\")\n"
+                + "df.collect()", context);
+        assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+        assertEquals(1, result.message().size());
+        assertTrue(result.message().get(0).getData()
+            .contains("Array[org.apache.spark.sql.Row] = Array([12characters12characters,20])"));
+      }
+      sparkInterpreter.interpret("df.registerTempTable(\"df\")", context);
+      // test LivySparkSQLInterpreter which share the same SparkContext with LivySparkInterpreter
+      result = sqlInterpreter.interpret("select * from df where col_1='12characters12characters'", context);
+      assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+      assertEquals(InterpreterResult.Type.TABLE, result.message().get(0).getType());
+      assertEquals("col_1\tcol_2\n12characters12cha...\t20", result.message().get(0).getData());
+    } finally {
+      sparkInterpreter.close();
+      sqlInterpreter.close();
+    }
+  }
+
+  @Test
+  public void testStringWithoutTruncation() {
+    if (!checkPreCondition()) {
+      return;
+    }
+    InterpreterGroup interpreterGroup = new InterpreterGroup("group_1");
+    interpreterGroup.put("session_1", new ArrayList<Interpreter>());
+    Properties newProps = new Properties();
+    for (Object name: properties.keySet()) {
+      newProps.put(name, properties.get(name));
+    }
+    newProps.put(LivySparkSQLInterpreter.ZEPPELIN_LIVY_SPARK_SQL_FIELD_TRUNCATE, "false");
+    LivySparkInterpreter sparkInterpreter = new LivySparkInterpreter(newProps);
+    sparkInterpreter.setInterpreterGroup(interpreterGroup);
+    interpreterGroup.get("session_1").add(sparkInterpreter);
+    AuthenticationInfo authInfo = new AuthenticationInfo("user1");
+    MyInterpreterOutputListener outputListener = new MyInterpreterOutputListener();
+    InterpreterOutput output = new InterpreterOutput(outputListener);
+    InterpreterContext context = new InterpreterContext("noteId", "paragraphId", "livy.spark",
+        "title", "text", authInfo, null, null, null, null, null, output);
+    sparkInterpreter.open();
+
+    LivySparkSQLInterpreter sqlInterpreter = new LivySparkSQLInterpreter(newProps);
+    interpreterGroup.get("session_1").add(sqlInterpreter);
+    sqlInterpreter.setInterpreterGroup(interpreterGroup);
+    sqlInterpreter.open();
+
+    try {
+      // detect spark version
+      InterpreterResult result = sparkInterpreter.interpret("sc.version", context);
+      assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+      assertEquals(1, result.message().size());
+
+      boolean isSpark2 = isSpark2(sparkInterpreter, context);
+
+      // test DataFrame api
+      if (!isSpark2) {
+        result = sparkInterpreter.interpret(
+            "val df=sqlContext.createDataFrame(Seq((\"12characters12characters\",20))).toDF(\"col_1\", \"col_2\")\n"
+                + "df.collect()", context);
+        assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+        assertEquals(1, result.message().size());
+        assertTrue(result.message().get(0).getData()
+            .contains("Array[org.apache.spark.sql.Row] = Array([12characters12characters,20])"));
+      } else {
+        result = sparkInterpreter.interpret(
+            "val df=spark.createDataFrame(Seq((\"12characters12characters\",20))).toDF(\"col_1\", \"col_2\")\n"
+                + "df.collect()", context);
+        assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+        assertEquals(1, result.message().size());
+        assertTrue(result.message().get(0).getData()
+            .contains("Array[org.apache.spark.sql.Row] = Array([12characters12characters,20])"));
+      }
+      sparkInterpreter.interpret("df.registerTempTable(\"df\")", context);
+      // test LivySparkSQLInterpreter which share the same SparkContext with LivySparkInterpreter
+      result = sqlInterpreter.interpret("select * from df where col_1='12characters12characters'", context);
+      assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+      assertEquals(InterpreterResult.Type.TABLE, result.message().get(0).getType());
+      assertEquals("col_1\tcol_2\n12characters12characters\t20", result.message().get(0).getData());
+    } finally {
+      sparkInterpreter.close();
+      sqlInterpreter.close();
+    }
+  }
+
+  @Test
   public void testPySparkInterpreter() {
     if (!checkPreCondition()) {
       return;