You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2020/05/08 06:08:31 UTC
[zeppelin] branch branch-0.9 updated: [ZEPPELIN-4800]. Support
z.show(table) in flink interpreter
This is an automated email from the ASF dual-hosted git repository.
zjffdu pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/branch-0.9 by this push:
new 609c1dc [ZEPPELIN-4800]. Support z.show(table) in flink interpreter
609c1dc is described below
commit 609c1dccb762ba655115d64d4456bda8bc5c75b0
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Tue May 5 23:34:43 2020 +0800
[ZEPPELIN-4800]. Support z.show(table) in flink interpreter
### What is this PR for?
This PR adds support for `z.show(table)` in the Flink interpreter. It can be used not only from the Scala API but also from the PyFlink API. See the screenshots below.
### What type of PR is it?
[ Feature ]
### Todos
* [ ] - Task
### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-4800
### How should this be tested?
* CI pass
### Screenshots (if appropriate)
![image](https://user-images.githubusercontent.com/164491/81127715-ab864000-8f71-11ea-9286-89c543e901ce.png)
![image](https://user-images.githubusercontent.com/164491/81127735-bccf4c80-8f71-11ea-87ec-06c8afcafc21.png)
### Questions:
* Do the license files need an update? No
* Are there breaking changes for older versions? No
* Does this need documentation? No
Author: Jeff Zhang <zj...@apache.org>
Closes #3762 from zjffdu/ZEPPELIN-4800 and squashes the following commits:
713b9996e [Jeff Zhang] [ZEPPELIN-4800]. Support z.show(table) in flink interpreter
(cherry picked from commit 013f7d4629ee9d2f34a3694c616722b6172d23a1)
Signed-off-by: Jeff Zhang <zj...@apache.org>
---
.../zeppelin/flink/FlinkZeppelinContext.scala | 2 +-
.../flink/FlinkBatchSqlInterpreterTest.java | 10 +
.../Flink Tutorial/6. Batch ETL_2EW19CSPA.zpln | 222 ++++++++++++++++++---
3 files changed, 203 insertions(+), 31 deletions(-)
diff --git a/flink/src/main/scala/org/apache/zeppelin/flink/FlinkZeppelinContext.scala b/flink/src/main/scala/org/apache/zeppelin/flink/FlinkZeppelinContext.scala
index 3694f9a..146ec63 100644
--- a/flink/src/main/scala/org/apache/zeppelin/flink/FlinkZeppelinContext.scala
+++ b/flink/src/main/scala/org/apache/zeppelin/flink/FlinkZeppelinContext.scala
@@ -57,7 +57,7 @@ class FlinkZeppelinContext(val flinkInterpreter: FlinkScalaInterpreter,
"ipyflink" -> "org.apache.zeppelin.flink.IPyFlinkInterpreter"
)
- private val supportedClasses = Seq(classOf[DataSet[_]])
+ private val supportedClasses = Seq(classOf[DataSet[_]], classOf[Table])
def setCurrentSql(sql: String): Unit = {
this.currentSql = sql
diff --git a/flink/src/test/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreterTest.java b/flink/src/test/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreterTest.java
index 19dedfa..03efa9b 100644
--- a/flink/src/test/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreterTest.java
+++ b/flink/src/test/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreterTest.java
@@ -70,6 +70,16 @@ public class FlinkBatchSqlInterpreterTest extends SqlInterpreterTest {
assertEquals(InterpreterResult.Type.TABLE, resultMessages.get(0).getType());
assertEquals("id\tname\n1\ta\n2\tb\n", resultMessages.get(0).getData());
+ // z.show
+ context = getInterpreterContext();
+ result =
+ flinkInterpreter.interpret("z.show(btenv.sqlQuery(\"select * from source_table\"))", context);
+ resultMessages = context.out.toInterpreterResultMessage();
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+ assertEquals(1, resultMessages.size());
+ assertEquals(InterpreterResult.Type.TABLE, resultMessages.get(0).getType());
+ assertEquals("id\tname\n1\ta\n2\tb\n", resultMessages.get(0).getData());
+
// define scala udf
result = flinkInterpreter.interpret(
"class AddOne extends ScalarFunction {\n" +
diff --git a/notebook/Flink Tutorial/6. Batch ETL_2EW19CSPA.zpln b/notebook/Flink Tutorial/6. Batch ETL_2EW19CSPA.zpln
index d558a76..40c8cc2 100644
--- a/notebook/Flink Tutorial/6. Batch ETL_2EW19CSPA.zpln
+++ b/notebook/Flink Tutorial/6. Batch ETL_2EW19CSPA.zpln
@@ -49,7 +49,7 @@
"title": "Download bank data",
"text": "%sh\n\ncd /tmp\nrm -rf bank*\nwget https://archive.ics.uci.edu/ml/machine-learning-databases/00222/bank.zip\nunzip bank.zip\n# upload data to hdfs if you want to run it in yarn mode\n# hadoop fs -put /tmp/bank.csv /tmp/bank.csv\n",
"user": "anonymous",
- "dateUpdated": "2020-04-27 11:37:36.288",
+ "dateUpdated": "2020-05-05 22:52:51.576",
"config": {
"runOnSelectionChange": true,
"title": true,
@@ -75,7 +75,7 @@
"msg": [
{
"type": "TEXT",
- "data": "--2020-04-27 11:37:36-- https://archive.ics.uci.edu/ml/machine-learning-databases/00222/bank.zip\nResolving archive.ics.uci.edu (archive.ics.uci.edu)... 128.195.10.252\nConnecting to archive.ics.uci.edu (archive.ics.uci.edu)|128.195.10.252|:443... connected.\nHTTP request sent, awaiting response... 200 OK\nLength: 579043 (565K) [application/x-httpd-php]\nSaving to: ‘bank.zip’\n\n 0K .......... .......... .......... .......... .......... 8% 88.2K 6s\n 50K ... [...]
+ "data": "--2020-05-05 22:52:51-- https://archive.ics.uci.edu/ml/machine-learning-databases/00222/bank.zip\nResolving archive.ics.uci.edu (archive.ics.uci.edu)... 128.195.10.252\nConnecting to archive.ics.uci.edu (archive.ics.uci.edu)|128.195.10.252|:443... connected.\nHTTP request sent, awaiting response... 200 OK\nLength: 579043 (565K) [application/x-httpd-php]\nSaving to: ‘bank.zip’\n\n 0K .......... .......... .......... .......... .......... 8% 105K 5s\n 50K ... [...]
}
]
},
@@ -84,8 +84,8 @@
"jobName": "paragraph_1578045094400_1030344935",
"id": "paragraph_1578045094400_1030344935",
"dateCreated": "2020-01-03 17:51:34.400",
- "dateStarted": "2020-04-27 11:37:36.293",
- "dateFinished": "2020-04-27 11:37:40.794",
+ "dateStarted": "2020-05-05 22:52:51.581",
+ "dateFinished": "2020-05-05 22:52:59.324",
"status": "FINISHED"
},
{
@@ -135,7 +135,7 @@
"title": "Define source table which represents the raw data",
"text": "%flink.bsql\n\nDROP TABLE IF EXISTS bank_raw;\nCREATE TABLE bank_raw (\n content STRING\n) WITH (\n\u0027format.field-delimiter\u0027\u003d\u0027\\n\u0027,\n\u0027connector.type\u0027\u003d\u0027filesystem\u0027,\n\u0027format.derive-schema\u0027\u003d\u0027true\u0027,\n\u0027connector.path\u0027\u003d\u0027/tmp/bank.csv\u0027,\n\u0027format.type\u0027\u003d\u0027csv\u0027\n);",
"user": "anonymous",
- "dateUpdated": "2020-04-27 11:39:53.363",
+ "dateUpdated": "2020-05-05 23:01:30.843",
"config": {
"colWidth": 6.0,
"fontSize": 9.0,
@@ -170,15 +170,15 @@
"jobName": "paragraph_1578044954921_-1188487356",
"id": "paragraph_1578044954921_-1188487356",
"dateCreated": "2020-01-03 17:49:14.921",
- "dateStarted": "2020-04-27 11:38:36.094",
- "dateFinished": "2020-04-27 11:38:36.393",
+ "dateStarted": "2020-05-05 23:01:30.849",
+ "dateFinished": "2020-05-05 23:01:41.675",
"status": "FINISHED"
},
{
"title": "Define sink table which represents the cleaned data",
"text": "%flink.bsql\n\nDROP TABLE IF EXISTS bank;\nCREATE TABLE bank (\n age int, \n job string,\n marital string,\n education string,\n `default` string,\n balance string,\n housing string,\n loan string,\n contact string, \n `day` string,\n `month` string,\n duration int,\n campaign int,\n pdays int,\n previous int,\n poutcome string,\n y string\n) WITH (\n\u0027format.field-delimiter\u0027\u003d\u0027,\u0027,\n\u0027connector.t [...]
"user": "anonymous",
- "dateUpdated": "2020-04-27 11:39:50.826",
+ "dateUpdated": "2020-05-05 23:01:32.335",
"config": {
"runOnSelectionChange": true,
"title": true,
@@ -213,15 +213,15 @@
"jobName": "paragraph_1578045204379_-1427374232",
"id": "paragraph_1578045204379_-1427374232",
"dateCreated": "2020-01-03 17:53:24.379",
- "dateStarted": "2020-04-27 11:38:37.856",
- "dateFinished": "2020-04-27 11:38:38.118",
+ "dateStarted": "2020-05-05 23:01:41.334",
+ "dateFinished": "2020-05-05 23:01:41.675",
"status": "FINISHED"
},
{
"title": "Show tables",
"text": "%flink.bsql\n\nshow tables",
"user": "anonymous",
- "dateUpdated": "2020-04-27 11:40:15.544",
+ "dateUpdated": "2020-05-05 22:52:11.086",
"config": {
"colWidth": 12.0,
"fontSize": 9.0,
@@ -282,15 +282,15 @@
"jobName": "paragraph_1587958743728_1444404682",
"id": "paragraph_1587958743728_1444404682",
"dateCreated": "2020-04-27 11:39:03.728",
- "dateStarted": "2020-04-27 11:39:06.222",
- "dateFinished": "2020-04-27 11:39:06.500",
+ "dateStarted": "2020-05-05 22:52:11.091",
+ "dateFinished": "2020-05-05 22:52:11.283",
"status": "FINISHED"
},
{
"title": "Define UDTF ParseFunction to parse the raw data",
"text": "%flink\n\nimport org.apache.flink.api.java.typeutils.RowTypeInfo\nimport org.apache.flink.api.common.typeinfo.Types\nimport org.apache.flink.api.java.typeutils._\nimport org.apache.flink.api.scala.typeutils._\nimport org.apache.flink.api.scala._\n\nclass Person(val age:Int, val job: String, val marital: String, val education: String, val default: String, val balance: String, val housing: String, val loan: String, val contact: String, val day: String, val month: String, val [...]
"user": "anonymous",
- "dateUpdated": "2020-04-27 11:38:03.810",
+ "dateUpdated": "2020-05-05 23:01:45.688",
"config": {
"runOnSelectionChange": true,
"title": true,
@@ -325,15 +325,15 @@
"jobName": "paragraph_1578888628353_1621411444",
"id": "paragraph_1578888628353_1621411444",
"dateCreated": "2020-01-13 12:10:28.359",
- "dateStarted": "2020-04-27 11:38:03.814",
- "dateFinished": "2020-04-27 11:38:06.223",
+ "dateStarted": "2020-05-05 23:01:45.693",
+ "dateFinished": "2020-05-05 23:01:48.276",
"status": "FINISHED"
},
{
"title": "Clean data",
"text": "%sh\n\nrm -rf /tmp/bank_cleaned\n#hadoop fs -rmr /tmp/bank_cleaned",
"user": "anonymous",
- "dateUpdated": "2020-04-27 11:38:44.643",
+ "dateUpdated": "2020-05-05 22:52:42.774",
"config": {
"runOnSelectionChange": true,
"title": true,
@@ -363,15 +363,15 @@
"jobName": "paragraph_1579061020460_-113987164",
"id": "paragraph_1579061020460_-113987164",
"dateCreated": "2020-01-15 12:03:40.468",
- "dateStarted": "2020-04-27 11:38:44.647",
- "dateFinished": "2020-04-27 11:38:44.672",
+ "dateStarted": "2020-05-05 22:52:42.780",
+ "dateFinished": "2020-05-05 22:52:42.836",
"status": "FINISHED"
},
{
"title": "Parse the data and write it into sink table",
"text": "%flink.bsql\n\ninsert into bank select T.* from bank_raw, LATERAL TABLE(parse(content)) as T(age, job, marital, education, `default`, balance, housing, loan, contact, `day`, `month`, duration, campaign, pdays, previous, poutcome, y) ",
"user": "anonymous",
- "dateUpdated": "2020-04-27 11:38:47.002",
+ "dateUpdated": "2020-05-05 22:53:04.445",
"config": {
"runOnSelectionChange": true,
"title": true,
@@ -406,15 +406,15 @@
"jobName": "paragraph_1578669828368_-1923137601",
"id": "paragraph_1578669828368_-1923137601",
"dateCreated": "2020-01-10 23:23:48.368",
- "dateStarted": "2020-04-27 11:38:47.006",
- "dateFinished": "2020-04-27 11:38:50.488",
+ "dateStarted": "2020-05-05 22:53:04.450",
+ "dateFinished": "2020-05-05 22:53:05.856",
"status": "FINISHED"
},
{
"title": "Preview output data",
"text": "%flink.bsql\n\nselect * from bank limit 10\n",
"user": "anonymous",
- "dateUpdated": "2020-04-27 11:40:23.766",
+ "dateUpdated": "2020-05-05 23:01:51.464",
"config": {
"colWidth": 12.0,
"fontSize": 9.0,
@@ -493,37 +493,199 @@
"jobName": "paragraph_1578068480238_-1678045273",
"id": "paragraph_1578068480238_-1678045273",
"dateCreated": "2020-01-04 00:21:20.267",
- "dateStarted": "2020-04-27 11:40:23.770",
- "dateFinished": "2020-04-27 11:40:24.710",
+ "dateStarted": "2020-05-05 23:01:51.469",
+ "dateFinished": "2020-05-05 23:01:55.009",
"status": "FINISHED"
},
{
- "text": "%flink.bsql\n",
+ "title": "Display table via z.show",
+ "text": "%flink\n\nval table \u003d btenv.sqlQuery(\"select * from bank limit 10\")\nz.show(table)",
"user": "anonymous",
- "dateUpdated": "2020-02-02 16:41:35.121",
+ "dateUpdated": "2020-05-05 23:33:44.788",
"config": {
"colWidth": 12.0,
"fontSize": 9.0,
"enabled": true,
- "results": {},
+ "results": {
+ "1": {
+ "graph": {
+ "mode": "table",
+ "height": 300.0,
+ "optionOpen": false,
+ "setting": {
+ "table": {
+ "tableGridState": {},
+ "tableColumnTypeState": {
+ "names": {
+ "age": "string",
+ "job": "string",
+ "marital": "string",
+ "education": "string",
+ "default": "string",
+ "balance": "string",
+ "housing": "string",
+ "loan": "string",
+ "contact": "string",
+ "day": "string",
+ "month": "string",
+ "duration": "string",
+ "campaign": "string",
+ "pdays": "string",
+ "previous": "string",
+ "poutcome": "string",
+ "y": "string"
+ },
+ "updated": false
+ },
+ "tableOptionSpecHash": "[{\"name\":\"useFilter\",\"valueType\":\"boolean\",\"defaultValue\":false,\"widget\":\"checkbox\",\"description\":\"Enable filter for columns\"},{\"name\":\"showPagination\",\"valueType\":\"boolean\",\"defaultValue\":false,\"widget\":\"checkbox\",\"description\":\"Enable pagination for better navigation\"},{\"name\":\"showAggregationFooter\",\"valueType\":\"boolean\",\"defaultValue\":false,\"widget\":\"checkbox\",\"description\":\"Enable a footer [...]
+ "tableOptionValue": {
+ "useFilter": false,
+ "showPagination": false,
+ "showAggregationFooter": false
+ },
+ "updated": false,
+ "initialized": false
+ }
+ },
+ "commonSetting": {}
+ }
+ }
+ },
"editorSetting": {
- "language": "sql",
+ "language": "scala",
"editOnDblClick": false,
"completionKey": "TAB",
"completionSupport": true
},
- "editorMode": "ace/mode/sql"
+ "editorMode": "ace/mode/scala",
+ "title": true
},
"settings": {
"params": {},
"forms": {}
},
+ "results": {
+ "code": "SUCCESS",
+ "msg": [
+ {
+ "type": "TEXT",
+ "data": "\u001b[1m\u001b[34mtable\u001b[0m: \u001b[1m\u001b[32morg.apache.flink.table.api.Table\u001b[0m \u003d UnnamedTable$3\n"
+ },
+ {
+ "type": "TABLE",
+ "data": "age\tjob\tmarital\teducation\tdefault\tbalance\thousing\tloan\tcontact\tday\tmonth\tduration\tcampaign\tpdays\tprevious\tpoutcome\ty\n30\tunemployed\tmarried\tprimary\tno\t1787\tno\tno\tcellular\t19\toct\t79\t1\t-1\t0\tunknown\tno\n33\tservices\tmarried\tsecondary\tno\t4789\tyes\tyes\tcellular\t11\tmay\t220\t1\t339\t4\tfailure\tno\n35\tmanagement\tsingle\ttertiary\tno\t1350\tyes\tno\tcellular\t16\tapr\t185\t1\t330\t1\tfailure\tno\n30\tmanagement\tmarried\ttertiary\tn [...]
+ }
+ ]
+ },
"apps": [],
"progressUpdateIntervalMs": 500,
"jobName": "paragraph_1579061037737_-1577558456",
"id": "paragraph_1579061037737_-1577558456",
"dateCreated": "2020-01-15 12:03:57.737",
+ "dateStarted": "2020-05-05 23:03:09.350",
+ "dateFinished": "2020-05-05 23:03:11.061",
+ "status": "FINISHED"
+ },
+ {
+ "title": "Display table via z.show in PyFlink",
+ "text": "%flink.pyflink\n\ntable \u003d bt_env.sql_query(\"select * from bank limit 10\")\nz.show(table)",
+ "user": "anonymous",
+ "dateUpdated": "2020-05-05 23:37:48.878",
+ "config": {
+ "colWidth": 12.0,
+ "fontSize": 9.0,
+ "enabled": true,
+ "results": {
+ "0": {
+ "graph": {
+ "mode": "table",
+ "height": 300.0,
+ "optionOpen": false,
+ "setting": {
+ "table": {
+ "tableGridState": {},
+ "tableColumnTypeState": {
+ "names": {
+ "age": "string",
+ "job": "string",
+ "marital": "string",
+ "education": "string",
+ "default": "string",
+ "balance": "string",
+ "housing": "string",
+ "loan": "string",
+ "contact": "string",
+ "day": "string",
+ "month": "string",
+ "duration": "string",
+ "campaign": "string",
+ "pdays": "string",
+ "previous": "string",
+ "poutcome": "string",
+ "y": "string"
+ },
+ "updated": false
+ },
+ "tableOptionSpecHash": "[{\"name\":\"useFilter\",\"valueType\":\"boolean\",\"defaultValue\":false,\"widget\":\"checkbox\",\"description\":\"Enable filter for columns\"},{\"name\":\"showPagination\",\"valueType\":\"boolean\",\"defaultValue\":false,\"widget\":\"checkbox\",\"description\":\"Enable pagination for better navigation\"},{\"name\":\"showAggregationFooter\",\"valueType\":\"boolean\",\"defaultValue\":false,\"widget\":\"checkbox\",\"description\":\"Enable a footer [...]
+ "tableOptionValue": {
+ "useFilter": false,
+ "showPagination": false,
+ "showAggregationFooter": false
+ },
+ "updated": false,
+ "initialized": false
+ }
+ },
+ "commonSetting": {}
+ }
+ }
+ },
+ "editorSetting": {
+ "language": "python",
+ "editOnDblClick": false,
+ "completionKey": "TAB",
+ "completionSupport": true
+ },
+ "editorMode": "ace/mode/python",
+ "title": true
+ },
+ "settings": {
+ "params": {},
+ "forms": {}
+ },
+ "results": {
+ "code": "SUCCESS",
+ "msg": [
+ {
+ "type": "TABLE",
+ "data": "age\tjob\tmarital\teducation\tdefault\tbalance\thousing\tloan\tcontact\tday\tmonth\tduration\tcampaign\tpdays\tprevious\tpoutcome\ty\n30\tunemployed\tmarried\tprimary\tno\t1787\tno\tno\tcellular\t19\toct\t79\t1\t-1\t0\tunknown\tno\n33\tservices\tmarried\tsecondary\tno\t4789\tyes\tyes\tcellular\t11\tmay\t220\t1\t339\t4\tfailure\tno\n35\tmanagement\tsingle\ttertiary\tno\t1350\tyes\tno\tcellular\t16\tapr\t185\t1\t330\t1\tfailure\tno\n30\tmanagement\tmarried\ttertiary\tn [...]
+ }
+ ]
+ },
+ "apps": [],
+ "progressUpdateIntervalMs": 500,
+ "jobName": "paragraph_1588690392097_1159956807",
+ "id": "paragraph_1588690392097_1159956807",
+ "dateCreated": "2020-05-05 22:53:12.097",
+ "dateStarted": "2020-05-05 23:37:07.989",
+ "dateFinished": "2020-05-05 23:37:15.984",
"status": "FINISHED"
+ },
+ {
+ "text": "%flink.pyflink\n",
+ "user": "anonymous",
+ "dateUpdated": "2020-05-05 23:37:07.989",
+ "config": {},
+ "settings": {
+ "params": {},
+ "forms": {}
+ },
+ "apps": [],
+ "progressUpdateIntervalMs": 500,
+ "jobName": "paragraph_1588693027989_1331448600",
+ "id": "paragraph_1588693027989_1331448600",
+ "dateCreated": "2020-05-05 23:37:07.989",
+ "status": "READY"
}
],
"name": "6. Batch ETL",