You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2015/10/09 16:38:47 UTC

flink git commit: [FLINK-2613] [scala shell] Print usage information for Scala Shell

Repository: flink
Updated Branches:
  refs/heads/master 48f614cb7 -> 7750a1f29


[FLINK-2613] [scala shell] Print usage information for Scala Shell

  - Change startup code of scala shell
  - User has to specify local or remote mode explicitly now.

This closes #1106.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7750a1f2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7750a1f2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7750a1f2

Branch: refs/heads/master
Commit: 7750a1f29e231f16cbbea4d91dbe530b1f92f192
Parents: 48f614c
Author: nikste <ni...@gmail.com>
Authored: Tue Sep 29 09:50:17 2015 +0200
Committer: Chiwan Park <ch...@apache.org>
Committed: Fri Oct 9 16:38:11 2015 +0200

----------------------------------------------------------------------
 docs/apis/scala_shell.md                        |   9 +-
 .../org/apache/flink/api/scala/FlinkILoop.scala |  12 +-
 .../org/apache/flink/api/scala/FlinkShell.scala | 148 ++++++++++++-------
 .../flink/api/scala/ScalaShellITSuite.scala     |  54 ++++++-
 .../scala/ScalaShellLocalStartupITCase.scala    |  60 ++++++++
 .../start-script/start-scala-shell.sh           |   4 +-
 6 files changed, 218 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7750a1f2/docs/apis/scala_shell.md
----------------------------------------------------------------------
diff --git a/docs/apis/scala_shell.md b/docs/apis/scala_shell.md
index 9bd04d9..a0c10c1 100644
--- a/docs/apis/scala_shell.md
+++ b/docs/apis/scala_shell.md
@@ -30,15 +30,16 @@ Flink and setting up a cluster please refer to
 To use the shell with an integrated Flink cluster just execute:
 
 ~~~bash
-bin/start-scala-shell.sh 
+bin/start-scala-shell.sh local
 ~~~
 
 in the root directory of your binary Flink directory.
 
-To use it with a running cluster you can supply the host and port of the JobManager with:
+To use it with a running cluster start the scala shell with the keyword `remote`
+and supply the host and port of the JobManager with:
 
 ~~~bash
-bin/start-scala-shell.sh --host <hostname> --port <portnumber>
+bin/start-scala-shell.sh remote <hostname> <portnumber>
 ~~~
 
 ## Usage
@@ -75,6 +76,6 @@ It is possible to add external classpaths to the Scala-shell. These will be sent
 Use the parameter `-a <path/to/jar.jar>` or `--addclasspath <path/to/jar.jar>` to load additional classes.
 
 ~~~bash
