You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2019/02/11 10:02:07 UTC
[zeppelin] branch master updated: ZEPPELIN-3975. Add limit local
property for SparkInterpreter and JdbcInterpreter
This is an automated email from the ASF dual-hosted git repository.
zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push:
new c2bb46b ZEPPELIN-3975. Add limit local property for SparkInterpreter and JdbcInterpreter
c2bb46b is described below
commit c2bb46b62797e92c0e3108047824dca9b0a02644
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Tue Jan 29 11:09:04 2019 +0800
ZEPPELIN-3975. Add limit local property for SparkInterpreter and JdbcInterpreter
### What is this PR for?
This PR adds a local property `limit` for SparkInterpreter and JdbcInterpreter, so that users can now control the max number of returned rows at the paragraph level instead of the interpreter level.
### What type of PR is it?
[Improvement]
### Todos
* [ ] - Task
### What is the Jira issue?
* https://jira.apache.org/jira/browse/ZEPPELIN-3975
### How should this be tested?
* CI pass
### Screenshots (if appropriate)
### Questions:
* Do the license files need updating? No
* Is there breaking changes for older versions? No
* Does this need documentation? No
Author: Jeff Zhang <zj...@apache.org>
Closes #3301 from zjffdu/ZEPPELIN-3975 and squashes the following commits:
6350433cc [Jeff Zhang] ZEPPELIN-3975. Add limit local property for SparkInterpreter and JdbcInterpreter
---
.../apache/zeppelin/flink/FlinkZeppelinContext.scala | 2 +-
.../apache/zeppelin/groovy/GroovyZeppelinContext.java | 2 +-
.../org/apache/zeppelin/helium/DevZeppelinContext.java | 2 +-
.../java/org/apache/zeppelin/jdbc/JDBCInterpreter.java | 4 ++--
.../org/apache/zeppelin/jdbc/JDBCInterpreterTest.java | 7 +++++++
.../apache/zeppelin/python/PythonZeppelinContext.java | 2 +-
.../org/apache/zeppelin/spark/SparkSqlInterpreter.java | 4 +++-
.../apache/zeppelin/spark/SparkZeppelinContext.scala | 2 +-
.../zeppelin/spark/NewSparkSqlInterpreterTest.java | 10 ++++++++++
.../scala/org/apache/zeppelin/spark/Spark1Shims.java | 8 ++++++--
.../scala/org/apache/zeppelin/spark/Spark2Shims.java | 8 ++++++--
.../zeppelin/interpreter/BaseZeppelinContext.java | 7 ++++++-
.../zeppelin/interpreter/InterpreterContext.java | 18 +++++++++++++++++-
.../zeppelin/interpreter/BaseZeppelinContextTest.java | 2 +-
14 files changed, 63 insertions(+), 15 deletions(-)
diff --git a/flink/src/main/scala/org/apache/zeppelin/flink/FlinkZeppelinContext.scala b/flink/src/main/scala/org/apache/zeppelin/flink/FlinkZeppelinContext.scala
index 5246445..4102adf 100644
--- a/flink/src/main/scala/org/apache/zeppelin/flink/FlinkZeppelinContext.scala
+++ b/flink/src/main/scala/org/apache/zeppelin/flink/FlinkZeppelinContext.scala
@@ -54,7 +54,7 @@ class FlinkZeppelinContext(val btenv: BatchTableEnvironment,
override def getInterpreterClassMap: util.Map[String, String] =
JavaConversions.mapAsJavaMap(interpreterClassMap)
- override def showData(obj: Any): String = {
+ override def showData(obj: Any, maxResult: Int): String = {
def showTable(table: Table): String = {
val columnNames: Array[String] = table.getSchema.getColumnNames
val dsRow: DataSet[Row] = btenv.toDataSet[Row](table)
diff --git a/groovy/src/main/java/org/apache/zeppelin/groovy/GroovyZeppelinContext.java b/groovy/src/main/java/org/apache/zeppelin/groovy/GroovyZeppelinContext.java
index 3f0d600..8d0761f 100644
--- a/groovy/src/main/java/org/apache/zeppelin/groovy/GroovyZeppelinContext.java
+++ b/groovy/src/main/java/org/apache/zeppelin/groovy/GroovyZeppelinContext.java
@@ -43,7 +43,7 @@ public class GroovyZeppelinContext extends BaseZeppelinContext {
}
@Override
- public String showData(Object obj) {
+ public String showData(Object obj, int maxResult) {
return null;
}
}
diff --git a/helium-dev/src/main/java/org/apache/zeppelin/helium/DevZeppelinContext.java b/helium-dev/src/main/java/org/apache/zeppelin/helium/DevZeppelinContext.java
index 45d8b39..d3bf08f 100644
--- a/helium-dev/src/main/java/org/apache/zeppelin/helium/DevZeppelinContext.java
+++ b/helium-dev/src/main/java/org/apache/zeppelin/helium/DevZeppelinContext.java
@@ -43,7 +43,7 @@ public class DevZeppelinContext extends BaseZeppelinContext {
}
@Override
- public String showData(Object obj) {
+ public String showData(Object obj, int maxResult) {
return null;
}
}
diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java
index a547171..d2ae404 100644
--- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java
+++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java
@@ -721,8 +721,8 @@ public class JDBCInterpreter extends KerberosInterpreter {
statement = connection.createStatement();
// fetch n+1 rows in order to indicate there's more rows available (for large selects)
- statement.setFetchSize(getMaxResult());
- statement.setMaxRows(maxRows);
+ statement.setFetchSize(interpreterContext.getIntLocalProperty("limit", getMaxResult()));
+ statement.setMaxRows(interpreterContext.getIntLocalProperty("limit", maxRows));
if (statement == null) {
return new InterpreterResult(Code.ERROR, "Prefix not found.");
diff --git a/jdbc/src/test/java/org/apache/zeppelin/jdbc/JDBCInterpreterTest.java b/jdbc/src/test/java/org/apache/zeppelin/jdbc/JDBCInterpreterTest.java
index 995c530..c7d417d 100644
--- a/jdbc/src/test/java/org/apache/zeppelin/jdbc/JDBCInterpreterTest.java
+++ b/jdbc/src/test/java/org/apache/zeppelin/jdbc/JDBCInterpreterTest.java
@@ -188,6 +188,13 @@ public class JDBCInterpreterTest extends BasicJDBCTestCaseAdapter {
assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code());
assertEquals(InterpreterResult.Type.TABLE, interpreterResult.message().get(0).getType());
assertEquals("ID\tNAME\na\ta_name\nb\tb_name\n", interpreterResult.message().get(0).getData());
+
+ interpreterContext.getLocalProperties().put("limit", "1");
+ interpreterResult = t.interpret(sqlQuery, interpreterContext);
+
+ assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code());
+ assertEquals(InterpreterResult.Type.TABLE, interpreterResult.message().get(0).getType());
+ assertEquals("ID\tNAME\na\ta_name\n", interpreterResult.message().get(0).getData());
}
@Test
diff --git a/python/src/main/java/org/apache/zeppelin/python/PythonZeppelinContext.java b/python/src/main/java/org/apache/zeppelin/python/PythonZeppelinContext.java
index 855ddf7..32d717e 100644
--- a/python/src/main/java/org/apache/zeppelin/python/PythonZeppelinContext.java
+++ b/python/src/main/java/org/apache/zeppelin/python/PythonZeppelinContext.java
@@ -43,7 +43,7 @@ public class PythonZeppelinContext extends BaseZeppelinContext {
}
@Override
- public String showData(Object obj) {
+ public String showData(Object obj, int maxResult) {
return null;
}
}
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
index 7843435..b90b6f4 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
@@ -86,8 +86,10 @@ public class SparkSqlInterpreter extends AbstractInterpreter {
try {
Method method = sqlc.getClass().getMethod("sql", String.class);
+ int maxResult = Integer.parseInt(context.getLocalProperties().getOrDefault("limit",
+ "" + sparkInterpreter.getZeppelinContext().getMaxResult()));
String msg = sparkInterpreter.getZeppelinContext().showData(
- method.invoke(sqlc, st));
+ method.invoke(sqlc, st), maxResult);
sc.clearJobGroup();
return new InterpreterResult(Code.SUCCESS, msg);
} catch (Exception e) {
diff --git a/spark/interpreter/src/main/scala/org/apache/zeppelin/spark/SparkZeppelinContext.scala b/spark/interpreter/src/main/scala/org/apache/zeppelin/spark/SparkZeppelinContext.scala
index 0e10d84..e80c152 100644
--- a/spark/interpreter/src/main/scala/org/apache/zeppelin/spark/SparkZeppelinContext.scala
+++ b/spark/interpreter/src/main/scala/org/apache/zeppelin/spark/SparkZeppelinContext.scala
@@ -67,7 +67,7 @@ class SparkZeppelinContext(val sc: SparkContext,
override def getInterpreterClassMap: util.Map[String, String] =
JavaConversions.mapAsJavaMap(interpreterClassMap)
- override def showData(obj: Any): String = sparkShims.showDataFrame(obj, maxResult)
+ override def showData(obj: Any, maxResult: Int): String = sparkShims.showDataFrame(obj, maxResult)
@ZeppelinApi
def select(name: String, options: Seq[(Any, String)]): Any = select(name, null, options)
diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkSqlInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkSqlInterpreterTest.java
index ed91ffe..d16a0e7 100644
--- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkSqlInterpreterTest.java
+++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkSqlInterpreterTest.java
@@ -114,6 +114,7 @@ public class NewSparkSqlInterpreterTest {
InterpreterResult ret = sqlInterpreter.interpret("select * from gr", context);
assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
+
}
public void test_null_value_in_row() throws InterpreterException {
@@ -155,7 +156,16 @@ public class NewSparkSqlInterpreterTest {
InterpreterResult ret = sqlInterpreter.interpret("select * from gr", context);
assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
+ // the number of rows is 10+1, 1 is the head of table
+ assertEquals(11, ret.message().get(0).getData().split("\n").length);
assertTrue(ret.message().get(1).getData().contains("alert-warning"));
+
+ // test limit local property
+ context.getLocalProperties().put("limit", "5");
+ ret = sqlInterpreter.interpret("select * from gr", context);
+ assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
+ // the number of rows is 5+1, 1 is the head of table
+ assertEquals(6, ret.message().get(0).getData().split("\n").length);
}
@Test
diff --git a/spark/spark1-shims/src/main/scala/org/apache/zeppelin/spark/Spark1Shims.java b/spark/spark1-shims/src/main/scala/org/apache/zeppelin/spark/Spark1Shims.java
index c13ff9a..1b05daf 100644
--- a/spark/spark1-shims/src/main/scala/org/apache/zeppelin/spark/Spark1Shims.java
+++ b/spark/spark1-shims/src/main/scala/org/apache/zeppelin/spark/Spark1Shims.java
@@ -56,12 +56,16 @@ public class Spark1Shims extends SparkShims {
if (obj instanceof DataFrame) {
DataFrame df = (DataFrame) obj;
String[] columns = df.columns();
+ // fetch maxResult+1 rows so that we can check whether it is larger than zeppelin.spark.maxResult
List<Row> rows = df.takeAsList(maxResult + 1);
-
StringBuilder msg = new StringBuilder();
msg.append("%table ");
msg.append(StringUtils.join(columns, "\t"));
msg.append("\n");
+ boolean isLargerThanMaxResult = rows.size() > maxResult;
+ if (isLargerThanMaxResult) {
+ rows = rows.subList(0, maxResult);
+ }
for (Row row : rows) {
for (int i = 0; i < row.size(); ++i) {
msg.append(row.get(i));
@@ -72,7 +76,7 @@ public class Spark1Shims extends SparkShims {
msg.append("\n");
}
- if (rows.size() > maxResult) {
+ if (isLargerThanMaxResult) {
msg.append("\n");
msg.append(ResultMessages.getExceedsLimitRowsMessage(maxResult, "zeppelin.spark.maxResult"));
}
diff --git a/spark/spark2-shims/src/main/scala/org/apache/zeppelin/spark/Spark2Shims.java b/spark/spark2-shims/src/main/scala/org/apache/zeppelin/spark/Spark2Shims.java
index 5043786..fc5062e 100644
--- a/spark/spark2-shims/src/main/scala/org/apache/zeppelin/spark/Spark2Shims.java
+++ b/spark/spark2-shims/src/main/scala/org/apache/zeppelin/spark/Spark2Shims.java
@@ -57,12 +57,16 @@ public class Spark2Shims extends SparkShims {
if (obj instanceof Dataset) {
Dataset<Row> df = ((Dataset) obj).toDF();
String[] columns = df.columns();
+ // fetch maxResult+1 rows so that we can check whether it is larger than zeppelin.spark.maxResult
List<Row> rows = df.takeAsList(maxResult + 1);
-
StringBuilder msg = new StringBuilder();
msg.append("%table ");
msg.append(StringUtils.join(columns, "\t"));
msg.append("\n");
+ boolean isLargerThanMaxResult = rows.size() > maxResult;
+ if (isLargerThanMaxResult) {
+ rows = rows.subList(0, maxResult);
+ }
for (Row row : rows) {
for (int i = 0; i < row.size(); ++i) {
msg.append(row.get(i));
@@ -73,7 +77,7 @@ public class Spark2Shims extends SparkShims {
msg.append("\n");
}
- if (rows.size() > maxResult) {
+ if (isLargerThanMaxResult) {
msg.append("\n");
msg.append(ResultMessages.getExceedsLimitRowsMessage(maxResult, "zeppelin.spark.maxResult"));
}
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/BaseZeppelinContext.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/BaseZeppelinContext.java
index 3140a4c..abf2e0a 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/BaseZeppelinContext.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/BaseZeppelinContext.java
@@ -65,13 +65,18 @@ public abstract class BaseZeppelinContext {
return this.maxResult;
}
+ public String showData(Object obj) {
+ return showData(obj, maxResult);
+ }
+
/**
* subclasses should implement this method to display specific data type
*
* @param obj
+ * @param maxResult max number of rows to display
* @return
*/
- public abstract String showData(Object obj);
+ public abstract String showData(Object obj, int maxResult);
/**
* @deprecated use z.textbox instead
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java
index 23ac789..4e0a8df 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java
@@ -201,6 +201,22 @@ public class InterpreterContext {
return localProperties;
}
+ public String getStringLocalProperty(String key, String defaultValue) {
+ return localProperties.getOrDefault(key, defaultValue);
+ }
+
+ public int getIntLocalProperty(String key, int defaultValue) {
+ return Integer.parseInt(localProperties.getOrDefault(key, defaultValue + ""));
+ }
+
+ public long getLongLocalProperty(String key, int defaultValue) {
+ return Long.parseLong(localProperties.getOrDefault(key, defaultValue + ""));
+ }
+
+ public double getDoubleLocalProperty(String key, double defaultValue) {
+ return Double.parseDouble(localProperties.getOrDefault(key, defaultValue + ""));
+ }
+
public AuthenticationInfo getAuthenticationInfo() {
return authenticationInfo;
}
@@ -228,7 +244,7 @@ public class InterpreterContext {
public String getInterpreterClassName() {
return interpreterClassName;
}
-
+
public void setInterpreterClassName(String className) {
this.interpreterClassName = className;
}
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/BaseZeppelinContextTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/BaseZeppelinContextTest.java
index d8323bb..8d2798f 100644
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/BaseZeppelinContextTest.java
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/BaseZeppelinContextTest.java
@@ -133,7 +133,7 @@ public class BaseZeppelinContextTest {
}
@Override
- public String showData(Object obj) {
+ public String showData(Object obj, int maxResult) {
return null;
}
}