You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/09/04 16:59:58 UTC

flink git commit: [FLINK-2161] [scala shell] Modify start script to take additional argument (-a or --addclasspath ) for external libraries

Repository: flink
Updated Branches:
  refs/heads/master 9f96b32bf -> 3d528442f


[FLINK-2161] [scala shell] Modify start script to take additional argument (-a <path/to/class> or --addclasspath <path/to/class>) for external libraries

This closes #805


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3d528442
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3d528442
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3d528442

Branch: refs/heads/master
Commit: 3d528442f6993c64600224483ee3048c40763c80
Parents: 9f96b32
Author: Nikolaas Steenbergen <Ni...@googlemail.com>
Authored: Mon Jun 8 14:01:39 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Sep 4 15:57:50 2015 +0200

----------------------------------------------------------------------
 docs/apis/scala_shell.md                        | 10 +++
 .../api/java/ScalaShellRemoteEnvironment.java   | 20 ++++-
 .../org.apache.flink/api/scala/FlinkILoop.scala | 25 ++++--
 .../org.apache.flink/api/scala/FlinkShell.scala | 25 ++++--
 .../flink/api/scala/ScalaShellITSuite.scala     | 80 +++++++++++++-------
 .../start-script/start-scala-shell.sh           | 30 +++++++-
 6 files changed, 150 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3d528442/docs/apis/scala_shell.md
----------------------------------------------------------------------
diff --git a/docs/apis/scala_shell.md b/docs/apis/scala_shell.md
index 0ce070c..9bd04d9 100644
--- a/docs/apis/scala_shell.md
+++ b/docs/apis/scala_shell.md
@@ -68,3 +68,13 @@ Scala-Flink> env.execute("MyProgram")
 
 The Flink Shell comes with command history and autocompletion.
 
+## Adding external dependencies
+
+It is possible to add external classpaths to the Scala shell. These will be sent to the JobManager automatically alongside your shell program when calling execute.
+
+Use the parameter `-a <path/to/jar.jar>` or `--addclasspath <path/to/jar.jar>` to load additional classes.
+
+~~~bash
+bin/start-scala-shell.sh --addclasspath <path/to/jar.jar>
+~~~
+

http://git-wip-us.apache.org/repos/asf/flink/blob/3d528442/flink-staging/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java b/flink-staging/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java
index 79f9576..54af5bc 100644
--- a/flink-staging/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java
+++ b/flink-staging/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java
@@ -19,12 +19,17 @@ package org.apache.flink.api.java;
  * limitations under the License.
  */
 
