You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ji...@apache.org on 2018/12/06 05:44:50 UTC

[flink] branch release-1.7 updated: [FLINK-9555][scala-shell] Support table api in scala shell.

This is an automated email from the ASF dual-hosted git repository.

jincheng pushed a commit to branch release-1.7
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.7 by this push:
     new efc73a8  [FLINK-9555][scala-shell] Support table api in scala shell.
efc73a8 is described below

commit efc73a872ac52e314bb1a05b9c5ed045cde6df1f
Author: chensq <ac...@alibaba-inc.com>
AuthorDate: Fri Nov 16 15:33:23 2018 +0800

    [FLINK-9555][scala-shell] Support table api in scala shell.
    
    This closes #7121
---
 flink-scala-shell/pom.xml                          |  7 +++
 .../org/apache/flink/api/scala/FlinkILoop.scala    | 39 +++++++++++----
 .../apache/flink/api/scala/ScalaShellITCase.scala  | 55 ++++++++++++++++++++++
 .../start-script/start-scala-shell.sh              |  7 +++
 4 files changed, 99 insertions(+), 9 deletions(-)

diff --git a/flink-scala-shell/pom.xml b/flink-scala-shell/pom.xml
index 5a96804..dfb7416 100644
--- a/flink-scala-shell/pom.xml
+++ b/flink-scala-shell/pom.xml
@@ -78,6 +78,13 @@ under the License.
 			<version>${scala.version}</version>
 		</dependency>
 
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-table_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
 		<!-- test dependencies -->
 
 		<dependency>
diff --git a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala
index 4b6e886..c124d8e 100644
--- a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala
+++ b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala
@@ -23,6 +23,8 @@ import java.io.{BufferedReader, File, FileOutputStream}
 import org.apache.flink.api.java.{JarHelper, ScalaShellRemoteEnvironment, ScalaShellRemoteStreamEnvironment}
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.flink.configuration.Configuration
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.scala.{BatchTableEnvironment, StreamTableEnvironment}
 import org.apache.flink.util.AbstractID
 
 import scala.tools.nsc.interpreter._
@@ -90,10 +92,17 @@ class FlinkILoop(
   }
 
   // local environment
