Posted to commits@flink.apache.org by ji...@apache.org on 2019/01/10 08:31:12 UTC
[flink] branch master updated: [FLINK-11278] [docs] Add documentation for TableAPI&SQL in scala-shell.
This is an automated email from the ASF dual-hosted git repository.
jincheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 6826c67 [FLINK-11278] [docs] Add documentation for TableAPI&SQL in scala-shell.
6826c67 is described below
commit 6826c67028171b401df8bd32dad327051133d31a
Author: 仲炜 <zw...@alibaba-inc.com>
AuthorDate: Wed Jan 9 11:13:38 2019 +0800
[FLINK-11278] [docs] Add documentation for TableAPI&SQL in scala-shell.
This closes #7437
---
docs/ops/scala_shell.md | 114 ++++++++++++++++++++++++++++++++++++++++++++++--
1 file changed, 111 insertions(+), 3 deletions(-)
diff --git a/docs/ops/scala_shell.md b/docs/ops/scala_shell.md
index e7df864..50987af 100644
--- a/docs/ops/scala_shell.md
+++ b/docs/ops/scala_shell.md
@@ -36,9 +36,10 @@ cluster, please see the Setup section below.
## Usage
-The shell supports Batch and Streaming.
-Two different ExecutionEnvironments are automatically prebound after startup.
-Use "benv" and "senv" to access the Batch and Streaming environment respectively.
+The shell supports DataSet, DataStream, Table API and SQL.
+Four different Environments are automatically prebound after startup.
+Use "benv" and "senv" to access the Batch and Streaming ExecutionEnvironment respectively.
+Use "btenv" and "stenv" to access BatchTableEnvironment and StreamTableEnvironment respectively.
### DataSet API
@@ -85,6 +86,113 @@ Note, that in the Streaming case, the print operation does not trigger execution
The Flink Shell comes with command history and auto-completion.
+### Table API
+
+The example below is a wordcount program using the Table API:
+<div class="codetabs" markdown="1">
+<div data-lang="stream" markdown="1">
+{% highlight scala %}
+Scala-Flink> import org.apache.flink.table.functions.TableFunction
+Scala-Flink> val textSource = stenv.fromDataStream(
+ senv.fromElements(
+ "To be, or not to be,--that is the question:--",
+ "Whether 'tis nobler in the mind to suffer",
+ "The slings and arrows of outrageous fortune",
+ "Or to take arms against a sea of troubles,"),
+ 'text)
+Scala-Flink> class $Split extends TableFunction[String] {
+ def eval(s: String): Unit = {
+ s.toLowerCase.split("\\W+").foreach(collect)
+ }
+ }
+Scala-Flink> val split = new $Split
+Scala-Flink> textSource.join(split('text) as 'word).
+ groupBy('word).select('word, 'word.count as 'count).
+ toRetractStream[(String, Long)].print
+Scala-Flink> senv.execute("Table Wordcount")
+{% endhighlight %}
+</div>
+<div data-lang="batch" markdown="1">
+{% highlight scala %}
+Scala-Flink> import org.apache.flink.table.functions.TableFunction
+Scala-Flink> val textSource = btenv.fromDataSet(
+ benv.fromElements(
+ "To be, or not to be,--that is the question:--",
+ "Whether 'tis nobler in the mind to suffer",
+ "The slings and arrows of outrageous fortune",
+ "Or to take arms against a sea of troubles,"),
+ 'text)
+Scala-Flink> class $Split extends TableFunction[String] {
+ def eval(s: String): Unit = {
+ s.toLowerCase.split("\\W+").foreach(collect)
+ }
+ }
+Scala-Flink> val split = new $Split
+Scala-Flink> textSource.join(split('text) as 'word).
+ groupBy('word).select('word, 'word.count as 'count).
+ toDataSet[(String, Long)].print
+{% endhighlight %}
+</div>
+</div>
+
+Note that using $ as a prefix for the class name of a TableFunction is a workaround for an issue where Scala incorrectly generates inner class names in the shell.
+
+### SQL
+
+The following example is a wordcount program written in SQL:
+<div class="codetabs" markdown="1">
+<div data-lang="stream" markdown="1">
+{% highlight scala %}
+Scala-Flink> import org.apache.flink.table.functions.TableFunction
+Scala-Flink> val textSource = stenv.fromDataStream(
+ senv.fromElements(
+ "To be, or not to be,--that is the question:--",
+ "Whether 'tis nobler in the mind to suffer",
+ "The slings and arrows of outrageous fortune",
+ "Or to take arms against a sea of troubles,"),
+ 'text)
+Scala-Flink> stenv.registerTable("text_source", textSource)
+Scala-Flink> class $Split extends TableFunction[String] {
+ def eval(s: String): Unit = {
+ s.toLowerCase.split("\\W+").foreach(collect)
+ }
+ }
+Scala-Flink> stenv.registerFunction("split", new $Split)
+Scala-Flink> val result = stenv.sqlQuery("""SELECT T.word, count(T.word) AS `count`
+ FROM text_source
+ JOIN LATERAL table(split(text)) AS T(word)
+ ON TRUE
+ GROUP BY T.word""")
+Scala-Flink> result.toRetractStream[(String, Long)].print
+Scala-Flink> senv.execute("SQL Wordcount")
+{% endhighlight %}
+</div>
+<div data-lang="batch" markdown="1">
+{% highlight scala %}
+Scala-Flink> import org.apache.flink.table.functions.TableFunction
+Scala-Flink> val textSource = btenv.fromDataSet(
+ benv.fromElements(
+ "To be, or not to be,--that is the question:--",
+ "Whether 'tis nobler in the mind to suffer",
+ "The slings and arrows of outrageous fortune",
+ "Or to take arms against a sea of troubles,"),
+ 'text)
+Scala-Flink> btenv.registerTable("text_source", textSource)
+Scala-Flink> class $Split extends TableFunction[String] {
+ def eval(s: String): Unit = {
+ s.toLowerCase.split("\\W+").foreach(collect)
+ }
+ }
+Scala-Flink> btenv.registerFunction("split", new $Split)
+Scala-Flink> val result = btenv.sqlQuery("""SELECT T.word, count(T.word) AS `count`
+ FROM text_source
+ JOIN LATERAL table(split(text)) AS T(word)
+ ON TRUE
+ GROUP BY T.word""")
+Scala-Flink> result.toDataSet[(String, Long)].print
+{% endhighlight %}
+</div>
+</div>
## Adding external dependencies