You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2020/05/08 06:08:31 UTC

[zeppelin] branch branch-0.9 updated: [ZEPPELIN-4800]. Support z.show(table) in flink interpreter

This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/branch-0.9 by this push:
     new 609c1dc  [ZEPPELIN-4800]. Support z.show(table) in flink interpreter
609c1dc is described below

commit 609c1dccb762ba655115d64d4456bda8bc5c75b0
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Tue May 5 23:34:43 2020 +0800

    [ZEPPELIN-4800]. Support z.show(table) in flink interpreter
    
    ### What is this PR for?
    
    This PR adds support for `z.show(table)` in the Flink interpreter. It can be used not only from the Scala API but also from the PyFlink API. See the screenshots below.
    
    ### What type of PR is it?
    [ Feature ]
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-4800
    
    ### How should this be tested?
    * CI pass
    
    ### Screenshots (if appropriate)
    ![image](https://user-images.githubusercontent.com/164491/81127715-ab864000-8f71-11ea-9286-89c543e901ce.png)
    
    ![image](https://user-images.githubusercontent.com/164491/81127735-bccf4c80-8f71-11ea-87ec-06c8afcafc21.png)
    
    ### Questions:
    * Do the license files need an update? No
    * Are there breaking changes for older versions? No
    * Does this need documentation? No
    
    Author: Jeff Zhang <zj...@apache.org>
    
    Closes #3762 from zjffdu/ZEPPELIN-4800 and squashes the following commits:
    
    713b9996e [Jeff Zhang] [ZEPPELIN-4800]. Support z.show(table) in flink interpreter
    
    (cherry picked from commit 013f7d4629ee9d2f34a3694c616722b6172d23a1)
    Signed-off-by: Jeff Zhang <zj...@apache.org>
---
 .../zeppelin/flink/FlinkZeppelinContext.scala      |   2 +-
 .../flink/FlinkBatchSqlInterpreterTest.java        |  10 +
 .../Flink Tutorial/6. Batch ETL_2EW19CSPA.zpln     | 222 ++++++++++++++++++---
 3 files changed, 203 insertions(+), 31 deletions(-)

diff --git a/flink/src/main/scala/org/apache/zeppelin/flink/FlinkZeppelinContext.scala b/flink/src/main/scala/org/apache/zeppelin/flink/FlinkZeppelinContext.scala
index 3694f9a..146ec63 100644
--- a/flink/src/main/scala/org/apache/zeppelin/flink/FlinkZeppelinContext.scala
+++ b/flink/src/main/scala/org/apache/zeppelin/flink/FlinkZeppelinContext.scala
@@ -57,7 +57,7 @@ class FlinkZeppelinContext(val flinkInterpreter: FlinkScalaInterpreter,
     "ipyflink" -> "org.apache.zeppelin.flink.IPyFlinkInterpreter"
   )
 
-  private val supportedClasses = Seq(classOf[DataSet[_]])
+  private val supportedClasses = Seq(classOf[DataSet[_]], classOf[Table])
 
   def setCurrentSql(sql: String): Unit = {
     this.currentSql = sql
diff --git a/flink/src/test/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreterTest.java b/flink/src/test/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreterTest.java
index 19dedfa..03efa9b 100644
--- a/flink/src/test/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreterTest.java
+++ b/flink/src/test/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreterTest.java
@@ -70,6 +70,16 @@ public class FlinkBatchSqlInterpreterTest extends SqlInterpreterTest {
     assertEquals(InterpreterResult.Type.TABLE, resultMessages.get(0).getType());
     assertEquals("id\tname\n1\ta\n2\tb\n", resultMessages.get(0).getData());
 
+    // z.show
+    context = getInterpreterContext();
+    result =
+            flinkInterpreter.interpret("z.show(btenv.sqlQuery(\"select * from source_table\"))", context);
+    resultMessages = context.out.toInterpreterResultMessage();
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    assertEquals(1, resultMessages.size());
+    assertEquals(InterpreterResult.Type.TABLE, resultMessages.get(0).getType());
+    assertEquals("id\tname\n1\ta\n2\tb\n", resultMessages.get(0).getData());
+
     // define scala udf
     result = flinkInterpreter.interpret(
             "class AddOne extends ScalarFunction {\n" +
diff --git a/notebook/Flink Tutorial/6. Batch ETL_2EW19CSPA.zpln b/notebook/Flink Tutorial/6. Batch ETL_2EW19CSPA.zpln
index d558a76..40c8cc2 100644
--- a/notebook/Flink Tutorial/6. Batch ETL_2EW19CSPA.zpln	
+++ b/notebook/Flink Tutorial/6. Batch ETL_2EW19CSPA.zpln	
@@ -49,7 +49,7 @@
       "title": "Download bank data",
       "text": "%sh\n\ncd /tmp\nrm -rf bank*\nwget https://archive.ics.uci.edu/ml/machine-learning-databases/00222/bank.zip\nunzip bank.zip\n# upload data to hdfs if you want to run it in yarn mode\n# hadoop fs -put /tmp/bank.csv /tmp/bank.csv\n",
       "user": "anonymous",
-      "dateUpdated": "2020-04-27 11:37:36.288",
+      "dateUpdated": "2020-05-05 22:52:51.576",
       "config": {
         "runOnSelectionChange": true,
         "title": true,
@@ -75,7 +75,7 @@
         "msg": [
           {
             "type": "TEXT",
-            "data": "--2020-04-27 11:37:36--  https://archive.ics.uci.edu/ml/machine-learning-databases/00222/bank.zip\nResolving archive.ics.uci.edu (archive.ics.uci.edu)... 128.195.10.252\nConnecting to archive.ics.uci.edu (archive.ics.uci.edu)|128.195.10.252|:443... connected.\nHTTP request sent, awaiting response... 200 OK\nLength: 579043 (565K) [application/x-httpd-php]\nSaving to: ‘bank.zip’\n\n     0K .......... .......... .......... .......... ..........  8% 88.2K 6s\n    50K ... [...]
+            "data": "--2020-05-05 22:52:51--  https://archive.ics.uci.edu/ml/machine-learning-databases/00222/bank.zip\nResolving archive.ics.uci.edu (archive.ics.uci.edu)... 128.195.10.252\nConnecting to archive.ics.uci.edu (archive.ics.uci.edu)|128.195.10.252|:443... connected.\nHTTP request sent, awaiting response... 200 OK\nLength: 579043 (565K) [application/x-httpd-php]\nSaving to: ‘bank.zip’\n\n     0K .......... .......... .......... .......... ..........  8%  105K 5s\n    50K ... [...]
           }
         ]
       },
@@ -84,8 +84,8 @@
       "jobName": "paragraph_1578045094400_1030344935",
       "id": "paragraph_1578045094400_1030344935",
       "dateCreated": "2020-01-03 17:51:34.400",
-      "dateStarted": "2020-04-27 11:37:36.293",
-      "dateFinished": "2020-04-27 11:37:40.794",
+      "dateStarted": "2020-05-05 22:52:51.581",
+      "dateFinished": "2020-05-05 22:52:59.324",
       "status": "FINISHED"
     },
     {
@@ -135,7 +135,7 @@
       "title": "Define source table which represents the raw data",
       "text": "%flink.bsql\n\nDROP TABLE IF EXISTS bank_raw;\nCREATE TABLE bank_raw (\n   content STRING\n) WITH (\n\u0027format.field-delimiter\u0027\u003d\u0027\\n\u0027,\n\u0027connector.type\u0027\u003d\u0027filesystem\u0027,\n\u0027format.derive-schema\u0027\u003d\u0027true\u0027,\n\u0027connector.path\u0027\u003d\u0027/tmp/bank.csv\u0027,\n\u0027format.type\u0027\u003d\u0027csv\u0027\n);",
       "user": "anonymous",
-      "dateUpdated": "2020-04-27 11:39:53.363",
+      "dateUpdated": "2020-05-05 23:01:30.843",
       "config": {
         "colWidth": 6.0,
         "fontSize": 9.0,
@@ -170,15 +170,15 @@
       "jobName": "paragraph_1578044954921_-1188487356",
       "id": "paragraph_1578044954921_-1188487356",
       "dateCreated": "2020-01-03 17:49:14.921",
-      "dateStarted": "2020-04-27 11:38:36.094",
-      "dateFinished": "2020-04-27 11:38:36.393",
+      "dateStarted": "2020-05-05 23:01:30.849",
+      "dateFinished": "2020-05-05 23:01:41.675",
       "status": "FINISHED"
     },
     {
       "title": "Define sink table which represents the cleaned data",
       "text": "%flink.bsql\n\nDROP TABLE IF EXISTS bank;\nCREATE TABLE bank (\n    age int, \n    job string,\n    marital string,\n    education string,\n    `default` string,\n    balance string,\n    housing string,\n    loan string,\n    contact string, \n    `day` string,\n    `month` string,\n    duration int,\n    campaign int,\n    pdays int,\n    previous int,\n    poutcome string,\n    y string\n) WITH (\n\u0027format.field-delimiter\u0027\u003d\u0027,\u0027,\n\u0027connector.t [...]
       "user": "anonymous",
-      "dateUpdated": "2020-04-27 11:39:50.826",
+      "dateUpdated": "2020-05-05 23:01:32.335",
       "config": {
         "runOnSelectionChange": true,
         "title": true,
@@ -213,15 +213,15 @@
       "jobName": "paragraph_1578045204379_-1427374232",
       "id": "paragraph_1578045204379_-1427374232",
       "dateCreated": "2020-01-03 17:53:24.379",
-      "dateStarted": "2020-04-27 11:38:37.856",
-      "dateFinished": "2020-04-27 11:38:38.118",
+      "dateStarted": "2020-05-05 23:01:41.334",
+      "dateFinished": "2020-05-05 23:01:41.675",
       "status": "FINISHED"
     },
     {
       "title": "Show tables",
       "text": "%flink.bsql\n\nshow tables",
       "user": "anonymous",
-      "dateUpdated": "2020-04-27 11:40:15.544",
+      "dateUpdated": "2020-05-05 22:52:11.086",
       "config": {
         "colWidth": 12.0,
         "fontSize": 9.0,
@@ -282,15 +282,15 @@
       "jobName": "paragraph_1587958743728_1444404682",
       "id": "paragraph_1587958743728_1444404682",
       "dateCreated": "2020-04-27 11:39:03.728",
-      "dateStarted": "2020-04-27 11:39:06.222",
-      "dateFinished": "2020-04-27 11:39:06.500",
+      "dateStarted": "2020-05-05 22:52:11.091",
+      "dateFinished": "2020-05-05 22:52:11.283",
       "status": "FINISHED"
     },
     {
       "title": "Define UDTF ParseFunction to parse the raw data",
       "text": "%flink\n\nimport org.apache.flink.api.java.typeutils.RowTypeInfo\nimport org.apache.flink.api.common.typeinfo.Types\nimport org.apache.flink.api.java.typeutils._\nimport org.apache.flink.api.scala.typeutils._\nimport org.apache.flink.api.scala._\n\nclass Person(val age:Int, val job: String, val marital: String, val education: String, val default: String, val balance: String, val housing: String, val loan: String, val contact: String, val day: String, val month: String, val [...]
       "user": "anonymous",
-      "dateUpdated": "2020-04-27 11:38:03.810",
+      "dateUpdated": "2020-05-05 23:01:45.688",
       "config": {
         "runOnSelectionChange": true,
         "title": true,
@@ -325,15 +325,15 @@
       "jobName": "paragraph_1578888628353_1621411444",
       "id": "paragraph_1578888628353_1621411444",
       "dateCreated": "2020-01-13 12:10:28.359",
-      "dateStarted": "2020-04-27 11:38:03.814",
-      "dateFinished": "2020-04-27 11:38:06.223",
+      "dateStarted": "2020-05-05 23:01:45.693",
+      "dateFinished": "2020-05-05 23:01:48.276",
       "status": "FINISHED"
     },
     {
       "title": "Clean data",
       "text": "%sh\n\nrm -rf /tmp/bank_cleaned\n#hadoop fs -rmr /tmp/bank_cleaned",
       "user": "anonymous",
-      "dateUpdated": "2020-04-27 11:38:44.643",
+      "dateUpdated": "2020-05-05 22:52:42.774",
       "config": {
         "runOnSelectionChange": true,
         "title": true,
@@ -363,15 +363,15 @@
       "jobName": "paragraph_1579061020460_-113987164",
       "id": "paragraph_1579061020460_-113987164",
       "dateCreated": "2020-01-15 12:03:40.468",
-      "dateStarted": "2020-04-27 11:38:44.647",
-      "dateFinished": "2020-04-27 11:38:44.672",
+      "dateStarted": "2020-05-05 22:52:42.780",
+      "dateFinished": "2020-05-05 22:52:42.836",
       "status": "FINISHED"
     },
     {
       "title": "Parse the data and write it into sink table",
       "text": "%flink.bsql\n\ninsert into bank select T.* from bank_raw, LATERAL TABLE(parse(content)) as T(age, job,  marital, education, `default`, balance, housing, loan, contact, `day`, `month`, duration, campaign, pdays,  previous,  poutcome, y) ",
       "user": "anonymous",
-      "dateUpdated": "2020-04-27 11:38:47.002",
+      "dateUpdated": "2020-05-05 22:53:04.445",
       "config": {
         "runOnSelectionChange": true,
         "title": true,
@@ -406,15 +406,15 @@
       "jobName": "paragraph_1578669828368_-1923137601",
       "id": "paragraph_1578669828368_-1923137601",
       "dateCreated": "2020-01-10 23:23:48.368",
-      "dateStarted": "2020-04-27 11:38:47.006",
-      "dateFinished": "2020-04-27 11:38:50.488",
+      "dateStarted": "2020-05-05 22:53:04.450",
+      "dateFinished": "2020-05-05 22:53:05.856",
       "status": "FINISHED"
     },
     {
       "title": "Preview output data",
       "text": "%flink.bsql\n\nselect * from bank limit 10\n",
       "user": "anonymous",
-      "dateUpdated": "2020-04-27 11:40:23.766",
+      "dateUpdated": "2020-05-05 23:01:51.464",
       "config": {
         "colWidth": 12.0,
         "fontSize": 9.0,
@@ -493,37 +493,199 @@
       "jobName": "paragraph_1578068480238_-1678045273",
       "id": "paragraph_1578068480238_-1678045273",
       "dateCreated": "2020-01-04 00:21:20.267",
-      "dateStarted": "2020-04-27 11:40:23.770",
-      "dateFinished": "2020-04-27 11:40:24.710",
+      "dateStarted": "2020-05-05 23:01:51.469",
+      "dateFinished": "2020-05-05 23:01:55.009",
       "status": "FINISHED"
     },
     {
-      "text": "%flink.bsql\n",
+      "title": "Display table via z.show",
+      "text": "%flink\n\nval table \u003d btenv.sqlQuery(\"select * from bank limit 10\")\nz.show(table)",
       "user": "anonymous",
-      "dateUpdated": "2020-02-02 16:41:35.121",
+      "dateUpdated": "2020-05-05 23:33:44.788",
       "config": {
         "colWidth": 12.0,
         "fontSize": 9.0,
         "enabled": true,
-        "results": {},
+        "results": {
+          "1": {
+            "graph": {
+              "mode": "table",
+              "height": 300.0,
+              "optionOpen": false,
+              "setting": {
+                "table": {
+                  "tableGridState": {},
+                  "tableColumnTypeState": {
+                    "names": {
+                      "age": "string",
+                      "job": "string",
+                      "marital": "string",
+                      "education": "string",
+                      "default": "string",
+                      "balance": "string",
+                      "housing": "string",
+                      "loan": "string",
+                      "contact": "string",
+                      "day": "string",
+                      "month": "string",
+                      "duration": "string",
+                      "campaign": "string",
+                      "pdays": "string",
+                      "previous": "string",
+                      "poutcome": "string",
+                      "y": "string"
+                    },
+                    "updated": false
+                  },
+                  "tableOptionSpecHash": "[{\"name\":\"useFilter\",\"valueType\":\"boolean\",\"defaultValue\":false,\"widget\":\"checkbox\",\"description\":\"Enable filter for columns\"},{\"name\":\"showPagination\",\"valueType\":\"boolean\",\"defaultValue\":false,\"widget\":\"checkbox\",\"description\":\"Enable pagination for better navigation\"},{\"name\":\"showAggregationFooter\",\"valueType\":\"boolean\",\"defaultValue\":false,\"widget\":\"checkbox\",\"description\":\"Enable a footer [...]
+                  "tableOptionValue": {
+                    "useFilter": false,
+                    "showPagination": false,
+                    "showAggregationFooter": false
+                  },
+                  "updated": false,
+                  "initialized": false
+                }
+              },
+              "commonSetting": {}
+            }
+          }
+        },
         "editorSetting": {
-          "language": "sql",
+          "language": "scala",
           "editOnDblClick": false,
           "completionKey": "TAB",
           "completionSupport": true
         },
-        "editorMode": "ace/mode/sql"
+        "editorMode": "ace/mode/scala",
+        "title": true
       },
       "settings": {
         "params": {},
         "forms": {}
       },
+      "results": {
+        "code": "SUCCESS",
+        "msg": [
+          {
+            "type": "TEXT",
+            "data": "\u001b[1m\u001b[34mtable\u001b[0m: \u001b[1m\u001b[32morg.apache.flink.table.api.Table\u001b[0m \u003d UnnamedTable$3\n"
+          },
+          {
+            "type": "TABLE",
+            "data": "age\tjob\tmarital\teducation\tdefault\tbalance\thousing\tloan\tcontact\tday\tmonth\tduration\tcampaign\tpdays\tprevious\tpoutcome\ty\n30\tunemployed\tmarried\tprimary\tno\t1787\tno\tno\tcellular\t19\toct\t79\t1\t-1\t0\tunknown\tno\n33\tservices\tmarried\tsecondary\tno\t4789\tyes\tyes\tcellular\t11\tmay\t220\t1\t339\t4\tfailure\tno\n35\tmanagement\tsingle\ttertiary\tno\t1350\tyes\tno\tcellular\t16\tapr\t185\t1\t330\t1\tfailure\tno\n30\tmanagement\tmarried\ttertiary\tn [...]
+          }
+        ]
+      },
       "apps": [],
       "progressUpdateIntervalMs": 500,
       "jobName": "paragraph_1579061037737_-1577558456",
       "id": "paragraph_1579061037737_-1577558456",
       "dateCreated": "2020-01-15 12:03:57.737",
+      "dateStarted": "2020-05-05 23:03:09.350",
+      "dateFinished": "2020-05-05 23:03:11.061",
+      "status": "FINISHED"
+    },
+    {
+      "title": "Display table via z.show in PyFlink",
+      "text": "%flink.pyflink\n\ntable \u003d bt_env.sql_query(\"select * from bank limit 10\")\nz.show(table)",
+      "user": "anonymous",
+      "dateUpdated": "2020-05-05 23:37:48.878",
+      "config": {
+        "colWidth": 12.0,
+        "fontSize": 9.0,
+        "enabled": true,
+        "results": {
+          "0": {
+            "graph": {
+              "mode": "table",
+              "height": 300.0,
+              "optionOpen": false,
+              "setting": {
+                "table": {
+                  "tableGridState": {},
+                  "tableColumnTypeState": {
+                    "names": {
+                      "age": "string",
+                      "job": "string",
+                      "marital": "string",
+                      "education": "string",
+                      "default": "string",
+                      "balance": "string",
+                      "housing": "string",
+                      "loan": "string",
+                      "contact": "string",
+                      "day": "string",
+                      "month": "string",
+                      "duration": "string",
+                      "campaign": "string",
+                      "pdays": "string",
+                      "previous": "string",
+                      "poutcome": "string",
+                      "y": "string"
+                    },
+                    "updated": false
+                  },
+                  "tableOptionSpecHash": "[{\"name\":\"useFilter\",\"valueType\":\"boolean\",\"defaultValue\":false,\"widget\":\"checkbox\",\"description\":\"Enable filter for columns\"},{\"name\":\"showPagination\",\"valueType\":\"boolean\",\"defaultValue\":false,\"widget\":\"checkbox\",\"description\":\"Enable pagination for better navigation\"},{\"name\":\"showAggregationFooter\",\"valueType\":\"boolean\",\"defaultValue\":false,\"widget\":\"checkbox\",\"description\":\"Enable a footer [...]
+                  "tableOptionValue": {
+                    "useFilter": false,
+                    "showPagination": false,
+                    "showAggregationFooter": false
+                  },
+                  "updated": false,
+                  "initialized": false
+                }
+              },
+              "commonSetting": {}
+            }
+          }
+        },
+        "editorSetting": {
+          "language": "python",
+          "editOnDblClick": false,
+          "completionKey": "TAB",
+          "completionSupport": true
+        },
+        "editorMode": "ace/mode/python",
+        "title": true
+      },
+      "settings": {
+        "params": {},
+        "forms": {}
+      },
+      "results": {
+        "code": "SUCCESS",
+        "msg": [
+          {
+            "type": "TABLE",
+            "data": "age\tjob\tmarital\teducation\tdefault\tbalance\thousing\tloan\tcontact\tday\tmonth\tduration\tcampaign\tpdays\tprevious\tpoutcome\ty\n30\tunemployed\tmarried\tprimary\tno\t1787\tno\tno\tcellular\t19\toct\t79\t1\t-1\t0\tunknown\tno\n33\tservices\tmarried\tsecondary\tno\t4789\tyes\tyes\tcellular\t11\tmay\t220\t1\t339\t4\tfailure\tno\n35\tmanagement\tsingle\ttertiary\tno\t1350\tyes\tno\tcellular\t16\tapr\t185\t1\t330\t1\tfailure\tno\n30\tmanagement\tmarried\ttertiary\tn [...]
+          }
+        ]
+      },
+      "apps": [],
+      "progressUpdateIntervalMs": 500,
+      "jobName": "paragraph_1588690392097_1159956807",
+      "id": "paragraph_1588690392097_1159956807",
+      "dateCreated": "2020-05-05 22:53:12.097",
+      "dateStarted": "2020-05-05 23:37:07.989",
+      "dateFinished": "2020-05-05 23:37:15.984",
       "status": "FINISHED"
+    },
+    {
+      "text": "%flink.pyflink\n",
+      "user": "anonymous",
+      "dateUpdated": "2020-05-05 23:37:07.989",
+      "config": {},
+      "settings": {
+        "params": {},
+        "forms": {}
+      },
+      "apps": [],
+      "progressUpdateIntervalMs": 500,
+      "jobName": "paragraph_1588693027989_1331448600",
+      "id": "paragraph_1588693027989_1331448600",
+      "dateCreated": "2020-05-05 23:37:07.989",
+      "status": "READY"
     }
   ],
   "name": "6. Batch ETL",