You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2019/02/11 10:02:07 UTC

[zeppelin] branch master updated: ZEPPELIN-3975. Add limit local property for SparkInterpreter and JdbcInterpreter

This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new c2bb46b  ZEPPELIN-3975. Add limit local property for SparkInterpreter and JdbcInterpreter
c2bb46b is described below

commit c2bb46b62797e92c0e3108047824dca9b0a02644
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Tue Jan 29 11:09:04 2019 +0800

    ZEPPELIN-3975. Add limit local property for SparkInterpreter and JdbcInterpreter
    
    ### What is this PR for?
    This PR add local property `limit` for SparkInterpreter and JdbcInterpreter, so that users can not control the max number of returned rows in paragraph level instead of interpreter level.
    
    ### What type of PR is it?
    [ Improvement]
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    * https://jira.apache.org/jira/browse/ZEPPELIN-3975
    
    ### How should this be tested?
    * CI pass
    
    ### Screenshots (if appropriate)
    
    ### Questions:
    * Does the licenses files need update? No
    * Is there breaking changes for older versions? No
    * Does this needs documentation? No
    
    Author: Jeff Zhang <zj...@apache.org>
    
    Closes #3301 from zjffdu/ZEPPELIN-3975 and squashes the following commits:
    
    6350433cc [Jeff Zhang] ZEPPELIN-3975. Add limit local property for SparkInterpreter and JdbcInterpreter
---
 .../apache/zeppelin/flink/FlinkZeppelinContext.scala   |  2 +-
 .../apache/zeppelin/groovy/GroovyZeppelinContext.java  |  2 +-
 .../org/apache/zeppelin/helium/DevZeppelinContext.java |  2 +-
 .../java/org/apache/zeppelin/jdbc/JDBCInterpreter.java |  4 ++--
 .../org/apache/zeppelin/jdbc/JDBCInterpreterTest.java  |  7 +++++++
 .../apache/zeppelin/python/PythonZeppelinContext.java  |  2 +-
 .../org/apache/zeppelin/spark/SparkSqlInterpreter.java |  4 +++-
 .../apache/zeppelin/spark/SparkZeppelinContext.scala   |  2 +-
 .../zeppelin/spark/NewSparkSqlInterpreterTest.java     | 10 ++++++++++
 .../scala/org/apache/zeppelin/spark/Spark1Shims.java   |  8 ++++++--
 .../scala/org/apache/zeppelin/spark/Spark2Shims.java   |  8 ++++++--
 .../zeppelin/interpreter/BaseZeppelinContext.java      |  7 ++++++-
 .../zeppelin/interpreter/InterpreterContext.java       | 18 +++++++++++++++++-
 .../zeppelin/interpreter/BaseZeppelinContextTest.java  |  2 +-
 14 files changed, 63 insertions(+), 15 deletions(-)

