You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2015/10/09 16:38:47 UTC
flink git commit: [FLINK-2613] [scala shell] Print usage information
for Scala Shell
Repository: flink
Updated Branches:
refs/heads/master 48f614cb7 -> 7750a1f29
[FLINK-2613] [scala shell] Print usage information for Scala Shell
- Change startup code of scala shell
- User has to specify local or remote mode explicitly now.
This closes #1106.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7750a1f2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7750a1f2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7750a1f2
Branch: refs/heads/master
Commit: 7750a1f29e231f16cbbea4d91dbe530b1f92f192
Parents: 48f614c
Author: nikste <ni...@gmail.com>
Authored: Tue Sep 29 09:50:17 2015 +0200
Committer: Chiwan Park <ch...@apache.org>
Committed: Fri Oct 9 16:38:11 2015 +0200
----------------------------------------------------------------------
docs/apis/scala_shell.md | 9 +-
.../org/apache/flink/api/scala/FlinkILoop.scala | 12 +-
.../org/apache/flink/api/scala/FlinkShell.scala | 148 ++++++++++++-------
.../flink/api/scala/ScalaShellITSuite.scala | 54 ++++++-
.../scala/ScalaShellLocalStartupITCase.scala | 60 ++++++++
.../start-script/start-scala-shell.sh | 4 +-
6 files changed, 218 insertions(+), 69 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/7750a1f2/docs/apis/scala_shell.md
----------------------------------------------------------------------
diff --git a/docs/apis/scala_shell.md b/docs/apis/scala_shell.md
index 9bd04d9..a0c10c1 100644
--- a/docs/apis/scala_shell.md
+++ b/docs/apis/scala_shell.md
@@ -30,15 +30,16 @@ Flink and setting up a cluster please refer to
To use the shell with an integrated Flink cluster just execute:
~~~bash
-bin/start-scala-shell.sh
+bin/start-scala-shell.sh local
~~~
in the root directory of your binary Flink directory.
-To use it with a running cluster you can supply the host and port of the JobManager with:
+To use it with a running cluster, start the Scala shell with the keyword `remote`
+and supply the host and port of the JobManager with:
~~~bash
-bin/start-scala-shell.sh --host <hostname> --port <portnumber>
+bin/start-scala-shell.sh remote <hostname> <portnumber>
~~~
## Usage
@@ -75,6 +76,6 @@ It is possible to add external classpaths to the Scala-shell. These will be sent
Use the parameter `-a <path/to/jar.jar>` or `--addclasspath <path/to/jar.jar>` to load additional classes.
~~~bash
-bin/start-scala-shell --addclasspath <path/to/jar.jar>
+bin/start-scala-shell.sh [local | remote <host> <port>] --addclasspath <path/to/jar.jar>
~~~
http://git-wip-us.apache.org/repos/asf/flink/blob/7750a1f2/flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala b/flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala
index bcf9bc2..f13288b 100644
--- a/flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala
+++ b/flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala
@@ -34,16 +34,16 @@ class FlinkILoop(
out0: JPrintWriter)
extends ILoopCompat(in0, out0) {
- def this(host:String,
- port:Int,
- externalJars : Option[Array[String]],
+ def this(host: String,
+ port: Int,
+ externalJars: Option[Array[String]],
in0: BufferedReader,
out: JPrintWriter){
- this(host:String, port:Int, externalJars, Some(in0), out)
+ this(host: String, port: Int, externalJars, Some(in0), out)
}
- def this(host:String, port:Int, externalJars : Option[Array[String]]){
- this(host:String,port: Int, externalJars , None, new JPrintWriter(Console.out, true))
+ def this(host: String, port: Int, externalJars: Option[Array[String]]){
+ this(host: String, port: Int, externalJars, None, new JPrintWriter(Console.out, true))
}
def this(host: String, port: Int, in0: BufferedReader, out: JPrintWriter){
http://git-wip-us.apache.org/repos/asf/flink/blob/7750a1f2/flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala b/flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
index 224983b..54bbf80 100644
--- a/flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
+++ b/flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
@@ -18,48 +18,73 @@
package org.apache.flink.api.scala
+import java.io.{StringWriter, BufferedReader}
+
+import org.apache.flink.api.common.ExecutionMode
+
import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster
import scala.tools.nsc.Settings
+import scala.tools.nsc.interpreter._
+
object FlinkShell {
+ object ExecutionMode extends Enumeration {
+ val UNDEFINED, LOCAL, REMOTE = Value
+ }
+
+ var bufferedReader: Option[BufferedReader] = None
+
def main(args: Array[String]) {
// scopt, command line arguments
case class Config(
port: Int = -1,
host: String = "none",
- externalJars: Option[Array[String]] = None)
+ externalJars: Option[Array[String]] = None,
+ flinkShellExecutionMode: ExecutionMode.Value = ExecutionMode.UNDEFINED)
+
val parser = new scopt.OptionParser[Config]("start-scala-shell.sh") {
head ("Flink Scala Shell")
- opt[Int] ('p', "port") action {
- (x, c) =>
- c.copy (port = x)
- } text("port specifies port of running JobManager")
-
- opt[(String)] ('h',"host") action {
- case (x, c) =>
- c.copy (host = x)
- } text("host specifies host name of running JobManager")
-
- opt[(String)] ('a',"addclasspath") action {
- case (x,c) =>
- val xArray = x.split(":")
- c.copy(externalJars = Option(xArray))
- } text("specifies additional jars to be used in Flink")
-
- help("help") text("prints this usage text")
+ cmd("local") action {
+ (_, c) => c.copy(host = "none", port = -1, flinkShellExecutionMode = ExecutionMode.LOCAL)
+ } text("starts Flink scala shell with a local Flink cluster\n") children(
+ opt[(String)] ("addclasspath") abbr("a") valueName("<path/to/jar>") action {
+ case (x, c) =>
+ val xArray = x.split(":")
+ c.copy(externalJars = Option(xArray))
+ } text("specifies additional jars to be used in Flink\n")
+ )
+
+ cmd("remote") action { (_, c) =>
+ c.copy(flinkShellExecutionMode = ExecutionMode.REMOTE)
+ } text("starts Flink scala shell connecting to a remote cluster\n") children(
+ arg[String]("<host>") action { (h, c) =>
+ c.copy(host = h) }
+ text("remote host name as string"),
+ arg[Int]("<port>") action { (p, c) =>
+ c.copy(port = p) }
+ text("remote port as integer\n"),
+ opt[(String)]("addclasspath") abbr("a") valueName("<path/to/jar>") action {
+ case (x, c) =>
+ val xArray = x.split(":")
+ c.copy(externalJars = Option(xArray))
+ } text("specifies additional jars to be used in Flink")
+ )
+ help("help") abbr("h") text("prints this usage text\n")
}
-
// parse arguments
- parser.parse (args, Config () ) match {
+ parser.parse (args, Config()) match {
case Some(config) =>
- startShell(config.host,config.port,config.externalJars)
+ startShell(config.host,
+ config.port,
+ config.flinkShellExecutionMode,
+ config.externalJars)
case _ => println("Could not parse program arguments")
}
@@ -67,39 +92,62 @@ object FlinkShell {
def startShell(
- userHost : String,
- userPort : Int,
- externalJars : Option[Array[String]] = None): Unit ={
+ userHost: String,
+ userPort: Int,
+ executionMode: ExecutionMode.Value,
+ externalJars: Option[Array[String]] = None): Unit ={
println("Starting Flink Shell:")
- var cluster: LocalFlinkMiniCluster = null
-
// either port or userhost not specified by user, create new minicluster
- val (host,port) = if (userHost == "none" || userPort == -1 ) {
- println("Creating new local server")
- cluster = new LocalFlinkMiniCluster(new Configuration, false)
- cluster.start()
- ("localhost",cluster.getLeaderRPCPort)
- } else {
- println(s"Connecting to remote server (host: $userHost, port: $userPort).")
- (userHost, userPort)
- }
-
- // custom shell
- val repl = new FlinkILoop(host, port, externalJars) //new MyILoop();
-
- repl.settings = new Settings()
-
- repl.settings.usejavacp.value = true
-
- // start scala interpreter shell
- repl.process(repl.settings)
-
- //repl.closeInterpreter()
-
- if (cluster != null) {
- cluster.stop()
+ val (host: String, port: Int, cluster: Option[LocalFlinkMiniCluster]) =
+ executionMode match {
+ case ExecutionMode.LOCAL =>
+ val miniCluster = new LocalFlinkMiniCluster(new Configuration, false)
+ miniCluster.start()
+ val port = miniCluster.getLeaderRPCPort
+ println(s"\nStarting local Flink cluster (host: localhost, port: $port).\n")
+ ("localhost", port, Some(miniCluster))
+
+ case ExecutionMode.REMOTE =>
+ if (userHost == "none" || userPort == -1) {
+ println("Error: <host> or <port> not specified!")
+ return
+ } else {
+ println(s"\nConnecting to Flink cluster (host: $userHost, port: $userPort).\n")
+ (userHost, userPort, None)
+ }
+
+ case ExecutionMode.UNDEFINED =>
+ println("Error: please specify execution mode:")
+ println("[local | remote <host> <port>]")
+ return
+ }
+
+ try {
+ // custom shell
+ val repl: FlinkILoop =
+ bufferedReader match {
+
+ case Some(br) =>
+ val out = new StringWriter()
+ new FlinkILoop(host, port, externalJars, bufferedReader, new JPrintWriter(out))
+
+ case None =>
+ new FlinkILoop(host, port, externalJars)
+ }
+
+ val settings = new Settings()
+
+ settings.usejavacp.value = true
+
+ // start scala interpreter shell
+ repl.process(settings)
+ } finally {
+ cluster match {
+ case Some(c) => c.stop()
+ case None =>
+ }
}
println(" good bye ..")
http://git-wip-us.apache.org/repos/asf/flink/blob/7750a1f2/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala b/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala
index 0621351..c8b1990 100644
--- a/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala
+++ b/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala
@@ -33,6 +33,9 @@ import scala.tools.nsc.Settings
@RunWith(classOf[JUnitRunner])
class ScalaShellITSuite extends FunSuite with Matchers with BeforeAndAfterAll {
+ var cluster: Option[ForkableFlinkMiniCluster] = None
+ val parallelism = 4
+
test("Prevent re-creation of environment") {
val input: String =
@@ -51,7 +54,6 @@ class ScalaShellITSuite extends FunSuite with Matchers with BeforeAndAfterAll {
val input: String =
"""
val initial = env.fromElements(0)
-
val count = initial.iterate(10000) { iterationInput: DataSet[Int] =>
val result = iterationInput.map { i =>
val x = Math.random()
@@ -78,7 +80,6 @@ class ScalaShellITSuite extends FunSuite with Matchers with BeforeAndAfterAll {
"Whether 'tis nobler in the mind to suffer",
"The slings and arrows of outrageous fortune",
"Or to take arms against a sea of troubles,")
-
val counts = text.flatMap { _.toLowerCase.split("\\W+") }.map { (_, 1) }.groupBy(0).sum(1)
val result = counts.print()
""".stripMargin
@@ -117,14 +118,11 @@ class ScalaShellITSuite extends FunSuite with Matchers with BeforeAndAfterAll {
val input =
"""
case class WC(word: String, count: Int)
-
val wordCounts = env.fromElements(
new WC("hello", 1),
new WC("world", 2),
new WC("world", 8))
-
val reduced = wordCounts.groupBy(0).sum(1)
-
reduced.print()
""".stripMargin
@@ -225,8 +223,50 @@ class ScalaShellITSuite extends FunSuite with Matchers with BeforeAndAfterAll {
out.toString + stdout
}
- var cluster: Option[ForkableFlinkMiniCluster] = None
- val parallelism = 4
+ /**
+ * tests flink shell startup with remote cluster (starts cluster internally)
+ */
+ test("start flink scala shell with remote cluster") {
+
+ val input: String = "val els = env.fromElements(\"a\",\"b\");\n" +
+ "els.print\nError\n:q\n"
+
+ val in: BufferedReader = new BufferedReader(
+ new StringReader(
+ input + "\n"))
+ val out: StringWriter = new StringWriter
+
+ val baos: ByteArrayOutputStream = new ByteArrayOutputStream
+ val oldOut: PrintStream = System.out
+ System.setOut(new PrintStream(baos))
+
+ val (c, args) = cluster match{
+ case Some(cl) =>
+ val arg = Array("remote",
+ cl.hostname,
+ Integer.toString(cl.getLeaderRPCPort))
+ (cl, arg)
+ case None =>
+ fail("Cluster creation failed!")
+ }
+
+ //start scala shell with initialized
+ // buffered reader for testing
+ FlinkShell.bufferedReader = Some(in)
+ FlinkShell.main(args)
+ baos.flush()
+
+ val output: String = baos.toString
+ System.setOut(oldOut)
+
+ output should include("Job execution switched to status FINISHED.")
+ output should include("a\nb")
+
+ output should not include "Error"
+ output should not include "ERROR"
+ output should not include "Exception"
+ output should not include "failed"
+ }
override def beforeAll(): Unit = {
val cl = TestBaseUtils.startCluster(
http://git-wip-us.apache.org/repos/asf/flink/blob/7750a1f2/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellLocalStartupITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellLocalStartupITCase.scala b/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellLocalStartupITCase.scala
new file mode 100644
index 0000000..60da09e
--- /dev/null
+++ b/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellLocalStartupITCase.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala
+
+import java.io._
+
+import org.junit.runner.RunWith
+import org.scalatest.{Matchers, FunSuite}
+import org.scalatest.junit.JUnitRunner
+
+
+@RunWith(classOf[JUnitRunner])
+class ScalaShellLocalStartupITCase extends FunSuite with Matchers {
+
+ /**
+ * tests flink shell startup in local mode by invoking FlinkShell.main directly
+ */
+ test("start flink scala shell with local cluster") {
+
+ val input: String = "val els = env.fromElements(\"a\",\"b\");\n" + "els.print\nError\n:q\n"
+ val in: BufferedReader = new BufferedReader(new StringReader(input + "\n"))
+ val out: StringWriter = new StringWriter
+ val baos: ByteArrayOutputStream = new ByteArrayOutputStream
+ val oldOut: PrintStream = System.out
+ System.setOut(new PrintStream(baos))
+ val args: Array[String] = Array("local")
+
+ //start flink scala shell
+ FlinkShell.bufferedReader = Some(in);
+ FlinkShell.main(args)
+
+ baos.flush()
+ val output: String = baos.toString
+ System.setOut(oldOut)
+
+ output should include("Job execution switched to status FINISHED.")
+ output should include("a\nb")
+
+ output should not include "Error"
+ output should not include "ERROR"
+ output should not include "Exception"
+ output should not include "failed"
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/7750a1f2/flink-staging/flink-scala-shell/start-script/start-scala-shell.sh
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/start-script/start-scala-shell.sh b/flink-staging/flink-scala-shell/start-script/start-scala-shell.sh
index 529b23c..0af2a9d 100644
--- a/flink-staging/flink-scala-shell/start-script/start-scala-shell.sh
+++ b/flink-staging/flink-scala-shell/start-script/start-scala-shell.sh
@@ -77,10 +77,10 @@ done
if ${EXTERNAL_LIB_FOUND}
then
- java -cp "$FLINK_CLASSPATH" org.apache.flink.api.scala.FlinkShell --addclasspath "$EXT_CLASSPATH" $@
+ java -cp "$FLINK_CLASSPATH" org.apache.flink.api.scala.FlinkShell $@ --addclasspath "$EXT_CLASSPATH"
else
java -cp "$FLINK_CLASSPATH" org.apache.flink.api.scala.FlinkShell $@
fi
#restore echo
-onExit
\ No newline at end of file
+onExit