You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ji...@apache.org on 2018/12/06 05:44:50 UTC
[flink] branch release-1.7 updated: [FLINK-9555][scala-shell]
Support table api in scala shell.
This is an automated email from the ASF dual-hosted git repository.
jincheng pushed a commit to branch release-1.7
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.7 by this push:
new efc73a8 [FLINK-9555][scala-shell] Support table api in scala shell.
efc73a8 is described below
commit efc73a872ac52e314bb1a05b9c5ed045cde6df1f
Author: chensq <ac...@alibaba-inc.com>
AuthorDate: Fri Nov 16 15:33:23 2018 +0800
[FLINK-9555][scala-shell] Support table api in scala shell.
This closes #7121
---
flink-scala-shell/pom.xml | 7 +++
.../org/apache/flink/api/scala/FlinkILoop.scala | 39 +++++++++++----
.../apache/flink/api/scala/ScalaShellITCase.scala | 55 ++++++++++++++++++++++
.../start-script/start-scala-shell.sh | 7 +++
4 files changed, 99 insertions(+), 9 deletions(-)
diff --git a/flink-scala-shell/pom.xml b/flink-scala-shell/pom.xml
index 5a96804..dfb7416 100644
--- a/flink-scala-shell/pom.xml
+++ b/flink-scala-shell/pom.xml
@@ -78,6 +78,13 @@ under the License.
<version>${scala.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
<!-- test dependencies -->
<dependency>
diff --git a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala
index 4b6e886..c124d8e 100644
--- a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala
+++ b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala
@@ -23,6 +23,8 @@ import java.io.{BufferedReader, File, FileOutputStream}
import org.apache.flink.api.java.{JarHelper, ScalaShellRemoteEnvironment, ScalaShellRemoteStreamEnvironment}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.configuration.Configuration
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.scala.{BatchTableEnvironment, StreamTableEnvironment}
import org.apache.flink.util.AbstractID
import scala.tools.nsc.interpreter._
@@ -90,10 +92,17 @@ class FlinkILoop(
}
// local environment
- val (scalaBenv: ExecutionEnvironment, scalaSenv: StreamExecutionEnvironment) = {
+ val (
+ scalaBenv: ExecutionEnvironment,
+ scalaSenv: StreamExecutionEnvironment,
+ scalaBTEnv: BatchTableEnvironment,
+ scalaSTEnv: StreamTableEnvironment
+ ) = {
val scalaBenv = new ExecutionEnvironment(remoteBenv)
val scalaSenv = new StreamExecutionEnvironment(remoteSenv)
- (scalaBenv,scalaSenv)
+ val scalaBTEnv = TableEnvironment.getTableEnvironment(scalaBenv)
+ val scalaSTEnv = TableEnvironment.getTableEnvironment(scalaSenv)
+ (scalaBenv,scalaSenv,scalaBTEnv,scalaSTEnv)
}
/**
@@ -139,7 +148,10 @@ class FlinkILoop(
"org.apache.flink.api.scala._",
"org.apache.flink.api.scala.utils._",
"org.apache.flink.streaming.api.scala._",
- "org.apache.flink.streaming.api.windowing.time._"
+ "org.apache.flink.streaming.api.windowing.time._",
+ "org.apache.flink.table.api._",
+ "org.apache.flink.table.api.scala._",
+ "org.apache.flink.types.Row"
)
override def createInterpreter(): Unit = {
@@ -152,6 +164,8 @@ class FlinkILoop(
// set execution environment
intp.bind("benv", this.scalaBenv)
intp.bind("senv", this.scalaSenv)
+ intp.bind("btenv", this.scalaBTEnv)
+ intp.bind("stenv", this.scalaSTEnv)
}
}
@@ -243,22 +257,29 @@ class FlinkILoop(
F L I N K - S C A L A - S H E L L
-NOTE: Use the prebound Execution Environments to implement batch or streaming programs.
+NOTE: Use the prebound Execution Environments and Table Environment to implement batch or streaming programs.
- Batch - Use the 'benv' variable
+ Batch - Use the 'benv' and 'btenv' variable
* val dataSet = benv.readTextFile("/path/to/data")
* dataSet.writeAsText("/path/to/output")
* benv.execute("My batch program")
+ *
+ * val batchTable = btenv.fromDataSet(dataSet)
+ * btenv.registerTable("tableName", batchTable)
+ * val result = btenv.sqlQuery("SELECT * FROM tableName").collect
+ HINT: You can use print() on a DataSet to print the contents or collect()
+ a sql query result back to the shell.
- HINT: You can use print() on a DataSet to print the contents to the shell.
-
- Streaming - Use the 'senv' variable
+ Streaming - Use the 'senv' and 'stenv' variable
* val dataStream = senv.fromElements(1, 2, 3, 4)
* dataStream.countWindowAll(2).sum(0).print()
+ *
+ * val streamTable = stenv.fromDataStream(dataStream, 'num)
+ * val resultTable = streamTable.select('num).where('num % 2 === 1 )
+ * resultTable.toAppendStream[Row].print()
* senv.execute("My streaming program")
-
HINT: You can only print a DataStream to the shell in local mode.
"""
// scalastyle:on
diff --git a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
index 337e4fb..fc90d8d 100644
--- a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
+++ b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
@@ -168,6 +168,61 @@ class ScalaShellITCase extends TestLogger {
Assert.assertTrue(output.contains("WC(world,10)"))
}
+  /**
+   * Runs a batch Table API query in the shell using the pre-bound `benv` and `btenv`:
+   * three tuples are converted to a table, projected to columns `a` and `c`, and
+   * filtered to the rows whose `a` is odd. The collected rows are printed and the
+   * captured shell output is inspected.
+   */
+  @Test
+  def testSimpleSelectWithFilterBatchTableAPIQuery(): Unit = {
+    val input =
+      """
+        |val data = Seq(
+        | (1, 1L, "Hi"),
+        | (2, 2L, "Hello"),
+        | (3, 2L, "Hello world"))
+        |val t = benv.fromCollection(data).toTable(btenv, 'a, 'b, 'c).select('a,'c).where(
+        |'a% 2 === 1 )
+        |val results = t.toDataSet[Row].collect()
+        |results.foreach(println)
+        |:q
+      """.stripMargin
+    val output = processInShell(input)
+    // Look for failure markers first, and attach the captured output, so that a broken
+    // shell session is reported as such instead of as a missing result row.
+    Seq("failed", "error", "Exception").foreach { marker =>
+      Assert.assertFalse(
+        s"Shell output contains '$marker':\n$output",
+        output.contains(marker))
+    }
+    // Rows with a == 1 and a == 3 pass the filter and are printed as "a,c".
+    // NOTE(review): "1,Hi" may also match the REPL's echo of `data` ("(1,1,Hi)"), which
+    // would make that check trivially true - confirm against processInShell's output.
+    Seq("1,Hi", "3,Hello world").foreach { expected =>
+      Assert.assertTrue(
+        s"Shell output is missing '$expected':\n$output",
+        output.contains(expected))
+    }
+  }
+
+  /**
+   * Runs a streaming Table API query in the shell using the pre-bound `senv` and `stenv`.
+   *
+   * The input holds "Hello" twice, "bark" six times, and "word" and "flink" once each.
+   * The query first sums `num` per word (2, 6, 1, 1) and then counts how often each sum
+   * occurs, so the printed retract stream must contain the (count, frequency) pairs
+   * 6,1 and 1,2 and 2,1.
+   */
+  @Test
+  def testGroupedAggregationStreamTableAPIQuery(): Unit = {
+    val input =
+      """
+        | val data = List(
+        | ("Hello", 1),
+        | ("word", 1),
+        | ("Hello", 1),
+        | ("bark", 1),
+        | ("bark", 1),
+        | ("bark", 1),
+        | ("bark", 1),
+        | ("bark", 1),
+        | ("bark", 1),
+        | ("flink", 1)
+        | )
+        | val stream = senv.fromCollection(data)
+        | val table = stream.toTable(stenv, 'word, 'num)
+        | val resultTable = table.groupBy('word).select('num.sum as 'count).groupBy('count).select(
+        | 'count,'count.count as 'frequency)
+        | val results = resultTable.toRetractStream[Row]
+        | results.print
+        | senv.execute
+      """.stripMargin
+    val output = processInShell(input)
+    // Look for failure markers first, and attach the captured output, so that a failed
+    // job is reported as a failure rather than as a missing aggregate.
+    Seq("failed", "error", "Exception").foreach { marker =>
+      Assert.assertFalse(
+        s"Shell output contains '$marker':\n$output",
+        output.contains(marker))
+    }
+    // Expected (count, frequency) pairs; see the derivation in the method comment.
+    Seq("6,1", "1,2", "2,1").foreach { expected =>
+      Assert.assertTrue(
+        s"Shell output is missing '$expected':\n$output",
+        output.contains(expected))
+    }
+  }
+
/**
* Submit external library.
* Disabled due to FLINK-7111.
diff --git a/flink-scala-shell/start-script/start-scala-shell.sh b/flink-scala-shell/start-script/start-scala-shell.sh
index 033d505..e357114 100644
--- a/flink-scala-shell/start-script/start-scala-shell.sh
+++ b/flink-scala-shell/start-script/start-scala-shell.sh
@@ -52,6 +52,13 @@ bin=`cd "$bin"; pwd`
FLINK_CLASSPATH=`constructFlinkClassPath`
+# Append the flink-table jar to the class path.
+# The jar is looked up in the "opt" directory that sits next to this script's own
+# directory. The path is resolved from the script location ("$0"), not from the
+# caller's working directory, so the lookup also works for absolute invocations.
+opt=`dirname "$0"`
+# "&&" keeps $opt empty when the directory is missing, instead of silently
+# falling back to the current working directory.
+opt=`cd "$opt"/../opt && pwd`
+# Let the shell glob for the jar rather than piping ls into an unquoted grep
+# pattern; keep only the first match so the class path entry stays on one line.
+FLINK_TABLE_LIB_PATH=`ls "$opt"/flink-table_*.jar 2>/dev/null | head -n 1`
+# Only extend the class path when a jar was actually found; an empty entry
+# would add the current directory to the JVM class path.
+if [ -n "$FLINK_TABLE_LIB_PATH" ]; then
+    FLINK_CLASSPATH=$FLINK_CLASSPATH:$FLINK_TABLE_LIB_PATH
+fi
+
+
# https://issues.scala-lang.org/browse/SI-6502, cant load external jars interactively
# in scala shell since 2.10, has to be done at startup
# checks arguments for additional classpath and adds it to the "standard classpath"