+import org.apache.commons.lang.ArrayUtils;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.PlanExecutor;
 
 import org.apache.flink.api.scala.FlinkILoop;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
 /**
  * Special version of {@link org.apache.flink.api.java.RemoteEnvironment} that has a reference
  * to a {@link org.apache.flink.api.scala.FlinkILoop}. When execute is called this will
@@ -61,8 +66,19 @@ public class ScalaShellRemoteEnvironment extends RemoteEnvironment {
 
 		String jarFile = flinkILoop.writeFilesToDisk().getAbsolutePath();
 
-		// call "traditional" execution methods
-		PlanExecutor executor = PlanExecutor.createRemoteExecutor(host, port, jarFile);
+		// get the external jars, add the shell command jar, and pass them all to the executor
+		List<String> alljars = new ArrayList<String>();
+		// get external (library) jars
+		String[] extJars = this.flinkILoop.getExternalJars();
+		
+		if(!ArrayUtils.isEmpty(extJars)) {
+			alljars.addAll(Arrays.asList(extJars));
+		}
+		// add shell commands
+		alljars.add(jarFile);
+		String[] alljarsArr = new String[alljars.size()];
+		alljarsArr = alljars.toArray(alljarsArr);
+		PlanExecutor executor = PlanExecutor.createRemoteExecutor(host, port, alljarsArr);
 
 		executor.setPrintStatusDuringExecution(p.getExecutionConfig().isSysoutLoggingEnabled());
 		return executor.executePlan(p);

http://git-wip-us.apache.org/repos/asf/flink/blob/3d528442/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkILoop.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkILoop.scala b/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkILoop.scala
index 10b7ad9..2797e4b 100644
--- a/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkILoop.scala
+++ b/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkILoop.scala
@@ -29,16 +29,27 @@ import org.apache.flink.util.AbstractID
 class FlinkILoop(
     val host: String,
     val port: Int,
+    val externalJars: Option[Array[String]],
     in0: Option[BufferedReader],
     out0: JPrintWriter)
   extends ILoop(in0, out0) {
-
-  def this(host:String, port:Int, in0: BufferedReader, out: JPrintWriter){
-    this(host:String, port:Int, Some(in0), out)
+  
+  
+  
+  def this(host:String, 
+           port:Int, 
+           externalJars : Option[Array[String]], 
+           in0: BufferedReader, 
+           out: JPrintWriter){
+    this(host:String, port:Int, externalJars, Some(in0), out)
   }
 
-  def this(host:String, port:Int){
-    this(host:String,port: Int,None, new JPrintWriter(Console.out, true))
+  def this(host:String, port:Int, externalJars : Option[Array[String]]){
+    this(host:String,port: Int, externalJars , None, new JPrintWriter(Console.out, true))
+  }
+  
+  def this(host: String, port: Int, in0: BufferedReader, out: JPrintWriter){
+    this(host: String, port: Int, None, in0: BufferedReader, out: JPrintWriter)
   }
   // remote environment
   private val remoteEnv: ScalaShellRemoteEnvironment = {
@@ -126,6 +137,7 @@ class FlinkILoop(
     }
 
     val compiledClasses = new File(tmpDirShell.getAbsolutePath)
+
     val jarFilePath = new File(tmpJarShell.getAbsolutePath)
 
     val jh: JarHelper = new JarHelper
@@ -191,5 +203,8 @@ HINT: You can use print() on a DataSet to print the contents to this shell.
     )
 
   }
+
+  def getExternalJars(): Array[String] = externalJars.getOrElse(Array.empty[String])
+
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3d528442/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkShell.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkShell.scala b/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkShell.scala
index 25ab492..a4fae91 100644
--- a/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkShell.scala
+++ b/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkShell.scala
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.api.scala
 
 
@@ -29,8 +30,10 @@ object FlinkShell {
   def main(args: Array[String]) {
 
     // scopt, command line arguments
-    case class Config(port: Int = -1,
-                      host: String = "none")
+    case class Config(
+        port: Int = -1,
+        host: String = "none",
+        externalJars: Option[Array[String]] = None)
     val parser = new scopt.OptionParser[Config]("start-scala-shell.sh") {
       head ("Flink Scala Shell")
 
@@ -44,6 +47,12 @@ object FlinkShell {
           c.copy (host = x)
       }  text("host specifies host name of running JobManager")
 
+      opt[(String)] ('a',"addclasspath") action {
+        case (x,c) =>
+          val xArray = x.split(":")
+          c.copy(externalJars = Option(xArray))
+      } text("specifies additional jars to be used in Flink")
+      
       help("help") text("prints this usage text")
     }
 
@@ -51,14 +60,18 @@ object FlinkShell {
     // parse arguments
     parser.parse (args, Config () ) match {
       case Some(config) =>
-        startShell(config.host,config.port)
+        startShell(config.host,config.port,config.externalJars)
 
       case _ => println("Could not parse program arguments")
     }
   }
 
 
-  def startShell(userHost : String, userPort : Int): Unit ={
+  def startShell(
+      userHost : String, 
+      userPort : Int, 
+      externalJars : Option[Array[String]] = None): Unit ={
+    
     println("Starting Flink Shell:")
 
     var cluster: LocalFlinkMiniCluster = null
@@ -73,9 +86,9 @@ object FlinkShell {
       println(s"Connecting to remote server (host: $userHost, port: $userPort).")
       (userHost, userPort)
     }
-
+    
     // custom shell
-    val repl = new FlinkILoop(host, port) //new MyILoop();
+    val repl = new FlinkILoop(host, port, externalJars) //new MyILoop();
 
     repl.settings = new Settings()
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3d528442/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala b/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala
index 1b04f7f..9717ae7 100644
--- a/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala
+++ b/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala
@@ -19,14 +19,12 @@
 package org.apache.flink.api.scala
 
 import java.io._
-import java.net.URLClassLoader
 import java.util.concurrent.TimeUnit
 
 import org.apache.flink.runtime.StreamingMode
 import org.apache.flink.test.util.{TestEnvironment, TestBaseUtils, ForkableFlinkMiniCluster, FlinkTestBase}
 import org.scalatest.{BeforeAndAfterAll, BeforeAndAfter, FunSuite, Matchers}
 
-import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.duration.FiniteDuration
 import scala.tools.nsc.Settings
 
@@ -122,15 +120,47 @@ class ScalaShellITSuite extends FunSuite with Matchers with BeforeAndAfterAll {
     output should include("WC(hello,1)")
     output should include("WC(world,10)")
   }
+  
+  
+  test("Submit external library") {
+    
+    val input : String =
+      """
+        import org.apache.flink.ml.math._
+        val denseVectors = env.fromElements(DenseVector(1.0, 2.0, 3.0))
+        denseVectors.print()
+      """.stripMargin
+
+    // find jar file that contains the ml code
+    var externalJar : String = ""
+    var folder : File = new File("../flink-ml/target/");
+    var listOfFiles : Array[File] = folder.listFiles();
+    for(i <- 0 to listOfFiles.length - 1){
+      var filename : String = listOfFiles(i).getName();
+      if(!filename.contains("test") && !filename.contains("original") && filename.contains(".jar")){
+        println("ive found file:" + listOfFiles(i).getAbsolutePath)
+        externalJar = listOfFiles(i).getAbsolutePath
+      }
+    }
+
+    assert(externalJar != "")
 
+    val output : String = processInShell(input,Option(externalJar))
+
+    output should not include "failed"
+    output should not include "error"
+    output should not include "Exception"
+
+    output should include( "\nDenseVector(1.0, 2.0, 3.0)")
+  }
 
   /**
    * Run the input using a Scala Shell and return the output of the shell.
    * @param input commands to be processed in the shell
    * @return output of shell
    */