-bin/start-scala-shell --addclasspath <path/to/jar.jar>
+bin/start-scala-shell.sh [local | remote <host> <port>] --addclasspath <path/to/jar.jar>
 ~~~
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7750a1f2/flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala b/flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala
index bcf9bc2..f13288b 100644
--- a/flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala
+++ b/flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala
@@ -34,16 +34,16 @@ class FlinkILoop(
     out0: JPrintWriter)
   extends ILoopCompat(in0, out0) {
 
-  def this(host:String, 
-           port:Int, 
-           externalJars : Option[Array[String]], 
+  def this(host: String,
+           port: Int,
+           externalJars: Option[Array[String]],
            in0: BufferedReader, 
            out: JPrintWriter){
-    this(host:String, port:Int, externalJars, Some(in0), out)
+    this(host: String, port: Int, externalJars, Some(in0), out)
   }
 
-  def this(host:String, port:Int, externalJars : Option[Array[String]]){
-    this(host:String,port: Int, externalJars , None, new JPrintWriter(Console.out, true))
+  def this(host: String, port: Int, externalJars: Option[Array[String]]){
+    this(host: String, port: Int, externalJars, None, new JPrintWriter(Console.out, true))
   }
   
   def this(host: String, port: Int, in0: BufferedReader, out: JPrintWriter){

http://git-wip-us.apache.org/repos/asf/flink/blob/7750a1f2/flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala b/flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
index 224983b..54bbf80 100644
--- a/flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
+++ b/flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
@@ -18,48 +18,73 @@
 
 package org.apache.flink.api.scala
 
+import java.io.{StringWriter, BufferedReader}
+
+import org.apache.flink.api.common.ExecutionMode
+
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster
 
 import scala.tools.nsc.Settings
 
+import scala.tools.nsc.interpreter._
+
 
 object FlinkShell {
 
+  object ExecutionMode extends Enumeration {
+    val UNDEFINED, LOCAL, REMOTE = Value
+  }
+
+  var bufferedReader: Option[BufferedReader] = None
+
   def main(args: Array[String]) {
 
     // scopt, command line arguments
     case class Config(
         port: Int = -1,
         host: String = "none",
-        externalJars: Option[Array[String]] = None)
+        externalJars: Option[Array[String]] = None,
+        flinkShellExecutionMode: ExecutionMode.Value = ExecutionMode.UNDEFINED)
+
     val parser = new scopt.OptionParser[Config]("start-scala-shell.sh") {
       head ("Flink Scala Shell")
 
-      opt[Int] ('p', "port") action {
-        (x, c) =>
-          c.copy (port = x)
-      } text("port specifies port of running JobManager")
-
-      opt[(String)] ('h',"host") action {
-        case (x, c) =>
-          c.copy (host = x)
-      }  text("host specifies host name of running JobManager")
-
-      opt[(String)] ('a',"addclasspath") action {
-        case (x,c) =>
-          val xArray = x.split(":")
-          c.copy(externalJars = Option(xArray))
-      } text("specifies additional jars to be used in Flink")
-      
-      help("help") text("prints this usage text")
+      cmd("local") action {
+        (_, c) => c.copy(host = "none", port = -1, flinkShellExecutionMode = ExecutionMode.LOCAL)
+      } text("starts Flink scala shell with a local Flink cluster\n") children(
+        opt[(String)] ("addclasspath") abbr("a") valueName("<path/to/jar>") action {
+          case (x, c) =>
+            val xArray = x.split(":")
+            c.copy(externalJars = Option(xArray))
+          } text("specifies additional jars to be used in Flink\n")
+        )
+
+      cmd("remote") action { (_, c) =>
+        c.copy(flinkShellExecutionMode = ExecutionMode.REMOTE)
+      } text("starts Flink scala shell connecting to a remote cluster\n") children(
+        arg[String]("<host>") action { (h, c) =>
+          c.copy(host = h) }
+          text("remote host name as string"),
+        arg[Int]("<port>") action { (p, c) =>
+          c.copy(port = p) }
+          text("remote port as integer\n"),
+        opt[(String)]("addclasspath") abbr("a") valueName("<path/to/jar>") action {
+          case (x, c) =>
+            val xArray = x.split(":")
+            c.copy(externalJars = Option(xArray))
+          } text("specifies additional jars to be used in Flink")
+      )
+      help("help") abbr("h") text("prints this usage text\n")
     }
 
-
     // parse arguments
-    parser.parse (args, Config () ) match {
+    parser.parse (args, Config()) match {
       case Some(config) =>
-        startShell(config.host,config.port,config.externalJars)
+        startShell(config.host,
+          config.port,
+          config.flinkShellExecutionMode,
+          config.externalJars)
 
       case _ => println("Could not parse program arguments")
     }
@@ -67,39 +92,62 @@ object FlinkShell {
 
 
   def startShell(
-      userHost : String, 
-      userPort : Int, 
-      externalJars : Option[Array[String]] = None): Unit ={
+      userHost: String,
+      userPort: Int,
+      executionMode: ExecutionMode.Value,
+      externalJars: Option[Array[String]] = None): Unit ={
     
     println("Starting Flink Shell:")
 
-    var cluster: LocalFlinkMiniCluster = null
-
     // either port or userhost not specified by user, create new minicluster
-    val (host,port) = if (userHost == "none" || userPort == -1 ) {
-      println("Creating new local server")
-      cluster = new LocalFlinkMiniCluster(new Configuration, false)
-      cluster.start()
-      ("localhost",cluster.getLeaderRPCPort)
-    } else {
-      println(s"Connecting to remote server (host: $userHost, port: $userPort).")
-      (userHost, userPort)
-    }
-    
-    // custom shell
-    val repl = new FlinkILoop(host, port, externalJars) //new MyILoop();
-
-    repl.settings = new Settings()
-
-    repl.settings.usejavacp.value = true
-
-    // start scala interpreter shell
-    repl.process(repl.settings)
-
-    //repl.closeInterpreter()
-
-    if (cluster != null) {
-      cluster.stop()
+    val (host: String, port: Int, cluster: Option[LocalFlinkMiniCluster]) =
+      executionMode match {
+        case ExecutionMode.LOCAL =>
+          val miniCluster = new LocalFlinkMiniCluster(new Configuration, false)
+          miniCluster.start()
+          val port = miniCluster.getLeaderRPCPort
+          println(s"\nStarting local Flink cluster (host: localhost, port: $port).\n")
+          ("localhost", port, Some(miniCluster))
+
+        case ExecutionMode.REMOTE =>
+          if (userHost == "none" || userPort == -1) {
+            println("Error: <host> or <port> not specified!")
+            return
+          } else {
+            println(s"\nConnecting to Flink cluster (host: $userHost, port: $userPort).\n")
+            (userHost, userPort, None)
+          }
+
+        case ExecutionMode.UNDEFINED =>
+          println("Error: please specify execution mode:")
+          println("[local | remote <host> <port>]")
+          return
+      }
+
+    try {
+      // custom shell
+      val repl: FlinkILoop =
+        bufferedReader match {
+
+          case Some(br) =>
+            val out = new StringWriter()
+            new FlinkILoop(host, port, externalJars, bufferedReader, new JPrintWriter(out))
+
+          case None =>
+            new FlinkILoop(host, port, externalJars)
+        }
+
+      val settings = new Settings()
+
+      settings.usejavacp.value = true
+
+      // start scala interpreter shell
+      repl.process(settings)
+    } finally {
+      cluster match {
+        case Some(c) => c.stop()
+        case None =>
+      }
     }
 
     println(" good bye ..")

http://git-wip-us.apache.org/repos/asf/flink/blob/7750a1f2/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala b/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala
index 0621351..c8b1990 100644
--- a/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala
+++ b/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala
@@ -33,6 +33,9 @@ import scala.tools.nsc.Settings
 @RunWith(classOf[JUnitRunner])
 class ScalaShellITSuite extends FunSuite with Matchers with BeforeAndAfterAll {
 
+  var cluster: Option[ForkableFlinkMiniCluster] = None
+  val parallelism = 4
+
   test("Prevent re-creation of environment") {
 
     val input: String =
@@ -51,7 +54,6 @@ class ScalaShellITSuite extends FunSuite with Matchers with BeforeAndAfterAll {
     val input: String =
       """
         val initial = env.fromElements(0)
-
         val count = initial.iterate(10000) { iterationInput: DataSet[Int] =>
           val result = iterationInput.map { i =>
             val x = Math.random()
@@ -78,7 +80,6 @@ class ScalaShellITSuite extends FunSuite with Matchers with BeforeAndAfterAll {
         "Whether 'tis nobler in the mind to suffer",
         "The slings and arrows of outrageous fortune",
         "Or to take arms against a sea of troubles,")
-
         val counts = text.flatMap { _.toLowerCase.split("\\W+") }.map { (_, 1) }.groupBy(0).sum(1)
         val result = counts.print()
       """.stripMargin
@@ -117,14 +118,11 @@ class ScalaShellITSuite extends FunSuite with Matchers with BeforeAndAfterAll {
     val input =
       """
       case class WC(word: String, count: Int)
-
       val wordCounts = env.fromElements(
         new WC("hello", 1),
         new WC("world", 2),
         new WC("world", 8))
-
       val reduced = wordCounts.groupBy(0).sum(1)
-
       reduced.print()
       """.stripMargin
 
@@ -225,8 +223,50 @@ class ScalaShellITSuite extends FunSuite with Matchers with BeforeAndAfterAll {
     out.toString + stdout
   }
 
-  var cluster: Option[ForkableFlinkMiniCluster] = None
-  val parallelism = 4
+  /**
+   * tests flink shell startup with remote cluster (starts cluster internally)
+   */
+  test("start flink scala shell with remote cluster") {
+
+    val input: String = "val els = env.fromElements(\"a\",\"b\");\n" +
+      "els.print\nError\n:q\n"
+
+    val in: BufferedReader = new BufferedReader(
+      new StringReader(
+        input + "\n"))
+    val out: StringWriter = new StringWriter
+
+    val baos: ByteArrayOutputStream = new ByteArrayOutputStream
+    val oldOut: PrintStream = System.out
+    System.setOut(new PrintStream(baos))
+
+    val (c, args) = cluster match{
+      case Some(cl) =>
+        val arg = Array("remote",
+          cl.hostname,
+          Integer.toString(cl.getLeaderRPCPort))
+        (cl, arg)
+      case None =>
+        fail("Cluster creation failed!")
+    }
+
+    //start scala shell with initialized
+    // buffered reader for testing
+    FlinkShell.bufferedReader = Some(in)
+    FlinkShell.main(args)
+    baos.flush()
+
+    val output: String = baos.toString
+    System.setOut(oldOut)
+
+    output should include("Job execution switched to status FINISHED.")
+    output should include("a\nb")
+
+    output should not include "Error"
+    output should not include "ERROR"
+    output should not include "Exception"
+    output should not include "failed"
+  }
 
   override def beforeAll(): Unit = {
     val cl = TestBaseUtils.startCluster(

http://git-wip-us.apache.org/repos/asf/flink/blob/7750a1f2/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellLocalStartupITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellLocalStartupITCase.scala b/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellLocalStartupITCase.scala
new file mode 100644
index 0000000..60da09e
--- /dev/null
+++ b/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellLocalStartupITCase.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala
+
+import java.io._
+
+import org.junit.runner.RunWith
+import org.scalatest.{Matchers, FunSuite}
+import org.scalatest.junit.JUnitRunner
+
+
+@RunWith(classOf[JUnitRunner])
+class ScalaShellLocalStartupITCase extends FunSuite with Matchers {
+
+    /**
+     * tests flink shell with local setup through startup script in bin folder
+     */
+    test("start flink scala shell with local cluster") {
+
+      val input: String = "val els = env.fromElements(\"a\",\"b\");\n" + "els.print\nError\n:q\n"
+      val in: BufferedReader = new BufferedReader(new StringReader(input + "\n"))
+      val out: StringWriter = new StringWriter
+      val baos: ByteArrayOutputStream = new ByteArrayOutputStream
+      val oldOut: PrintStream = System.out
+      System.setOut(new PrintStream(baos))
+      val args: Array[String] = Array("local")
+
+      //start flink scala shell
+      FlinkShell.bufferedReader = Some(in);
+      FlinkShell.main(args)
+
+      baos.flush()
+      val output: String = baos.toString
+      System.setOut(oldOut)
+
+      output should include("Job execution switched to status FINISHED.")
+      output should include("a\nb")
+
+      output should not include "Error"
+      output should not include "ERROR"
+      output should not include "Exception"
+      output should not include "failed"
+    }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7750a1f2/flink-staging/flink-scala-shell/start-script/start-scala-shell.sh
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/start-script/start-scala-shell.sh b/flink-staging/flink-scala-shell/start-script/start-scala-shell.sh
index 529b23c..0af2a9d 100644
--- a/flink-staging/flink-scala-shell/start-script/start-scala-shell.sh
+++ b/flink-staging/flink-scala-shell/start-script/start-scala-shell.sh
@@ -77,10 +77,10 @@ done
 
 if ${EXTERNAL_LIB_FOUND}
 then
-    java -cp "$FLINK_CLASSPATH" org.apache.flink.api.scala.FlinkShell --addclasspath "$EXT_CLASSPATH" $@
+    java -cp "$FLINK_CLASSPATH" org.apache.flink.api.scala.FlinkShell $@ --addclasspath "$EXT_CLASSPATH" 
 else
     java -cp "$FLINK_CLASSPATH" org.apache.flink.api.scala.FlinkShell $@
 fi
 
 #restore echo
-onExit
\ No newline at end of file
+onExit