diff --git a/flink/src/main/scala/org/apache/zeppelin/flink/FlinkZeppelinContext.scala b/flink/src/main/scala/org/apache/zeppelin/flink/FlinkZeppelinContext.scala
index 5246445..4102adf 100644
--- a/flink/src/main/scala/org/apache/zeppelin/flink/FlinkZeppelinContext.scala
+++ b/flink/src/main/scala/org/apache/zeppelin/flink/FlinkZeppelinContext.scala
@@ -54,7 +54,7 @@ class FlinkZeppelinContext(val btenv: BatchTableEnvironment,
   override def getInterpreterClassMap: util.Map[String, String] =
     JavaConversions.mapAsJavaMap(interpreterClassMap)
 
-  override def showData(obj: Any): String = {
+  override def showData(obj: Any, maxResult: Int): String = {
     def showTable(table: Table): String = {
       val columnNames: Array[String] = table.getSchema.getColumnNames
       val dsRow: DataSet[Row] = btenv.toDataSet[Row](table)
diff --git a/groovy/src/main/java/org/apache/zeppelin/groovy/GroovyZeppelinContext.java b/groovy/src/main/java/org/apache/zeppelin/groovy/GroovyZeppelinContext.java
index 3f0d600..8d0761f 100644
--- a/groovy/src/main/java/org/apache/zeppelin/groovy/GroovyZeppelinContext.java
+++ b/groovy/src/main/java/org/apache/zeppelin/groovy/GroovyZeppelinContext.java
@@ -43,7 +43,7 @@ public class GroovyZeppelinContext extends BaseZeppelinContext {
   }
 
   @Override
-  public String showData(Object obj) {
+  public String showData(Object obj, int maxResult) {
     return null;
   }
 }
diff --git a/helium-dev/src/main/java/org/apache/zeppelin/helium/DevZeppelinContext.java b/helium-dev/src/main/java/org/apache/zeppelin/helium/DevZeppelinContext.java
index 45d8b39..d3bf08f 100644
--- a/helium-dev/src/main/java/org/apache/zeppelin/helium/DevZeppelinContext.java
+++ b/helium-dev/src/main/java/org/apache/zeppelin/helium/DevZeppelinContext.java
@@ -43,7 +43,7 @@ public class DevZeppelinContext extends BaseZeppelinContext {
   }
 
   @Override
-  public String showData(Object obj) {
+  public String showData(Object obj, int maxResult) {
     return null;
   }
 }
diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java
index a547171..d2ae404 100644
--- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java
+++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java
@@ -721,8 +721,8 @@ public class JDBCInterpreter extends KerberosInterpreter {
         statement = connection.createStatement();
 
         // fetch n+1 rows in order to indicate there's more rows available (for large selects)
-        statement.setFetchSize(getMaxResult());
-        statement.setMaxRows(maxRows);
+        statement.setFetchSize(interpreterContext.getIntLocalProperty("limit", getMaxResult()));
+        statement.setMaxRows(interpreterContext.getIntLocalProperty("limit", maxRows));
 
         if (statement == null) {
           return new InterpreterResult(Code.ERROR, "Prefix not found.");
diff --git a/jdbc/src/test/java/org/apache/zeppelin/jdbc/JDBCInterpreterTest.java b/jdbc/src/test/java/org/apache/zeppelin/jdbc/JDBCInterpreterTest.java
index 995c530..c7d417d 100644
--- a/jdbc/src/test/java/org/apache/zeppelin/jdbc/JDBCInterpreterTest.java
+++ b/jdbc/src/test/java/org/apache/zeppelin/jdbc/JDBCInterpreterTest.java
@@ -188,6 +188,13 @@ public class JDBCInterpreterTest extends BasicJDBCTestCaseAdapter {
     assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code());
     assertEquals(InterpreterResult.Type.TABLE, interpreterResult.message().get(0).getType());
     assertEquals("ID\tNAME\na\ta_name\nb\tb_name\n", interpreterResult.message().get(0).getData());
+
+    interpreterContext.getLocalProperties().put("limit", "1");
+    interpreterResult = t.interpret(sqlQuery, interpreterContext);
+
+    assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code());
+    assertEquals(InterpreterResult.Type.TABLE, interpreterResult.message().get(0).getType());
+    assertEquals("ID\tNAME\na\ta_name\n", interpreterResult.message().get(0).getData());
   }
 
   @Test
diff --git a/python/src/main/java/org/apache/zeppelin/python/PythonZeppelinContext.java b/python/src/main/java/org/apache/zeppelin/python/PythonZeppelinContext.java
index 855ddf7..32d717e 100644
--- a/python/src/main/java/org/apache/zeppelin/python/PythonZeppelinContext.java
+++ b/python/src/main/java/org/apache/zeppelin/python/PythonZeppelinContext.java
@@ -43,7 +43,7 @@ public class PythonZeppelinContext extends BaseZeppelinContext {
   }
 
   @Override
-  public String showData(Object obj) {
+  public String showData(Object obj, int maxResult) {
     return null;
   }
 }
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
index 7843435..b90b6f4 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
@@ -86,8 +86,10 @@ public class SparkSqlInterpreter extends AbstractInterpreter {
 
     try {
       Method method = sqlc.getClass().getMethod("sql", String.class);
+      int maxResult = Integer.parseInt(context.getLocalProperties().getOrDefault("limit",
+              "" + sparkInterpreter.getZeppelinContext().getMaxResult()));
       String msg = sparkInterpreter.getZeppelinContext().showData(
-          method.invoke(sqlc, st));
+          method.invoke(sqlc, st), maxResult);
       sc.clearJobGroup();
       return new InterpreterResult(Code.SUCCESS, msg);
     } catch (Exception e) {
diff --git a/spark/interpreter/src/main/scala/org/apache/zeppelin/spark/SparkZeppelinContext.scala b/spark/interpreter/src/main/scala/org/apache/zeppelin/spark/SparkZeppelinContext.scala
index 0e10d84..e80c152 100644
--- a/spark/interpreter/src/main/scala/org/apache/zeppelin/spark/SparkZeppelinContext.scala
+++ b/spark/interpreter/src/main/scala/org/apache/zeppelin/spark/SparkZeppelinContext.scala
@@ -67,7 +67,7 @@ class SparkZeppelinContext(val sc: SparkContext,
   override def getInterpreterClassMap: util.Map[String, String] =
     JavaConversions.mapAsJavaMap(interpreterClassMap)
 
-  override def showData(obj: Any): String = sparkShims.showDataFrame(obj, maxResult)
+  override def showData(obj: Any, maxResult: Int): String = sparkShims.showDataFrame(obj, maxResult)
 
   @ZeppelinApi
   def select(name: String, options: Seq[(Any, String)]): Any = select(name, null, options)
diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkSqlInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkSqlInterpreterTest.java
index ed91ffe..d16a0e7 100644
--- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkSqlInterpreterTest.java
+++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkSqlInterpreterTest.java
@@ -114,6 +114,7 @@ public class NewSparkSqlInterpreterTest {
 
     InterpreterResult ret = sqlInterpreter.interpret("select * from gr", context);
     assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
+
   }
 
   public void test_null_value_in_row() throws InterpreterException {
@@ -155,7 +156,16 @@ public class NewSparkSqlInterpreterTest {
 
     InterpreterResult ret = sqlInterpreter.interpret("select * from gr", context);
     assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
+    // the number of rows is 10+1, 1 is the head of table
+    assertEquals(11, ret.message().get(0).getData().split("\n").length);
     assertTrue(ret.message().get(1).getData().contains("alert-warning"));
+
+    // test limit local property
+    context.getLocalProperties().put("limit", "5");
+    ret = sqlInterpreter.interpret("select * from gr", context);
+    assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
+    // the number of rows is 5+1, 1 is the head of table
+    assertEquals(6, ret.message().get(0).getData().split("\n").length);
   }
 
   @Test
diff --git a/spark/spark1-shims/src/main/scala/org/apache/zeppelin/spark/Spark1Shims.java b/spark/spark1-shims/src/main/scala/org/apache/zeppelin/spark/Spark1Shims.java
index c13ff9a..1b05daf 100644
--- a/spark/spark1-shims/src/main/scala/org/apache/zeppelin/spark/Spark1Shims.java
+++ b/spark/spark1-shims/src/main/scala/org/apache/zeppelin/spark/Spark1Shims.java
@@ -56,12 +56,16 @@ public class Spark1Shims extends SparkShims {
     if (obj instanceof DataFrame) {
       DataFrame df = (DataFrame) obj;
       String[] columns = df.columns();
+      // fetch maxResult+1 rows so that we can check whether it is larger than zeppelin.spark.maxResult
       List<Row> rows = df.takeAsList(maxResult + 1);
-
       StringBuilder msg = new StringBuilder();
       msg.append("%table ");
       msg.append(StringUtils.join(columns, "\t"));
       msg.append("\n");
+      boolean isLargerThanMaxResult = rows.size() > maxResult;
+      if (isLargerThanMaxResult) {
+        rows = rows.subList(0, maxResult);
+      }
       for (Row row : rows) {
         for (int i = 0; i < row.size(); ++i) {
           msg.append(row.get(i));
@@ -72,7 +76,7 @@ public class Spark1Shims extends SparkShims {
         msg.append("\n");
       }
 
-      if (rows.size() > maxResult) {
+      if (isLargerThanMaxResult) {
         msg.append("\n");
         msg.append(ResultMessages.getExceedsLimitRowsMessage(maxResult, "zeppelin.spark.maxResult"));
       }
diff --git a/spark/spark2-shims/src/main/scala/org/apache/zeppelin/spark/Spark2Shims.java b/spark/spark2-shims/src/main/scala/org/apache/zeppelin/spark/Spark2Shims.java
index 5043786..fc5062e 100644
--- a/spark/spark2-shims/src/main/scala/org/apache/zeppelin/spark/Spark2Shims.java
+++ b/spark/spark2-shims/src/main/scala/org/apache/zeppelin/spark/Spark2Shims.java
@@ -57,12 +57,16 @@ public class Spark2Shims extends SparkShims {
     if (obj instanceof Dataset) {
       Dataset<Row> df = ((Dataset) obj).toDF();
       String[] columns = df.columns();
+      // fetch maxResult+1 rows so that we can check whether it is larger than zeppelin.spark.maxResult
       List<Row> rows = df.takeAsList(maxResult + 1);
-
       StringBuilder msg = new StringBuilder();
       msg.append("%table ");
       msg.append(StringUtils.join(columns, "\t"));
       msg.append("\n");
+      boolean isLargerThanMaxResult = rows.size() > maxResult;
+      if (isLargerThanMaxResult) {
+        rows = rows.subList(0, maxResult);
+      }
       for (Row row : rows) {
         for (int i = 0; i < row.size(); ++i) {
           msg.append(row.get(i));
@@ -73,7 +77,7 @@ public class Spark2Shims extends SparkShims {
         msg.append("\n");
       }
 
-      if (rows.size() > maxResult) {
+      if (isLargerThanMaxResult) {
         msg.append("\n");
         msg.append(ResultMessages.getExceedsLimitRowsMessage(maxResult, "zeppelin.spark.maxResult"));
       }
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/BaseZeppelinContext.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/BaseZeppelinContext.java
index 3140a4c..abf2e0a 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/BaseZeppelinContext.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/BaseZeppelinContext.java
@@ -65,13 +65,18 @@ public abstract class BaseZeppelinContext {
     return this.maxResult;
   }
 
+  public String showData(Object obj) {
+    return showData(obj, maxResult);
+  }
+
   /**
    * subclasses should implement this method to display specific data type
    *
    * @param obj
+   * @param maxResult  max number of rows to display
    * @return
    */
-  public abstract String showData(Object obj);
+  public abstract String showData(Object obj, int maxResult);
 
   /**
    * @deprecated use z.textbox instead
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java
index 23ac789..4e0a8df 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java
@@ -201,6 +201,22 @@ public class InterpreterContext {
     return localProperties;
   }
 
+  public String getStringLocalProperty(String key, String defaultValue) {
+    return localProperties.getOrDefault(key, defaultValue);
+  }
+
+  public int getIntLocalProperty(String key, int defaultValue) {
+    return Integer.parseInt(localProperties.getOrDefault(key, defaultValue + ""));
+  }
+
+  public long getLongLocalProperty(String key, int defaultValue) {
+    return Long.parseLong(localProperties.getOrDefault(key, defaultValue + ""));
+  }
+
+  public double getDoubleLocalProperty(String key, double defaultValue) {
+    return Double.parseDouble(localProperties.getOrDefault(key, defaultValue + ""));
+  }
+
   public AuthenticationInfo getAuthenticationInfo() {
     return authenticationInfo;
   }
@@ -228,7 +244,7 @@ public class InterpreterContext {
   public String getInterpreterClassName() {
     return interpreterClassName;
   }
-  
+
   public void setInterpreterClassName(String className) {
     this.interpreterClassName = className;
   }
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/BaseZeppelinContextTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/BaseZeppelinContextTest.java
index d8323bb..8d2798f 100644
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/BaseZeppelinContextTest.java
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/BaseZeppelinContextTest.java
@@ -133,7 +133,7 @@ public class BaseZeppelinContextTest {
     }
 
     @Override
-    public String showData(Object obj) {
+    public String showData(Object obj, int maxResult) {
       return null;
     }
   }