-  def processInShell(input : String): String ={
-
+  def processInShell(input : String, externalJars : Option[String] = None): String ={
+    
     val in = new BufferedReader(new StringReader(input + "\n"))
     val out = new StringWriter()
     val baos = new ByteArrayOutputStream()
@@ -142,32 +172,31 @@ class ScalaShellITSuite extends FunSuite with Matchers with BeforeAndAfterAll {
     val host = "localhost"
     val port = cluster match {
       case Some(c) => c.getLeaderRPCPort
-
       case _ => throw new RuntimeException("Test cluster not initialized.")
     }
-
-    val cl = getClass.getClassLoader
-    var paths = new ArrayBuffer[String]
-    cl match {
-      case urlCl: URLClassLoader =>
-        for (url <- urlCl.getURLs) {
-          if (url.getProtocol == "file") {
-            paths += url.getFile
-          }
-        }
-      case _ =>
+    
+    var repl : FlinkILoop= null 
+    
+    externalJars match {
+      case Some(ej) => repl = new FlinkILoop(
+        host, port,  
+        Option(Array(ej)), 
+        in, new PrintWriter(out))
+        
+      case None => repl = new FlinkILoop(
+        host,port,
+        in,new PrintWriter(out))
     }
-
-    val classpath = paths.mkString(File.pathSeparator)
-
-    val repl = new FlinkILoop(host, port, in, new PrintWriter(out)) //new MyILoop();
-
+    
     repl.settings = new Settings()
 
     // enable this line to use scala in intellij
     repl.settings.usejavacp.value = true
-
-    repl.addedClasspath = classpath
+    
+    externalJars match {
+      case Some(ej) => repl.settings.classpath.value = ej
+      case None => 
+    }
 
     repl.process(repl.settings)
 
@@ -175,11 +204,10 @@ class ScalaShellITSuite extends FunSuite with Matchers with BeforeAndAfterAll {
 
     System.setOut(oldOut)
 
+    baos.flush()
+    
     val stdout = baos.toString
 
-    // reprint because ScalaTest fails if we don't
-    print(stdout)
-
     out.toString + stdout
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3d528442/flink-staging/flink-scala-shell/start-script/start-scala-shell.sh
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/start-script/start-scala-shell.sh b/flink-staging/flink-scala-shell/start-script/start-scala-shell.sh
index 4bbd49a..529b23c 100644
--- a/flink-staging/flink-scala-shell/start-script/start-scala-shell.sh
+++ b/flink-staging/flink-scala-shell/start-script/start-scala-shell.sh
@@ -52,7 +52,35 @@ bin=`cd "$bin"; pwd`
 
 FLINK_CLASSPATH=`constructFlinkClassPath`
 
-java -cp "$FLINK_CLASSPATH" org.apache.flink.api.scala.FlinkShell $@
+# See https://issues.scala-lang.org/browse/SI-6502: external jars can't be loaded interactively
+# in the Scala shell since 2.10, so this has to be done at startup.
+# checks arguments for additional classpath and adds it to the "standard classpath"
+
+EXTERNAL_LIB_FOUND=false
+for ((i=1;i<=$#;i++))
+do
+    if [[  ${!i} = "-a" || ${!i} = "--addclasspath" ]]
+    then
+	EXTERNAL_LIB_FOUND=true
+	
+        #adding to classpath
+        k=$((i+1))
+        j=$((k+1))
+        echo " "
+        echo "Additional classpath:${!k}"
+        echo " "
+        EXT_CLASSPATH="${!k}"
+        FLINK_CLASSPATH="$FLINK_CLASSPATH:${!k}"
+        set -- "${@:1:$((i-1))}" "${@:j}"
+    fi
+done
+
+if ${EXTERNAL_LIB_FOUND}
+then
+    java -cp "$FLINK_CLASSPATH" org.apache.flink.api.scala.FlinkShell --addclasspath "$EXT_CLASSPATH" $@
+else
+    java -cp "$FLINK_CLASSPATH" org.apache.flink.api.scala.FlinkShell $@
+fi
 
 #restore echo
 onExit
\ No newline at end of file