You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/09/04 16:59:58 UTC
flink git commit: [FLINK-2161] [scala shell] Modify start script to take additional argument (-a or --addclasspath) for external libraries
Repository: flink
Updated Branches:
refs/heads/master 9f96b32bf -> 3d528442f
[FLINK-2161] [scala shell] Modify start script to take additional argument (-a <path/to/class> or --addclasspath <path/to/class>) for external libraries
This closes #805
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3d528442
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3d528442
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3d528442
Branch: refs/heads/master
Commit: 3d528442f6993c64600224483ee3048c40763c80
Parents: 9f96b32
Author: Nikolaas Steenbergen <Ni...@googlemail.com>
Authored: Mon Jun 8 14:01:39 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Sep 4 15:57:50 2015 +0200
----------------------------------------------------------------------
docs/apis/scala_shell.md | 10 +++
.../api/java/ScalaShellRemoteEnvironment.java | 20 ++++-
.../org.apache.flink/api/scala/FlinkILoop.scala | 25 ++++--
.../org.apache.flink/api/scala/FlinkShell.scala | 25 ++++--
.../flink/api/scala/ScalaShellITSuite.scala | 80 +++++++++++++-------
.../start-script/start-scala-shell.sh | 30 +++++++-
6 files changed, 150 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/3d528442/docs/apis/scala_shell.md
----------------------------------------------------------------------
diff --git a/docs/apis/scala_shell.md b/docs/apis/scala_shell.md
index 0ce070c..9bd04d9 100644
--- a/docs/apis/scala_shell.md
+++ b/docs/apis/scala_shell.md
@@ -68,3 +68,13 @@ Scala-Flink> env.execute("MyProgram")
The Flink Shell comes with command history and autocompletion.
+## Adding external dependencies
+
+It is possible to add external classpaths to the Scala shell. These will be sent to the JobManager automatically alongside your shell program when calling execute.
+
+Use the parameter `-a <path/to/jar.jar>` or `--addclasspath <path/to/jar.jar>` to load additional classes.
+
+~~~bash
+bin/start-scala-shell --addclasspath <path/to/jar.jar>
+~~~
+
http://git-wip-us.apache.org/repos/asf/flink/blob/3d528442/flink-staging/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java b/flink-staging/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java
index 79f9576..54af5bc 100644
--- a/flink-staging/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java
+++ b/flink-staging/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java
@@ -19,12 +19,17 @@ package org.apache.flink.api.java;
* limitations under the License.
*/
+import org.apache.commons.lang.ArrayUtils;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.PlanExecutor;
import org.apache.flink.api.scala.FlinkILoop;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
/**
* Special version of {@link org.apache.flink.api.java.RemoteEnvironment} that has a reference
* to a {@link org.apache.flink.api.scala.FlinkILoop}. When execute is called this will
@@ -61,8 +66,19 @@ public class ScalaShellRemoteEnvironment extends RemoteEnvironment {
String jarFile = flinkILoop.writeFilesToDisk().getAbsolutePath();
- // call "traditional" execution methods
- PlanExecutor executor = PlanExecutor.createRemoteExecutor(host, port, jarFile);
+ // get external jars, add the shell command jar, and pass them all to the executor
+ List<String> alljars = new ArrayList<String>();
+ // get external (library) jars
+ String[] extJars = this.flinkILoop.getExternalJars();
+
+ if(!ArrayUtils.isEmpty(extJars)) {
+ alljars.addAll(Arrays.asList(extJars));
+ }
+ // add shell commands
+ alljars.add(jarFile);
+ String[] alljarsArr = new String[alljars.size()];
+ alljarsArr = alljars.toArray(alljarsArr);
+ PlanExecutor executor = PlanExecutor.createRemoteExecutor(host, port, alljarsArr);
executor.setPrintStatusDuringExecution(p.getExecutionConfig().isSysoutLoggingEnabled());
return executor.executePlan(p);
http://git-wip-us.apache.org/repos/asf/flink/blob/3d528442/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkILoop.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkILoop.scala b/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkILoop.scala
index 10b7ad9..2797e4b 100644
--- a/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkILoop.scala
+++ b/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkILoop.scala
@@ -29,16 +29,27 @@ import org.apache.flink.util.AbstractID
class FlinkILoop(
val host: String,
val port: Int,
+ val externalJars: Option[Array[String]],
in0: Option[BufferedReader],
out0: JPrintWriter)
extends ILoop(in0, out0) {
-
- def this(host:String, port:Int, in0: BufferedReader, out: JPrintWriter){
- this(host:String, port:Int, Some(in0), out)
+
+
+
+ def this(host:String,
+ port:Int,
+ externalJars : Option[Array[String]],
+ in0: BufferedReader,
+ out: JPrintWriter){
+ this(host:String, port:Int, externalJars, Some(in0), out)
}
- def this(host:String, port:Int){
- this(host:String,port: Int,None, new JPrintWriter(Console.out, true))
+ def this(host:String, port:Int, externalJars : Option[Array[String]]){
+ this(host:String,port: Int, externalJars , None, new JPrintWriter(Console.out, true))
+ }
+
+ def this(host: String, port: Int, in0: BufferedReader, out: JPrintWriter){
+ this(host: String, port: Int, None, in0: BufferedReader, out: JPrintWriter)
}
// remote environment
private val remoteEnv: ScalaShellRemoteEnvironment = {
@@ -126,6 +137,7 @@ class FlinkILoop(
}
val compiledClasses = new File(tmpDirShell.getAbsolutePath)
+
val jarFilePath = new File(tmpJarShell.getAbsolutePath)
val jh: JarHelper = new JarHelper
@@ -191,5 +203,8 @@ HINT: You can use print() on a DataSet to print the contents to this shell.
)
}
+
+ def getExternalJars(): Array[String] = externalJars.getOrElse(Array.empty[String])
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3d528442/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkShell.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkShell.scala b/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkShell.scala
index 25ab492..a4fae91 100644
--- a/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkShell.scala
+++ b/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkShell.scala
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.api.scala
@@ -29,8 +30,10 @@ object FlinkShell {
def main(args: Array[String]) {
// scopt, command line arguments
- case class Config(port: Int = -1,
- host: String = "none")
+ case class Config(
+ port: Int = -1,
+ host: String = "none",
+ externalJars: Option[Array[String]] = None)
val parser = new scopt.OptionParser[Config]("start-scala-shell.sh") {
head ("Flink Scala Shell")
@@ -44,6 +47,12 @@ object FlinkShell {
c.copy (host = x)
} text("host specifies host name of running JobManager")
+ opt[(String)] ('a',"addclasspath") action {
+ case (x,c) =>
+ val xArray = x.split(":")
+ c.copy(externalJars = Option(xArray))
+ } text("specifies additional jars to be used in Flink")
+
help("help") text("prints this usage text")
}
@@ -51,14 +60,18 @@ object FlinkShell {
// parse arguments
parser.parse (args, Config () ) match {
case Some(config) =>
- startShell(config.host,config.port)
+ startShell(config.host,config.port,config.externalJars)
case _ => println("Could not parse program arguments")
}
}
- def startShell(userHost : String, userPort : Int): Unit ={
+ def startShell(
+ userHost : String,
+ userPort : Int,
+ externalJars : Option[Array[String]] = None): Unit ={
+
println("Starting Flink Shell:")
var cluster: LocalFlinkMiniCluster = null
@@ -73,9 +86,9 @@ object FlinkShell {
println(s"Connecting to remote server (host: $userHost, port: $userPort).")
(userHost, userPort)
}
-
+
// custom shell
- val repl = new FlinkILoop(host, port) //new MyILoop();
+ val repl = new FlinkILoop(host, port, externalJars) //new MyILoop();
repl.settings = new Settings()
http://git-wip-us.apache.org/repos/asf/flink/blob/3d528442/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala b/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala
index 1b04f7f..9717ae7 100644
--- a/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala
+++ b/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala
@@ -19,14 +19,12 @@
package org.apache.flink.api.scala
import java.io._
-import java.net.URLClassLoader
import java.util.concurrent.TimeUnit
import org.apache.flink.runtime.StreamingMode
import org.apache.flink.test.util.{TestEnvironment, TestBaseUtils, ForkableFlinkMiniCluster, FlinkTestBase}
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfter, FunSuite, Matchers}
-import scala.collection.mutable.ArrayBuffer
import scala.concurrent.duration.FiniteDuration
import scala.tools.nsc.Settings
@@ -122,15 +120,47 @@ class ScalaShellITSuite extends FunSuite with Matchers with BeforeAndAfterAll {
output should include("WC(hello,1)")
output should include("WC(world,10)")
}
+
+
+ test("Submit external library") {
+
+ val input : String =
+ """
+ import org.apache.flink.ml.math._
+ val denseVectors = env.fromElements(DenseVector(1.0, 2.0, 3.0))
+ denseVectors.print()
+ """.stripMargin
+
+ // find jar file that contains the ml code
+ var externalJar : String = ""
+ var folder : File = new File("../flink-ml/target/");
+ var listOfFiles : Array[File] = folder.listFiles();
+ for(i <- 0 to listOfFiles.length - 1){
+ var filename : String = listOfFiles(i).getName();
+ if(!filename.contains("test") && !filename.contains("original") && filename.contains(".jar")){
+ println("ive found file:" + listOfFiles(i).getAbsolutePath)
+ externalJar = listOfFiles(i).getAbsolutePath
+ }
+ }
+
+ assert(externalJar != "")
+ val output : String = processInShell(input,Option(externalJar))
+
+ output should not include "failed"
+ output should not include "error"
+ output should not include "Exception"
+
+ output should include( "\nDenseVector(1.0, 2.0, 3.0)")
+ }
/**
* Run the input using a Scala Shell and return the output of the shell.
* @param input commands to be processed in the shell
* @return output of shell
*/
- def processInShell(input : String): String ={
-
+ def processInShell(input : String, externalJars : Option[String] = None): String ={
+
val in = new BufferedReader(new StringReader(input + "\n"))
val out = new StringWriter()
val baos = new ByteArrayOutputStream()
@@ -142,32 +172,31 @@ class ScalaShellITSuite extends FunSuite with Matchers with BeforeAndAfterAll {
val host = "localhost"
val port = cluster match {
case Some(c) => c.getLeaderRPCPort
-
case _ => throw new RuntimeException("Test cluster not initialized.")
}
-
- val cl = getClass.getClassLoader
- var paths = new ArrayBuffer[String]
- cl match {
- case urlCl: URLClassLoader =>
- for (url <- urlCl.getURLs) {
- if (url.getProtocol == "file") {
- paths += url.getFile
- }
- }
- case _ =>
+
+ var repl : FlinkILoop= null
+
+ externalJars match {
+ case Some(ej) => repl = new FlinkILoop(
+ host, port,
+ Option(Array(ej)),
+ in, new PrintWriter(out))
+
+ case None => repl = new FlinkILoop(
+ host,port,
+ in,new PrintWriter(out))
}
-
- val classpath = paths.mkString(File.pathSeparator)
-
- val repl = new FlinkILoop(host, port, in, new PrintWriter(out)) //new MyILoop();
-
+
repl.settings = new Settings()
// enable this line to use scala in intellij
repl.settings.usejavacp.value = true
-
- repl.addedClasspath = classpath
+
+ externalJars match {
+ case Some(ej) => repl.settings.classpath.value = ej
+ case None =>
+ }
repl.process(repl.settings)
@@ -175,11 +204,10 @@ class ScalaShellITSuite extends FunSuite with Matchers with BeforeAndAfterAll {
System.setOut(oldOut)
+ baos.flush()
+
val stdout = baos.toString
- // reprint because ScalaTest fails if we don't
- print(stdout)
-
out.toString + stdout
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3d528442/flink-staging/flink-scala-shell/start-script/start-scala-shell.sh
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/start-script/start-scala-shell.sh b/flink-staging/flink-scala-shell/start-script/start-scala-shell.sh
index 4bbd49a..529b23c 100644
--- a/flink-staging/flink-scala-shell/start-script/start-scala-shell.sh
+++ b/flink-staging/flink-scala-shell/start-script/start-scala-shell.sh
@@ -52,7 +52,35 @@ bin=`cd "$bin"; pwd`
FLINK_CLASSPATH=`constructFlinkClassPath`
-java -cp "$FLINK_CLASSPATH" org.apache.flink.api.scala.FlinkShell $@
+# https://issues.scala-lang.org/browse/SI-6502, cant load external jars interactively
+# in scala shell since 2.10, has to be done at startup
+# checks arguments for additional classpath and adds it to the "standard classpath"
+
+EXTERNAL_LIB_FOUND=false
+for ((i=1;i<=$#;i++))
+do
+ if [[ ${!i} = "-a" || ${!i} = "--addclasspath" ]]
+ then
+ EXTERNAL_LIB_FOUND=true
+
+ #adding to classpath
+ k=$((i+1))
+ j=$((k+1))
+ echo " "
+ echo "Additional classpath:${!k}"
+ echo " "
+ EXT_CLASSPATH="${!k}"
+ FLINK_CLASSPATH="$FLINK_CLASSPATH:${!k}"
+ set -- "${@:1:$((i-1))}" "${@:j}"
+ fi
+done
+
+if ${EXTERNAL_LIB_FOUND}
+then
+ java -cp "$FLINK_CLASSPATH" org.apache.flink.api.scala.FlinkShell --addclasspath "$EXT_CLASSPATH" $@
+else
+ java -cp "$FLINK_CLASSPATH" org.apache.flink.api.scala.FlinkShell $@
+fi
#restore echo
onExit
\ No newline at end of file