-  val (scalaBenv: ExecutionEnvironment, scalaSenv: StreamExecutionEnvironment) = {
+  val (
+    scalaBenv: ExecutionEnvironment,
+    scalaSenv: StreamExecutionEnvironment,
+    scalaBTEnv: BatchTableEnvironment,
+    scalaSTEnv: StreamTableEnvironment
+    ) = {
     val scalaBenv = new ExecutionEnvironment(remoteBenv)
     val scalaSenv = new StreamExecutionEnvironment(remoteSenv)
-    (scalaBenv,scalaSenv)
+    val scalaBTEnv = TableEnvironment.getTableEnvironment(scalaBenv)
+    val scalaSTEnv = TableEnvironment.getTableEnvironment(scalaSenv)
+    (scalaBenv,scalaSenv,scalaBTEnv,scalaSTEnv)
   }
 
   /**
@@ -139,7 +148,10 @@ class FlinkILoop(
     "org.apache.flink.api.scala._",
     "org.apache.flink.api.scala.utils._",
     "org.apache.flink.streaming.api.scala._",
-    "org.apache.flink.streaming.api.windowing.time._"
+    "org.apache.flink.streaming.api.windowing.time._",
+    "org.apache.flink.table.api._",
+    "org.apache.flink.table.api.scala._",
+    "org.apache.flink.types.Row"
   )
 
   override def createInterpreter(): Unit = {
@@ -152,6 +164,8 @@ class FlinkILoop(
       // set execution environment
       intp.bind("benv", this.scalaBenv)
       intp.bind("senv", this.scalaSenv)
+      intp.bind("btenv", this.scalaBTEnv)
+      intp.bind("stenv", this.scalaSTEnv)
     }
   }
 
@@ -243,22 +257,29 @@ class FlinkILoop(
 
               F L I N K - S C A L A - S H E L L
 
-NOTE: Use the prebound Execution Environments to implement batch or streaming programs.
+NOTE: Use the prebound Execution Environments and Table Environment to implement batch or streaming programs.
 
-  Batch - Use the 'benv' variable
+  Batch - Use the 'benv' and 'btenv' variable
 
     * val dataSet = benv.readTextFile("/path/to/data")
     * dataSet.writeAsText("/path/to/output")
     * benv.execute("My batch program")
+    *
+    * val batchTable = btenv.fromDataSet(dataSet)
+    * btenv.registerTable("tableName", batchTable)
+    * val result = btenv.sqlQuery("SELECT * FROM tableName").collect
+    HINT: You can use print() on a DataSet to print the contents or collect()
+    a sql query result back to the shell.
 
-    HINT: You can use print() on a DataSet to print the contents to the shell.
-
-  Streaming - Use the 'senv' variable
+  Streaming - Use the 'senv' and 'stenv' variable
 
     * val dataStream = senv.fromElements(1, 2, 3, 4)
     * dataStream.countWindowAll(2).sum(0).print()
+    *
+    * val streamTable = stenv.fromDataStream(dataStream, 'num)
+    * val resultTable = streamTable.select('num).where('num % 2 === 1 )
+    * resultTable.toAppendStream[Row].print()
     * senv.execute("My streaming program")
-
     HINT: You can only print a DataStream to the shell in local mode.
       """
     // scalastyle:on
diff --git a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
index 337e4fb..fc90d8d 100644
--- a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
+++ b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
@@ -168,6 +168,61 @@ class ScalaShellITCase extends TestLogger {
     Assert.assertTrue(output.contains("WC(world,10)"))
   }
 
+  @Test
+  def testSimpleSelectWithFilterBatchTableAPIQuery(): Unit = {
+    val input =
+      """
+        |val data = Seq(
+        |    (1, 1L, "Hi"),
+        |    (2, 2L, "Hello"),
+        |    (3, 2L, "Hello world"))
+        |val t = benv.fromCollection(data).toTable(btenv, 'a, 'b, 'c).select('a, 'c).where(
+        |  'a % 2 === 1)
+        |val results = t.toDataSet[Row].collect().map(_.toString).sorted
+        |println(results.mkString("RESULT[", ";", "]"))
+        |:q
+      """.stripMargin
+    val output = processInShell(input)
+    Assert.assertFalse(output.contains("failed"))
+    Assert.assertFalse(output.contains("error"))
+    Assert.assertFalse(output.contains("Exception"))
+    // The REPL echo of `data` already contains "1,Hi", so assert on one tagged, sorted line.
+    Assert.assertTrue(output.contains("RESULT[1,Hi;3,Hello world]"))
+  }
+
+  @Test
+  def testGroupedAggregationStreamTableAPIQuery(): Unit = {
+    val input =
+      """
+        |val data = List(
+        |  ("Hello", 1),
+        |  ("word", 1),
+        |  ("Hello", 1),
+        |  ("bark", 1),
+        |  ("bark", 1),
+        |  ("bark", 1),
+        |  ("bark", 1),
+        |  ("bark", 1),
+        |  ("bark", 1),
+        |  ("flink", 1))
+        |val stream = senv.fromCollection(data)
+        |val table = stream.toTable(stenv, 'word, 'num)
+        |val resultTable = table.groupBy('word).select('num.sum as 'count).groupBy('count).select(
+        |  'count, 'count.count as 'frequency)
+        |val results = resultTable.toRetractStream[Row]
+        |results.print()
+        |senv.execute()
+      """.stripMargin
+    val output = processInShell(input)
+    // Final (count, frequency) accumulate messages; intermediate ones depend on interleaving.
+    Assert.assertTrue(output.contains("true,6,1"))
+    Assert.assertTrue(output.contains("true,1,2"))
+    Assert.assertTrue(output.contains("true,2,1"))
+    Assert.assertFalse(output.contains("failed"))
+    Assert.assertFalse(output.contains("error"))
+    Assert.assertFalse(output.contains("Exception"))
+  }
+
   /**
    * Submit external library.
    * Disabled due to FLINK-7111.
diff --git a/flink-scala-shell/start-script/start-scala-shell.sh b/flink-scala-shell/start-script/start-scala-shell.sh
index 033d505..e357114 100644
--- a/flink-scala-shell/start-script/start-scala-shell.sh
+++ b/flink-scala-shell/start-script/start-scala-shell.sh
@@ -52,6 +52,13 @@ bin=`cd "$bin"; pwd`
 
 FLINK_CLASSPATH=`constructFlinkClassPath`
 
+# Append the flink-table jar shipped in <flink-home>/opt to the class path. $bin is already
+# absolute, so the lookup no longer depends on the directory the script was launched from.
+opt=`cd "$bin"/../opt && pwd`
+FLINK_TABLE_LIB_PATH=`ls "$opt"/flink-table_*.jar 2>/dev/null | head -n 1`
+if [ -n "$FLINK_TABLE_LIB_PATH" ]; then FLINK_CLASSPATH=$FLINK_CLASSPATH:$FLINK_TABLE_LIB_PATH; fi
+
+
 # https://issues.scala-lang.org/browse/SI-6502, cant load external jars interactively
 # in scala shell since 2.10, has to be done at startup
 # checks arguments for additional classpath and adds it to the "standard classpath"