You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by fe...@apache.org on 2017/04/02 18:53:53 UTC
zeppelin git commit: [ZEPPELIN-1965] Livy SQL Interpreter: Should use df.show(1000, false)…
Repository: zeppelin
Updated Branches:
refs/heads/master 9e8d7eb90 -> 1135fb61d
[ZEPPELIN-1965] Livy SQL Interpreter: Should use df.show(1000, false) to display results
### What is this PR for?
The Livy SQL interpreter truncates result strings longer than 20 characters. In some cases, we would like to see the full string. This change adds an interpreter property **zeppelin.livy.spark.sql.field.truncate** to control whether strings are truncated. By default, **zeppelin.livy.spark.sql.field.truncate** is set to **true**.
### What type of PR is it?
Improvement
### What is the Jira issue?
https://issues.apache.org/jira/browse/ZEPPELIN-1965
### How should this be tested?
Set zeppelin.livy.spark.sql.field.truncate to true or false
Run a SQL query which produces string values of length greater than 20.
Depending on the value of zeppelin.livy.spark.sql.field.truncate, the strings will either get truncated or not.
### Questions:
* Do the license files need an update? No
* Are there breaking changes for older versions? No
* Does this need documentation? No
Author: Benoy Antony <be...@apache.org>
Closes #2201 from benoyantony/master and squashes the following commits:
bb006c0 [Benoy Antony] changed field name and description
9eae68b [Benoy Antony] added a null check to avoid testcase failures, another nullcheck for backward compatibility and added two new testcases
ab1ead2 [Benoy Antony] documented zeppelin.livy.spark.sql.truncate
b6252be [Benoy Antony] [ZEPPELIN-1965] Livy SQL Interpreter: Should use df.show(1000, false) to display results
Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/1135fb61
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/1135fb61
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/1135fb61
Branch: refs/heads/master
Commit: 1135fb61d24e4a7baa82d1442e683eecb5de33d2
Parents: 9e8d7eb
Author: Benoy Antony <be...@apache.org>
Authored: Fri Mar 31 01:04:08 2017 -0700
Committer: Felix Cheung <fe...@apache.org>
Committed: Sun Apr 2 11:53:49 2017 -0700
----------------------------------------------------------------------
docs/interpreter/livy.md | 7 +-
.../zeppelin/livy/BaseLivyInterprereter.java | 4 +-
.../zeppelin/livy/LivySparkSQLInterpreter.java | 19 ++-
.../src/main/resources/interpreter-setting.json | 5 +
.../apache/zeppelin/livy/LivyInterpreterIT.java | 127 ++++++++++++++++++-
5 files changed, 155 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1135fb61/docs/interpreter/livy.md
----------------------------------------------------------------------
diff --git a/docs/interpreter/livy.md b/docs/interpreter/livy.md
index ce1c34a..a7b776c 100644
--- a/docs/interpreter/livy.md
+++ b/docs/interpreter/livy.md
@@ -56,11 +56,16 @@ Example: `spark.driver.memory` to `livy.spark.driver.memory`
<td>URL where livy server is running</td>
</tr>
<tr>
- <td>zeppelin.livy.spark.maxResult</td>
+ <td>zeppelin.livy.spark.sql.maxResult</td>
<td>1000</td>
<td>Max number of Spark SQL result to display.</td>
</tr>
<tr>
+ <td>zeppelin.livy.spark.sql.field.truncate</td>
+ <td>true</td>
+ <td>Whether to truncate field values longer than 20 characters or not</td>
+ </tr>
+ <tr>
<td>zeppelin.livy.session.create_timeout</td>
<td>120</td>
<td>Timeout in seconds for session creation</td>
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1135fb61/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterprereter.java
----------------------------------------------------------------------
diff --git a/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterprereter.java b/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterprereter.java
index d29d20c..26342a2 100644
--- a/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterprereter.java
+++ b/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterprereter.java
@@ -272,7 +272,9 @@ public abstract class BaseLivyInterprereter extends Interpreter {
throw new LivyException(e);
}
stmtInfo = getStatementInfo(stmtInfo.id);
- paragraphId2StmtProgressMap.put(paragraphId, (int) (stmtInfo.progress * 100));
+ if (paragraphId != null) {
+ paragraphId2StmtProgressMap.put(paragraphId, (int) (stmtInfo.progress * 100));
+ }
}
if (appendSessionExpired) {
return appendSessionExpire(getResultFromStatementInfo(stmtInfo, displayAppInfo),
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1135fb61/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java
----------------------------------------------------------------------
diff --git a/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java b/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java
index 48e4967..2eaf79c 100644
--- a/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java
+++ b/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java
@@ -32,14 +32,25 @@ import java.util.Properties;
*/
public class LivySparkSQLInterpreter extends BaseLivyInterprereter {
+ public static final String ZEPPELIN_LIVY_SPARK_SQL_FIELD_TRUNCATE =
+ "zeppelin.livy.spark.sql.field.truncate";
+
+ public static final String ZEPPELIN_LIVY_SPARK_SQL_MAX_RESULT =
+ "zeppelin.livy.spark.sql.maxResult";
+
private LivySparkInterpreter sparkInterpreter;
private boolean isSpark2 = false;
private int maxResult = 1000;
+ private boolean truncate = true;
public LivySparkSQLInterpreter(Properties property) {
super(property);
- this.maxResult = Integer.parseInt(property.getProperty("zeppelin.livy.spark.sql.maxResult"));
+ this.maxResult = Integer.parseInt(property.getProperty(ZEPPELIN_LIVY_SPARK_SQL_MAX_RESULT));
+ if (property.getProperty(ZEPPELIN_LIVY_SPARK_SQL_FIELD_TRUNCATE) != null) {
+ this.truncate =
+ Boolean.parseBoolean(property.getProperty(ZEPPELIN_LIVY_SPARK_SQL_FIELD_TRUNCATE));
+ }
}
@Override
@@ -111,9 +122,11 @@ public class LivySparkSQLInterpreter extends BaseLivyInterprereter {
// use triple quote so that we don't need to do string escape.
String sqlQuery = null;
if (isSpark2) {
- sqlQuery = "spark.sql(\"\"\"" + line + "\"\"\").show(" + maxResult + ")";
+ sqlQuery = "spark.sql(\"\"\"" + line + "\"\"\").show(" + maxResult + ", " +
+ truncate + ")";
} else {
- sqlQuery = "sqlContext.sql(\"\"\"" + line + "\"\"\").show(" + maxResult + ")";
+ sqlQuery = "sqlContext.sql(\"\"\"" + line + "\"\"\").show(" + maxResult + ", " +
+ truncate + ")";
}
InterpreterResult result = sparkInterpreter.interpret(sqlQuery, context.getParagraphId(),
this.displayAppInfo, true);
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1135fb61/livy/src/main/resources/interpreter-setting.json
----------------------------------------------------------------------
diff --git a/livy/src/main/resources/interpreter-setting.json b/livy/src/main/resources/interpreter-setting.json
index 42f64cf..8d3dea0 100644
--- a/livy/src/main/resources/interpreter-setting.json
+++ b/livy/src/main/resources/interpreter-setting.json
@@ -118,6 +118,11 @@
"defaultValue": "1000",
"description": "Max number of Spark SQL result to display."
},
+ "zeppelin.livy.spark.sql.field.truncate": {
+ "propertyName": "zeppelin.livy.spark.sql.field.truncate",
+ "defaultValue": "true",
+ "description": "If true, truncate field values longer than 20 characters."
+ },
"zeppelin.livy.concurrentSQL": {
"propertyName": "zeppelin.livy.concurrentSQL",
"defaultValue": "false",
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1135fb61/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java
----------------------------------------------------------------------
diff --git a/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java b/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java
index 1a8e8df..3da908c 100644
--- a/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java
+++ b/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java
@@ -20,7 +20,6 @@ package org.apache.zeppelin.livy;
import com.cloudera.livy.test.framework.Cluster;
import com.cloudera.livy.test.framework.Cluster$;
-import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.zeppelin.interpreter.*;
import org.apache.zeppelin.user.AuthenticationInfo;
@@ -33,7 +32,6 @@ import java.util.ArrayList;
import java.util.Properties;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
public class LivyInterpreterIT {
@@ -309,6 +307,131 @@ public class LivyInterpreterIT {
}
@Test
+ public void testStringWithTruncation() {
+ if (!checkPreCondition()) {
+ return;
+ }
+ InterpreterGroup interpreterGroup = new InterpreterGroup("group_1");
+ interpreterGroup.put("session_1", new ArrayList<Interpreter>());
+ LivySparkInterpreter sparkInterpreter = new LivySparkInterpreter(properties);
+ sparkInterpreter.setInterpreterGroup(interpreterGroup);
+ interpreterGroup.get("session_1").add(sparkInterpreter);
+ AuthenticationInfo authInfo = new AuthenticationInfo("user1");
+ MyInterpreterOutputListener outputListener = new MyInterpreterOutputListener();
+ InterpreterOutput output = new InterpreterOutput(outputListener);
+ InterpreterContext context = new InterpreterContext("noteId", "paragraphId", "livy.spark",
+ "title", "text", authInfo, null, null, null, null, null, output);
+ sparkInterpreter.open();
+
+ LivySparkSQLInterpreter sqlInterpreter = new LivySparkSQLInterpreter(properties);
+ interpreterGroup.get("session_1").add(sqlInterpreter);
+ sqlInterpreter.setInterpreterGroup(interpreterGroup);
+ sqlInterpreter.open();
+
+ try {
+ // detect spark version
+ InterpreterResult result = sparkInterpreter.interpret("sc.version", context);
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+ assertEquals(1, result.message().size());
+
+ boolean isSpark2 = isSpark2(sparkInterpreter, context);
+
+ // test DataFrame api
+ if (!isSpark2) {
+ result = sparkInterpreter.interpret(
+ "val df=sqlContext.createDataFrame(Seq((\"12characters12characters\",20))).toDF(\"col_1\", \"col_2\")\n"
+ + "df.collect()", context);
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+ assertEquals(1, result.message().size());
+ assertTrue(result.message().get(0).getData()
+ .contains("Array[org.apache.spark.sql.Row] = Array([12characters12characters,20])"));
+ } else {
+ result = sparkInterpreter.interpret(
+ "val df=spark.createDataFrame(Seq((\"12characters12characters\",20))).toDF(\"col_1\", \"col_2\")\n"
+ + "df.collect()", context);
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+ assertEquals(1, result.message().size());
+ assertTrue(result.message().get(0).getData()
+ .contains("Array[org.apache.spark.sql.Row] = Array([12characters12characters,20])"));
+ }
+ sparkInterpreter.interpret("df.registerTempTable(\"df\")", context);
+ // test LivySparkSQLInterpreter which share the same SparkContext with LivySparkInterpreter
+ result = sqlInterpreter.interpret("select * from df where col_1='12characters12characters'", context);
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+ assertEquals(InterpreterResult.Type.TABLE, result.message().get(0).getType());
+ assertEquals("col_1\tcol_2\n12characters12cha...\t20", result.message().get(0).getData());
+ } finally {
+ sparkInterpreter.close();
+ sqlInterpreter.close();
+ }
+ }
+
+ @Test
+ public void testStringWithoutTruncation() {
+ if (!checkPreCondition()) {
+ return;
+ }
+ InterpreterGroup interpreterGroup = new InterpreterGroup("group_1");
+ interpreterGroup.put("session_1", new ArrayList<Interpreter>());
+ Properties newProps = new Properties();
+ for (Object name: properties.keySet()) {
+ newProps.put(name, properties.get(name));
+ }
+ newProps.put(LivySparkSQLInterpreter.ZEPPELIN_LIVY_SPARK_SQL_FIELD_TRUNCATE, "false");
+ LivySparkInterpreter sparkInterpreter = new LivySparkInterpreter(newProps);
+ sparkInterpreter.setInterpreterGroup(interpreterGroup);
+ interpreterGroup.get("session_1").add(sparkInterpreter);
+ AuthenticationInfo authInfo = new AuthenticationInfo("user1");
+ MyInterpreterOutputListener outputListener = new MyInterpreterOutputListener();
+ InterpreterOutput output = new InterpreterOutput(outputListener);
+ InterpreterContext context = new InterpreterContext("noteId", "paragraphId", "livy.spark",
+ "title", "text", authInfo, null, null, null, null, null, output);
+ sparkInterpreter.open();
+
+ LivySparkSQLInterpreter sqlInterpreter = new LivySparkSQLInterpreter(newProps);
+ interpreterGroup.get("session_1").add(sqlInterpreter);
+ sqlInterpreter.setInterpreterGroup(interpreterGroup);
+ sqlInterpreter.open();
+
+ try {
+ // detect spark version
+ InterpreterResult result = sparkInterpreter.interpret("sc.version", context);
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+ assertEquals(1, result.message().size());
+
+ boolean isSpark2 = isSpark2(sparkInterpreter, context);
+
+ // test DataFrame api
+ if (!isSpark2) {
+ result = sparkInterpreter.interpret(
+ "val df=sqlContext.createDataFrame(Seq((\"12characters12characters\",20))).toDF(\"col_1\", \"col_2\")\n"
+ + "df.collect()", context);
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+ assertEquals(1, result.message().size());
+ assertTrue(result.message().get(0).getData()
+ .contains("Array[org.apache.spark.sql.Row] = Array([12characters12characters,20])"));
+ } else {
+ result = sparkInterpreter.interpret(
+ "val df=spark.createDataFrame(Seq((\"12characters12characters\",20))).toDF(\"col_1\", \"col_2\")\n"
+ + "df.collect()", context);
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+ assertEquals(1, result.message().size());
+ assertTrue(result.message().get(0).getData()
+ .contains("Array[org.apache.spark.sql.Row] = Array([12characters12characters,20])"));
+ }
+ sparkInterpreter.interpret("df.registerTempTable(\"df\")", context);
+ // test LivySparkSQLInterpreter which share the same SparkContext with LivySparkInterpreter
+ result = sqlInterpreter.interpret("select * from df where col_1='12characters12characters'", context);
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+ assertEquals(InterpreterResult.Type.TABLE, result.message().get(0).getType());
+ assertEquals("col_1\tcol_2\n12characters12characters\t20", result.message().get(0).getData());
+ } finally {
+ sparkInterpreter.close();
+ sqlInterpreter.close();
+ }
+ }
+
+ @Test
public void testPySparkInterpreter() {
if (!checkPreCondition()) {
return;