Posted to commits@flink.apache.org by ji...@apache.org on 2019/01/10 08:31:12 UTC

[flink] branch master updated: [FLINK-11278] [docs] Add documentation for TableAPI&SQL in scala-shell.

This is an automated email from the ASF dual-hosted git repository.

jincheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 6826c67  [FLINK-11278] [docs] Add documentation for TableAPI&SQL in scala-shell.
6826c67 is described below

commit 6826c67028171b401df8bd32dad327051133d31a
Author: 仲炜 <zw...@alibaba-inc.com>
AuthorDate: Wed Jan 9 11:13:38 2019 +0800

    [FLINK-11278] [docs] Add documentation for TableAPI&SQL in scala-shell.
    
    This closes #7437
---
 docs/ops/scala_shell.md | 114 ++++++++++++++++++++++++++++++++++++++++++++++--
 1 file changed, 111 insertions(+), 3 deletions(-)

diff --git a/docs/ops/scala_shell.md b/docs/ops/scala_shell.md
index e7df864..50987af 100644
--- a/docs/ops/scala_shell.md
+++ b/docs/ops/scala_shell.md
@@ -36,9 +36,10 @@ cluster, please see the Setup section below.
 
 ## Usage
 
-The shell supports Batch and Streaming.
-Two different ExecutionEnvironments are automatically prebound after startup.
-Use "benv" and "senv" to access the Batch and Streaming environment respectively.
+The shell supports the DataSet, DataStream, Table API and SQL.
+Four different environments are automatically prebound after startup.
+Use "benv" and "senv" to access the batch and streaming ExecutionEnvironment respectively.
+Use "btenv" and "stenv" to access the BatchTableEnvironment and StreamTableEnvironment respectively.
 
 ### DataSet API
 
@@ -85,6 +86,113 @@ Note, that in the Streaming case, the print operation does not trigger execution
 
 The Flink Shell comes with command history and auto-completion.
 
+### Table API
+
+The example below is a wordcount program using the Table API:
+<div class="codetabs" markdown="1">
+<div data-lang="stream" markdown="1">
+{% highlight scala %}
+Scala-Flink> import org.apache.flink.table.functions.TableFunction
+Scala-Flink> val textSource = stenv.fromDataStream(
+  senv.fromElements(
+    "To be, or not to be,--that is the question:--",
+    "Whether 'tis nobler in the mind to suffer",
+    "The slings and arrows of outrageous fortune",
+    "Or to take arms against a sea of troubles,"),
+  'text)
+Scala-Flink> class $Split extends TableFunction[String] {
+    def eval(s: String): Unit = {
+      s.toLowerCase.split("\\W+").foreach(collect)
+    }
+  }
+Scala-Flink> val split = new $Split
+Scala-Flink> textSource.join(split('text) as 'word).
+    groupBy('word).select('word, 'word.count as 'count).
+    toRetractStream[(String, Long)].print
+Scala-Flink> senv.execute("Table Wordcount")
+{% endhighlight %}
+</div>
+<div data-lang="batch" markdown="1">
+{% highlight scala %}
+Scala-Flink> import org.apache.flink.table.functions.TableFunction
+Scala-Flink> val textSource = btenv.fromDataSet(
+  benv.fromElements(
+    "To be, or not to be,--that is the question:--",
+    "Whether 'tis nobler in the mind to suffer",
+    "The slings and arrows of outrageous fortune",
+    "Or to take arms against a sea of troubles,"), 
+  'text)
+Scala-Flink> class $Split extends TableFunction[String] {
+    def eval(s: String): Unit = {
+      s.toLowerCase.split("\\W+").foreach(collect)
+    }
+  }
+Scala-Flink> val split = new $Split
+Scala-Flink> textSource.join(split('text) as 'word).
+    groupBy('word).select('word, 'word.count as 'count).
+    toDataSet[(String, Long)].print
+{% endhighlight %}
+</div>
+</div>
+
+Note that using $ as a prefix for the TableFunction class name is a workaround for an issue where Scala incorrectly generates the inner class name.
+
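The tokenization that the `$Split` function's `eval` method performs can be tried outside the shell. The sketch below is plain Scala with no Flink dependency (the `SplitDemo` object and the `tokens`/`wordCount` helpers are illustrative names, not part of the documented API); it mirrors what `eval` collects per row and what the subsequent `groupBy('word).select('word, 'word.count)` aggregation produces:

```scala
// Mirrors the $Split TableFunction: lowercase the line and split on
// runs of non-word characters, yielding one token per collected row.
object SplitDemo {
  def tokens(s: String): Seq[String] =
    s.toLowerCase.split("\\W+").toSeq

  // Plain-Scala equivalent of groupBy('word).select('word, 'word.count):
  // a map from each word to its number of occurrences.
  def wordCount(lines: Seq[String]): Map[String, Int] =
    lines.flatMap(tokens).groupBy(identity).map { case (w, ws) => (w, ws.size) }

  def main(args: Array[String]): Unit = {
    val counts = wordCount(Seq("To be, or not to be,--that is the question:--"))
    println(counts("to")) // "to" appears twice in this line
  }
}
```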
+### SQL
+
+The following example is a wordcount program written in SQL:
+<div class="codetabs" markdown="1">
+<div data-lang="stream" markdown="1">
+{% highlight scala %}
+Scala-Flink> import org.apache.flink.table.functions.TableFunction
+Scala-Flink> val textSource = stenv.fromDataStream(
+  senv.fromElements(
+    "To be, or not to be,--that is the question:--",
+    "Whether 'tis nobler in the mind to suffer",
+    "The slings and arrows of outrageous fortune",
+    "Or to take arms against a sea of troubles,"), 
+  'text)
+Scala-Flink> stenv.registerTable("text_source", textSource)
+Scala-Flink> class $Split extends TableFunction[String] {
+    def eval(s: String): Unit = {
+      s.toLowerCase.split("\\W+").foreach(collect)
+    }
+  }
+Scala-Flink> stenv.registerFunction("split", new $Split)
+Scala-Flink> val result = stenv.sqlQuery("""SELECT T.word, count(T.word) AS `count` 
+    FROM text_source 
+    JOIN LATERAL TABLE(split(text)) AS T(word)
+    ON TRUE 
+    GROUP BY T.word""")
+Scala-Flink> result.toRetractStream[(String, Long)].print
+Scala-Flink> senv.execute("SQL Wordcount")
+{% endhighlight %}
+</div>
+<div data-lang="batch" markdown="1">
+{% highlight scala %}
+Scala-Flink> import org.apache.flink.table.functions.TableFunction
+Scala-Flink> val textSource = btenv.fromDataSet(
+  benv.fromElements(
+    "To be, or not to be,--that is the question:--",
+    "Whether 'tis nobler in the mind to suffer",
+    "The slings and arrows of outrageous fortune",
+    "Or to take arms against a sea of troubles,"), 
+  'text)
+Scala-Flink> btenv.registerTable("text_source", textSource)
+Scala-Flink> class $Split extends TableFunction[String] {
+    def eval(s: String): Unit = {
+      s.toLowerCase.split("\\W+").foreach(collect)
+    }
+  }
+Scala-Flink> btenv.registerFunction("split", new $Split)
+Scala-Flink> val result = btenv.sqlQuery("""SELECT T.word, count(T.word) AS `count` 
+    FROM text_source 
+    JOIN LATERAL TABLE(split(text)) AS T(word)
+    ON TRUE 
+    GROUP BY T.word""")
+Scala-Flink> result.toDataSet[(String, Long)].print
+{% endhighlight %}
+</div>
+</div>
 
 ## Adding external dependencies