You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2019/01/09 06:56:25 UTC

[GitHub] WeiZhong94 commented on a change in pull request #7437: [FLINK-11278] [docs] Add documentation for TableAPI&SQL in scala-shell

WeiZhong94 commented on a change in pull request #7437: [FLINK-11278] [docs] Add documentation for TableAPI&SQL in scala-shell
URL: https://github.com/apache/flink/pull/7437#discussion_r246275229
 
 

 ##########
 File path: docs/ops/scala_shell.md
 ##########
 @@ -85,6 +86,113 @@ Note, that in the Streaming case, the print operation does not trigger execution
 
 The Flink Shell comes with command history and auto-completion.
 
+### Table API
+
+The example below is a wordcount program using Table API:
+<div class="codetabs" markdown="1">
+<div data-lang="stream" markdown="1">
+{% highlight scala %}
+Scala-Flink> import org.apache.flink.table.functions.TableFunction
+Scala-Flink> val textSource = stenv.fromDataStream(
+  senv.fromElements(
+    "To be, or not to be,--that is the question:--",
+    "Whether 'tis nobler in the mind to suffer",
+    "The slings and arrows of outrageous fortune",
+    "Or to take arms against a sea of troubles,"), 
+  'text)
+Scala-Flink> class $Split extends TableFunction[String] {
+    def eval(s: String): Unit = {
+      s.toLowerCase.split("\\W+").foreach(collect)
+    }
+  }
+Scala-Flink> val split = new $Split
+Scala-Flink> textSource.join(split('text) as 'word).
+    groupBy('word).select('word, 'word.count as 'count).
+    toRetractStream[(String, Long)].print
+Scala-Flink> senv.execute("Table Wordcount")
+{% endhighlight %}
+</div>
+<div data-lang="batch" markdown="1">
+{% highlight scala %}
+Scala-Flink> import org.apache.flink.table.functions.TableFunction
+Scala-Flink> val textSource = btenv.fromDataSet(
+  benv.fromElements(
+    "To be, or not to be,--that is the question:--",
+    "Whether 'tis nobler in the mind to suffer",
+    "The slings and arrows of outrageous fortune",
+    "Or to take arms against a sea of troubles,"), 
+  'text)
+Scala-Flink> class $Split extends TableFunction[String] {
+    def eval(s: String): Unit = {
+      s.toLowerCase.split("\\W+").foreach(collect)
+    }
+  }
+Scala-Flink> val split = new $Split
+Scala-Flink> textSource.join(split('text) as 'word).
+    groupBy('word).select('word, 'word.count as 'count).
+    toDataSet[(String, Long)].print
+{% endhighlight %}
+</div>
+</div>
+
+Note, that the $ prefix of the TableFunction classname is a walkaround of a scala issue about incorrectly generated inner class name. 
 
 Review comment:
   That sounds better. I have rewritten the sentence.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services