You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2015/10/07 16:11:14 UTC
[2/2] flink git commit: [FLINK-2767] [scala shell] Add Scala 2.11
support to Scala shell. Update Scala 2.11 version and jline dependency.
[FLINK-2767] [scala shell] Add Scala 2.11 support to Scala shell.
Update Scala 2.11 version and jline dependency.
This closes #1197
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8d62033c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8d62033c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8d62033c
Branch: refs/heads/master
Commit: 8d62033c23f50ac1c8ccca04b70c4fda1b8ba46c
Parents: a437a2b
Author: Chiwan Park <ch...@apache.org>
Authored: Sat Sep 26 02:47:06 2015 +0900
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed Oct 7 12:57:40 2015 +0200
----------------------------------------------------------------------
flink-dist/pom.xml | 29 +--
flink-staging/flink-scala-shell/pom.xml | 13 +-
.../apache/flink/api/scala/ILoopCompat.scala | 29 +++
.../apache/flink/api/scala/ILoopCompat.scala | 31 +++
.../org.apache.flink/api/scala/FlinkILoop.scala | 218 ------------------
.../org.apache.flink/api/scala/FlinkShell.scala | 108 ---------
.../org/apache/flink/api/scala/FlinkILoop.scala | 224 +++++++++++++++++++
.../org/apache/flink/api/scala/FlinkShell.scala | 107 +++++++++
.../flink/api/scala/ScalaShellITSuite.scala | 2 +-
flink-staging/pom.xml | 18 +-
pom.xml | 2 +-
11 files changed, 407 insertions(+), 374 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/8d62033c/flink-dist/pom.xml
----------------------------------------------------------------------
diff --git a/flink-dist/pom.xml b/flink-dist/pom.xml
index 32059ea..f1745ed 100644
--- a/flink-dist/pom.xml
+++ b/flink-dist/pom.xml
@@ -125,34 +125,17 @@ under the License.
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-scala-shell</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
</dependencies>
<!-- See main pom.xml for explanation of profiles -->
<profiles>
<profile>
- <id>scala-2.10</id>
- <activation>
-
- <property>
- <!-- this is the default scala profile -->
- <name>!scala-2.11</name>
- </property>
- </activation>
-
- <properties>
- <scala.version>2.10.4</scala.version>
- <scala.binary.version>2.10</scala.binary.version>
- </properties>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-scala-shell</artifactId>
- <version>${project.version}</version>
- </dependency>
- </dependencies>
- </profile>
- <profile>
<id>include-yarn</id>
<activation>
<property>
http://git-wip-us.apache.org/repos/asf/flink/blob/8d62033c/flink-staging/flink-scala-shell/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/pom.xml b/flink-staging/flink-scala-shell/pom.xml
index 94718a3..5adb8c6 100644
--- a/flink-staging/flink-scala-shell/pom.xml
+++ b/flink-staging/flink-scala-shell/pom.xml
@@ -76,12 +76,6 @@ under the License.
<version>${scala.version}</version>
</dependency>
- <dependency>
- <groupId>org.scala-lang</groupId>
- <artifactId>jline</artifactId>
- <version>2.10.4</version>
- </dependency>
-
<!-- tests -->
<dependency>
<groupId>org.apache.flink</groupId>
@@ -180,6 +174,7 @@ under the License.
<configuration>
<sources>
<source>src/main/scala</source>
+ <source>src/main/scala-${scala.binary.version}</source>
</sources>
</configuration>
</execution>
@@ -274,6 +269,12 @@ under the License.
<artifactId>quasiquotes_${scala.binary.version}</artifactId>
<version>${scala.macros.version}</version>
</dependency>
+
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>jline</artifactId>
+ <version>2.10.4</version>
+ </dependency>
</dependencies>
</profile>
</profiles>
http://git-wip-us.apache.org/repos/asf/flink/blob/8d62033c/flink-staging/flink-scala-shell/src/main/scala-2.10/org/apache/flink/api/scala/ILoopCompat.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/src/main/scala-2.10/org/apache/flink/api/scala/ILoopCompat.scala b/flink-staging/flink-scala-shell/src/main/scala-2.10/org/apache/flink/api/scala/ILoopCompat.scala
new file mode 100644
index 0000000..797b420
--- /dev/null
+++ b/flink-staging/flink-scala-shell/src/main/scala-2.10/org/apache/flink/api/scala/ILoopCompat.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala
+
+import java.io.BufferedReader
+
+import _root_.scala.tools.nsc.interpreter._
+
+class ILoopCompat(
+ in0: Option[BufferedReader],
+ out0: JPrintWriter)
+ extends ILoop(in0, out0) {
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/8d62033c/flink-staging/flink-scala-shell/src/main/scala-2.11/org/apache/flink/api/scala/ILoopCompat.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/src/main/scala-2.11/org/apache/flink/api/scala/ILoopCompat.scala b/flink-staging/flink-scala-shell/src/main/scala-2.11/org/apache/flink/api/scala/ILoopCompat.scala
new file mode 100644
index 0000000..c1be6db
--- /dev/null
+++ b/flink-staging/flink-scala-shell/src/main/scala-2.11/org/apache/flink/api/scala/ILoopCompat.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala
+
+import java.io.BufferedReader
+
+import _root_.scala.tools.nsc.interpreter._
+
+class ILoopCompat(
+ in0: Option[BufferedReader],
+ out0: JPrintWriter)
+ extends ILoop(in0, out0) {
+
+ protected def addThunk(f: => Unit): Unit = f
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/8d62033c/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkILoop.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkILoop.scala b/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkILoop.scala
deleted file mode 100644
index cd8a846..0000000
--- a/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkILoop.scala
+++ /dev/null
@@ -1,218 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala
-
-import java.io.{BufferedReader, File, FileOutputStream}
-
-import scala.tools.nsc.interpreter._
-
-import org.apache.flink.api.java.{JarHelper, ScalaShellRemoteEnvironment}
-import org.apache.flink.util.AbstractID
-
-
-class FlinkILoop(
- val host: String,
- val port: Int,
- val externalJars: Option[Array[String]],
- in0: Option[BufferedReader],
- out0: JPrintWriter)
- extends ILoop(in0, out0) {
-
-
-
- def this(host:String,
- port:Int,
- externalJars : Option[Array[String]],
- in0: BufferedReader,
- out: JPrintWriter){
- this(host:String, port:Int, externalJars, Some(in0), out)
- }
-
- def this(host:String, port:Int, externalJars : Option[Array[String]]){
- this(host:String,port: Int, externalJars , None, new JPrintWriter(Console.out, true))
- }
-
- def this(host: String, port: Int, in0: BufferedReader, out: JPrintWriter){
- this(host: String, port: Int, None, in0: BufferedReader, out: JPrintWriter)
- }
- // remote environment
- private val remoteEnv: ScalaShellRemoteEnvironment = {
- // allow creation of environments
- ScalaShellRemoteEnvironment.resetContextEnvironments()
-
- // create our environment that submits against the cluster (local or remote)
- val remoteEnv = new ScalaShellRemoteEnvironment(host, port, this)
-
- // prevent further instantiation of environments
- ScalaShellRemoteEnvironment.disableAllContextAndOtherEnvironments()
-
- remoteEnv
- }
-
- // local environment
- val scalaEnv: ExecutionEnvironment = {
- val scalaEnv = new ExecutionEnvironment(remoteEnv)
- scalaEnv
- }
-
- addThunk {
- intp.beQuietDuring {
- // automatically imports the flink scala api
- intp.addImports("org.apache.flink.api.scala._")
- intp.addImports("org.apache.flink.api.common.functions._")
- // with this we can access this object in the scala shell
- intp.bindValue("env", this.scalaEnv)
- }
- }
-
-
- /**
- * creates a temporary directory to store compiled console files
- */
- private val tmpDirBase: File = {
- // get unique temporary folder:
- val abstractID: String = new AbstractID().toString
- val tmpDir: File = new File(
- System.getProperty("java.io.tmpdir"),
- "scala_shell_tmp-" + abstractID)
- if (!tmpDir.exists) {
- tmpDir.mkdir
- }
- tmpDir
- }
-
- // scala_shell commands
- private val tmpDirShell: File = {
- new File(tmpDirBase, "scala_shell_commands")
- }
-
- // scala shell jar file name
- private val tmpJarShell: File = {
- new File(tmpDirBase, "scala_shell_commands.jar")
- }
-
-
- /**
- * Packages the compiled classes of the current shell session into a Jar file for execution
- * on a Flink cluster.
- *
- * @return The path of the created Jar file
- */
- def writeFilesToDisk(): File = {
- val vd = intp.virtualDirectory
-
- val vdIt = vd.iterator
-
- for (fi <- vdIt) {
- if (fi.isDirectory) {
-
- val fiIt = fi.iterator
-
- for (f <- fiIt) {
-
- // directory for compiled line
- val lineDir = new File(tmpDirShell.getAbsolutePath, fi.name)
- lineDir.mkdirs()
-
- // compiled classes for commands from shell
- val writeFile = new File(lineDir.getAbsolutePath, f.name)
- val outputStream = new FileOutputStream(writeFile)
- val inputStream = f.input
-
- // copy file contents
- org.apache.commons.io.IOUtils.copy(inputStream, outputStream)
-
- inputStream.close()
- outputStream.close()
- }
- }
- }
-
- val compiledClasses = new File(tmpDirShell.getAbsolutePath)
-
- val jarFilePath = new File(tmpJarShell.getAbsolutePath)
-
- val jh: JarHelper = new JarHelper
- jh.jarDir(compiledClasses, jarFilePath)
-
- jarFilePath
- }
-
- /**
- * CUSTOM START METHODS OVERRIDE:
- */
- override def prompt = "Scala-Flink> "
-
- /**
- * custom welcome message
- */
- override def printWelcome() {
- echo(
- // scalastyle:off
- """
- \u2592\u2593\u2588\u2588\u2593\u2588\u2588\u2592
- \u2593\u2588\u2588\u2588\u2588\u2592\u2592\u2588\u2593\u2592\u2593\u2588\u2588\u2588\u2593\u2592
- \u2593\u2588\u2588\u2588\u2593\u2591\u2591 \u2592\u2592\u2592\u2593\u2588\u2588\u2592 \u2592
- \u2591\u2588\u2588\u2592 \u2592\u2592\u2593\u2593\u2588\u2593\u2593\u2592\u2591 \u2592\u2588\u2588\u2588\u2588
- \u2588\u2588\u2592 \u2591\u2592\u2593\u2588\u2588\u2588\u2592 \u2592\u2588\u2592\u2588\u2592
- \u2591\u2593\u2588 \u2588\u2588\u2588 \u2593\u2591\u2592\u2588\u2588
- \u2593\u2588 \u2592\u2592\u2592\u2592\u2592\u2593\u2588\u2588\u2593\u2591\u2592\u2591\u2593\u2593\u2588
- \u2588\u2591 \u2588 \u2592\u2592\u2591 \u2588\u2588\u2588\u2593\u2593\u2588 \u2592\u2588\u2592\u2592\u2592
- \u2588\u2588\u2588\u2588\u2591 \u2592\u2593\u2588\u2593 \u2588\u2588\u2592\u2592\u2592 \u2593\u2588\u2588\u2588\u2592
- \u2591\u2592\u2588\u2593\u2593\u2588\u2588 \u2593\u2588\u2592 \u2593\u2588\u2592\u2593\u2588\u2588\u2593 \u2591\u2588\u2591
- \u2593\u2591\u2592\u2593\u2588\u2588\u2588\u2588\u2592 \u2588\u2588 \u2592\u2588 \u2588\u2593\u2591\u2592\u2588\u2592\u2591\u2592\u2588\u2592
- \u2588\u2588\u2588\u2593\u2591\u2588\u2588\u2593 \u2593\u2588 \u2588 \u2588\u2593 \u2592\u2593\u2588\u2593\u2593\u2588\u2592
- \u2591\u2588\u2588\u2593 \u2591\u2588\u2591 \u2588 \u2588\u2592 \u2592\u2588\u2588\u2588\u2588\u2588\u2593\u2592 \u2588\u2588\u2593\u2591\u2592
- \u2588\u2588\u2588\u2591 \u2591 \u2588\u2591 \u2593 \u2591\u2588 \u2588\u2588\u2588\u2588\u2588\u2592\u2591\u2591 \u2591\u2588\u2591\u2593 \u2593\u2591
- \u2588\u2588\u2593\u2588 \u2592\u2592\u2593\u2592 \u2593\u2588\u2588\u2588\u2588\u2588\u2588\u2588\u2593\u2591 \u2592\u2588\u2592 \u2592\u2593 \u2593\u2588\u2588\u2593
- \u2592\u2588\u2588\u2593 \u2593\u2588 \u2588\u2593\u2588 \u2591\u2592\u2588\u2588\u2588\u2588\u2588\u2593\u2593\u2592\u2591 \u2588\u2588\u2592\u2592 \u2588 \u2592 \u2593\u2588\u2592
- \u2593\u2588\u2593 \u2593\u2588 \u2588\u2588\u2593 \u2591\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2592 \u2592\u2588\u2588\u2593 \u2591\u2588\u2592
- \u2593\u2588 \u2588 \u2593\u2588\u2588\u2588\u2593\u2592\u2591 \u2591\u2593\u2593\u2593\u2588\u2588\u2588\u2593 \u2591\u2592\u2591 \u2593\u2588
- \u2588\u2588\u2593 \u2588\u2588\u2592 \u2591\u2592\u2593\u2593\u2588\u2588\u2588\u2593\u2593\u2593\u2593\u2593\u2588\u2588\u2588\u2588\u2588\u2588\u2593\u2592 \u2593\u2588\u2588\u2588 \u2588
-\u2593\u2588\u2588\u2588\u2592 \u2588\u2588\u2588 \u2591\u2593\u2593\u2592\u2591\u2591 \u2591\u2593\u2588\u2588\u2588\u2588\u2593\u2591 \u2591\u2592\u2593\u2592 \u2588\u2593
-\u2588\u2593\u2592\u2592\u2593\u2593\u2588\u2588 \u2591\u2592\u2592\u2591\u2591\u2591\u2592\u2592\u2592\u2592\u2593\u2588\u2588\u2593\u2591 \u2588\u2593
-\u2588\u2588 \u2593\u2591\u2592\u2588 \u2593\u2593\u2593\u2593\u2592\u2591\u2591 \u2592\u2588\u2593 \u2592\u2593\u2593\u2588\u2588\u2593 \u2593\u2592 \u2592\u2592\u2593
-\u2593\u2588\u2593 \u2593\u2592\u2588 \u2588\u2593\u2591 \u2591\u2592\u2593\u2593\u2588\u2588\u2592 \u2591\u2593\u2588\u2592 \u2592\u2592\u2592\u2591\u2592\u2592\u2593\u2588\u2588\u2588\u2588\u2588\u2592
- \u2588\u2588\u2591 \u2593\u2588\u2592\u2588\u2592 \u2592\u2593\u2593\u2592 \u2593\u2588 \u2588\u2591 \u2591\u2591\u2591\u2591 \u2591\u2588\u2592
- \u2593\u2588 \u2592\u2588\u2593 \u2591 \u2588\u2591 \u2592\u2588 \u2588\u2593
- \u2588\u2593 \u2588\u2588 \u2588\u2591 \u2593\u2593 \u2592\u2588\u2593\u2593\u2593\u2592\u2588\u2591
- \u2588\u2593 \u2591\u2593\u2588\u2588\u2591 \u2593\u2592 \u2593\u2588\u2593\u2592\u2591\u2591\u2591\u2592\u2593\u2588\u2591 \u2592\u2588
- \u2588\u2588 \u2593\u2588\u2593\u2591 \u2592 \u2591\u2592\u2588\u2592\u2588\u2588\u2592 \u2593\u2593
- \u2593\u2588\u2592 \u2592\u2588\u2593\u2592\u2591 \u2592\u2592 \u2588\u2592\u2588\u2593\u2592\u2592\u2591\u2591\u2592\u2588\u2588
- \u2591\u2588\u2588\u2592 \u2592\u2593\u2593\u2592 \u2593\u2588\u2588\u2593\u2592\u2588\u2592 \u2591\u2593\u2593\u2593\u2593\u2592\u2588\u2593
- \u2591\u2593\u2588\u2588\u2592 \u2593\u2591 \u2592\u2588\u2593\u2588 \u2591\u2591\u2592\u2592\u2592
- \u2592\u2593\u2593\u2593\u2593\u2593\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2591\u2591\u2593\u2593 \u2593\u2591\u2592\u2588\u2591
-
- F L I N K - S C A L A - S H E L L
-
-NOTE: Use the prebound Execution Environment "env" to read data and execute your program:
- * env.readTextFile("/path/to/data")
- * env.execute("Program name")
-
-HINT: You can use print() on a DataSet to print the contents to this shell.
- """
- // scalastyle:on
- )
-
- }
-
- def getExternalJars(): Array[String] = externalJars.getOrElse(Array.empty[String])
-
-}
-
http://git-wip-us.apache.org/repos/asf/flink/blob/8d62033c/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkShell.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkShell.scala b/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkShell.scala
deleted file mode 100644
index a4fae91..0000000
--- a/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkShell.scala
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala
-
-
-import scala.tools.nsc.Settings
-
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster
-
-
-object FlinkShell {
-
- def main(args: Array[String]) {
-
- // scopt, command line arguments
- case class Config(
- port: Int = -1,
- host: String = "none",
- externalJars: Option[Array[String]] = None)
- val parser = new scopt.OptionParser[Config]("start-scala-shell.sh") {
- head ("Flink Scala Shell")
-
- opt[Int] ('p', "port") action {
- (x, c) =>
- c.copy (port = x)
- } text("port specifies port of running JobManager")
-
- opt[(String)] ('h',"host") action {
- case (x, c) =>
- c.copy (host = x)
- } text("host specifies host name of running JobManager")
-
- opt[(String)] ('a',"addclasspath") action {
- case (x,c) =>
- val xArray = x.split(":")
- c.copy(externalJars = Option(xArray))
- } text("specifies additional jars to be used in Flink")
-
- help("help") text("prints this usage text")
- }
-
-
- // parse arguments
- parser.parse (args, Config () ) match {
- case Some(config) =>
- startShell(config.host,config.port,config.externalJars)
-
- case _ => println("Could not parse program arguments")
- }
- }
-
-
- def startShell(
- userHost : String,
- userPort : Int,
- externalJars : Option[Array[String]] = None): Unit ={
-
- println("Starting Flink Shell:")
-
- var cluster: LocalFlinkMiniCluster = null
-
- // either port or userhost not specified by user, create new minicluster
- val (host,port) = if (userHost == "none" || userPort == -1 ) {
- println("Creating new local server")
- cluster = new LocalFlinkMiniCluster(new Configuration, false)
- cluster.start()
- ("localhost",cluster.getLeaderRPCPort)
- } else {
- println(s"Connecting to remote server (host: $userHost, port: $userPort).")
- (userHost, userPort)
- }
-
- // custom shell
- val repl = new FlinkILoop(host, port, externalJars) //new MyILoop();
-
- repl.settings = new Settings()
-
- repl.settings.usejavacp.value = true
-
- // start scala interpreter shell
- repl.process(repl.settings)
-
- //repl.closeInterpreter()
-
- if (cluster != null) {
- cluster.stop()
- }
-
- println(" good bye ..")
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/8d62033c/flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala b/flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala
new file mode 100644
index 0000000..bcf9bc2
--- /dev/null
+++ b/flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala
+
+import java.io.{BufferedReader, File, FileOutputStream}
+
+import org.apache.flink.api.java.{JarHelper, ScalaShellRemoteEnvironment}
+import org.apache.flink.util.AbstractID
+
+import scala.tools.nsc.interpreter._
+
+
+class FlinkILoop(
+ val host: String,
+ val port: Int,
+ val externalJars: Option[Array[String]],
+ in0: Option[BufferedReader],
+ out0: JPrintWriter)
+ extends ILoopCompat(in0, out0) {
+
+ def this(host:String,
+ port:Int,
+ externalJars : Option[Array[String]],
+ in0: BufferedReader,
+ out: JPrintWriter){
+ this(host:String, port:Int, externalJars, Some(in0), out)
+ }
+
+ def this(host:String, port:Int, externalJars : Option[Array[String]]){
+ this(host:String,port: Int, externalJars , None, new JPrintWriter(Console.out, true))
+ }
+
+ def this(host: String, port: Int, in0: BufferedReader, out: JPrintWriter){
+ this(host: String, port: Int, None, in0: BufferedReader, out: JPrintWriter)
+ }
+
+ // remote environment
+ private val remoteEnv: ScalaShellRemoteEnvironment = {
+ // allow creation of environments
+ ScalaShellRemoteEnvironment.resetContextEnvironments()
+
+ // create our environment that submits against the cluster (local or remote)
+ val remoteEnv = new ScalaShellRemoteEnvironment(host, port, this)
+
+ // prevent further instantiation of environments
+ ScalaShellRemoteEnvironment.disableAllContextAndOtherEnvironments()
+
+ remoteEnv
+ }
+
+ // local environment
+ val scalaEnv: ExecutionEnvironment = {
+ val scalaEnv = new ExecutionEnvironment(remoteEnv)
+ scalaEnv
+ }
+
+ /**
+ * creates a temporary directory to store compiled console files
+ */
+ private val tmpDirBase: File = {
+ // get unique temporary folder:
+ val abstractID: String = new AbstractID().toString
+ val tmpDir: File = new File(
+ System.getProperty("java.io.tmpdir"),
+ "scala_shell_tmp-" + abstractID)
+ if (!tmpDir.exists) {
+ tmpDir.mkdir
+ }
+ tmpDir
+ }
+
+ // scala_shell commands
+ private val tmpDirShell: File = {
+ new File(tmpDirBase, "scala_shell_commands")
+ }
+
+ // scala shell jar file name
+ private val tmpJarShell: File = {
+ new File(tmpDirBase, "scala_shell_commands.jar")
+ }
+
+ private val packageImports = Seq[String](
+ "org.apache.flink.api.scala._",
+ "org.apache.flink.api.common.functions._"
+ )
+
+ override def createInterpreter(): Unit = {
+ super.createInterpreter()
+
+ addThunk {
+ intp.beQuietDuring {
+ // import dependencies
+ intp.interpret("import " + packageImports.mkString(", "))
+
+ // set execution environment
+ intp.bind("env", this.scalaEnv)
+ }
+ }
+ }
+
+ /**
+ * Packages the compiled classes of the current shell session into a Jar file for execution
+ * on a Flink cluster.
+ *
+ * @return The path of the created Jar file
+ */
+ def writeFilesToDisk(): File = {
+ val vd = intp.virtualDirectory
+
+ val vdIt = vd.iterator
+
+ for (fi <- vdIt) {
+ if (fi.isDirectory) {
+
+ val fiIt = fi.iterator
+
+ for (f <- fiIt) {
+
+ // directory for compiled line
+ val lineDir = new File(tmpDirShell.getAbsolutePath, fi.name)
+ lineDir.mkdirs()
+
+ // compiled classes for commands from shell
+ val writeFile = new File(lineDir.getAbsolutePath, f.name)
+ val outputStream = new FileOutputStream(writeFile)
+ val inputStream = f.input
+
+ // copy file contents
+ org.apache.commons.io.IOUtils.copy(inputStream, outputStream)
+
+ inputStream.close()
+ outputStream.close()
+ }
+ }
+ }
+
+ val compiledClasses = new File(tmpDirShell.getAbsolutePath)
+
+ val jarFilePath = new File(tmpJarShell.getAbsolutePath)
+
+ val jh: JarHelper = new JarHelper
+ jh.jarDir(compiledClasses, jarFilePath)
+
+ jarFilePath
+ }
+
+ /**
+ * CUSTOM START METHODS OVERRIDE:
+ */
+ override def prompt = "Scala-Flink> "
+
+ /**
+ * custom welcome message
+ */
+ override def printWelcome() {
+ echo(
+ // scalastyle:off
+ """
+ \u2592\u2593\u2588\u2588\u2593\u2588\u2588\u2592
+ \u2593\u2588\u2588\u2588\u2588\u2592\u2592\u2588\u2593\u2592\u2593\u2588\u2588\u2588\u2593\u2592
+ \u2593\u2588\u2588\u2588\u2593\u2591\u2591 \u2592\u2592\u2592\u2593\u2588\u2588\u2592 \u2592
+ \u2591\u2588\u2588\u2592 \u2592\u2592\u2593\u2593\u2588\u2593\u2593\u2592\u2591 \u2592\u2588\u2588\u2588\u2588
+ \u2588\u2588\u2592 \u2591\u2592\u2593\u2588\u2588\u2588\u2592 \u2592\u2588\u2592\u2588\u2592
+ \u2591\u2593\u2588 \u2588\u2588\u2588 \u2593\u2591\u2592\u2588\u2588
+ \u2593\u2588 \u2592\u2592\u2592\u2592\u2592\u2593\u2588\u2588\u2593\u2591\u2592\u2591\u2593\u2593\u2588
+ \u2588\u2591 \u2588 \u2592\u2592\u2591 \u2588\u2588\u2588\u2593\u2593\u2588 \u2592\u2588\u2592\u2592\u2592
+ \u2588\u2588\u2588\u2588\u2591 \u2592\u2593\u2588\u2593 \u2588\u2588\u2592\u2592\u2592 \u2593\u2588\u2588\u2588\u2592
+ \u2591\u2592\u2588\u2593\u2593\u2588\u2588 \u2593\u2588\u2592 \u2593\u2588\u2592\u2593\u2588\u2588\u2593 \u2591\u2588\u2591
+ \u2593\u2591\u2592\u2593\u2588\u2588\u2588\u2588\u2592 \u2588\u2588 \u2592\u2588 \u2588\u2593\u2591\u2592\u2588\u2592\u2591\u2592\u2588\u2592
+ \u2588\u2588\u2588\u2593\u2591\u2588\u2588\u2593 \u2593\u2588 \u2588 \u2588\u2593 \u2592\u2593\u2588\u2593\u2593\u2588\u2592
+ \u2591\u2588\u2588\u2593 \u2591\u2588\u2591 \u2588 \u2588\u2592 \u2592\u2588\u2588\u2588\u2588\u2588\u2593\u2592 \u2588\u2588\u2593\u2591\u2592
+ \u2588\u2588\u2588\u2591 \u2591 \u2588\u2591 \u2593 \u2591\u2588 \u2588\u2588\u2588\u2588\u2588\u2592\u2591\u2591 \u2591\u2588\u2591\u2593 \u2593\u2591
+ \u2588\u2588\u2593\u2588 \u2592\u2592\u2593\u2592 \u2593\u2588\u2588\u2588\u2588\u2588\u2588\u2588\u2593\u2591 \u2592\u2588\u2592 \u2592\u2593 \u2593\u2588\u2588\u2593
+ \u2592\u2588\u2588\u2593 \u2593\u2588 \u2588\u2593\u2588 \u2591\u2592\u2588\u2588\u2588\u2588\u2588\u2593\u2593\u2592\u2591 \u2588\u2588\u2592\u2592 \u2588 \u2592 \u2593\u2588\u2592
+ \u2593\u2588\u2593 \u2593\u2588 \u2588\u2588\u2593 \u2591\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2592 \u2592\u2588\u2588\u2593 \u2591\u2588\u2592
+ \u2593\u2588 \u2588 \u2593\u2588\u2588\u2588\u2593\u2592\u2591 \u2591\u2593\u2593\u2593\u2588\u2588\u2588\u2593 \u2591\u2592\u2591 \u2593\u2588
+ \u2588\u2588\u2593 \u2588\u2588\u2592 \u2591\u2592\u2593\u2593\u2588\u2588\u2588\u2593\u2593\u2593\u2593\u2593\u2588\u2588\u2588\u2588\u2588\u2588\u2593\u2592 \u2593\u2588\u2588\u2588 \u2588
+\u2593\u2588\u2588\u2588\u2592 \u2588\u2588\u2588 \u2591\u2593\u2593\u2592\u2591\u2591 \u2591\u2593\u2588\u2588\u2588\u2588\u2593\u2591 \u2591\u2592\u2593\u2592 \u2588\u2593
+\u2588\u2593\u2592\u2592\u2593\u2593\u2588\u2588 \u2591\u2592\u2592\u2591\u2591\u2591\u2592\u2592\u2592\u2592\u2593\u2588\u2588\u2593\u2591 \u2588\u2593
+\u2588\u2588 \u2593\u2591\u2592\u2588 \u2593\u2593\u2593\u2593\u2592\u2591\u2591 \u2592\u2588\u2593 \u2592\u2593\u2593\u2588\u2588\u2593 \u2593\u2592 \u2592\u2592\u2593
+\u2593\u2588\u2593 \u2593\u2592\u2588 \u2588\u2593\u2591 \u2591\u2592\u2593\u2593\u2588\u2588\u2592 \u2591\u2593\u2588\u2592 \u2592\u2592\u2592\u2591\u2592\u2592\u2593\u2588\u2588\u2588\u2588\u2588\u2592
+ \u2588\u2588\u2591 \u2593\u2588\u2592\u2588\u2592 \u2592\u2593\u2593\u2592 \u2593\u2588 \u2588\u2591 \u2591\u2591\u2591\u2591 \u2591\u2588\u2592
+ \u2593\u2588 \u2592\u2588\u2593 \u2591 \u2588\u2591 \u2592\u2588 \u2588\u2593
+ \u2588\u2593 \u2588\u2588 \u2588\u2591 \u2593\u2593 \u2592\u2588\u2593\u2593\u2593\u2592\u2588\u2591
+ \u2588\u2593 \u2591\u2593\u2588\u2588\u2591 \u2593\u2592 \u2593\u2588\u2593\u2592\u2591\u2591\u2591\u2592\u2593\u2588\u2591 \u2592\u2588
+ \u2588\u2588 \u2593\u2588\u2593\u2591 \u2592 \u2591\u2592\u2588\u2592\u2588\u2588\u2592 \u2593\u2593
+ \u2593\u2588\u2592 \u2592\u2588\u2593\u2592\u2591 \u2592\u2592 \u2588\u2592\u2588\u2593\u2592\u2592\u2591\u2591\u2592\u2588\u2588
+ \u2591\u2588\u2588\u2592 \u2592\u2593\u2593\u2592 \u2593\u2588\u2588\u2593\u2592\u2588\u2592 \u2591\u2593\u2593\u2593\u2593\u2592\u2588\u2593
+ \u2591\u2593\u2588\u2588\u2592 \u2593\u2591 \u2592\u2588\u2593\u2588 \u2591\u2591\u2592\u2592\u2592
+ \u2592\u2593\u2593\u2593\u2593\u2593\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2591\u2591\u2593\u2593 \u2593\u2591\u2592\u2588\u2591
+
+ F L I N K - S C A L A - S H E L L
+
+NOTE: Use the prebound Execution Environment "env" to read data and execute your program:
+ * env.readTextFile("/path/to/data")
+ * env.execute("Program name")
+
+HINT: You can use print() on a DataSet to print the contents to this shell.
+ """
+ // scalastyle:on
+ )
+
+ }
+
+ def getExternalJars(): Array[String] = externalJars.getOrElse(Array.empty[String])
+
+}
+
http://git-wip-us.apache.org/repos/asf/flink/blob/8d62033c/flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala b/flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
new file mode 100644
index 0000000..224983b
--- /dev/null
+++ b/flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala
+
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster
+
+import scala.tools.nsc.Settings
+
+
+object FlinkShell {
+
+ def main(args: Array[String]) {
+
+ // scopt, command line arguments
+ case class Config(
+ port: Int = -1,
+ host: String = "none",
+ externalJars: Option[Array[String]] = None)
+ val parser = new scopt.OptionParser[Config]("start-scala-shell.sh") {
+ head ("Flink Scala Shell")
+
+ opt[Int] ('p', "port") action {
+ (x, c) =>
+ c.copy (port = x)
+ } text("port specifies port of running JobManager")
+
+ opt[(String)] ('h',"host") action {
+ case (x, c) =>
+ c.copy (host = x)
+ } text("host specifies host name of running JobManager")
+
+ opt[(String)] ('a',"addclasspath") action {
+ case (x,c) =>
+ val xArray = x.split(":")
+ c.copy(externalJars = Option(xArray))
+ } text("specifies additional jars to be used in Flink")
+
+ help("help") text("prints this usage text")
+ }
+
+
+ // parse arguments
+ parser.parse (args, Config () ) match {
+ case Some(config) =>
+ startShell(config.host,config.port,config.externalJars)
+
+ case _ => println("Could not parse program arguments")
+ }
+ }
+
+
+ def startShell(
+ userHost : String,
+ userPort : Int,
+ externalJars : Option[Array[String]] = None): Unit ={
+
+ println("Starting Flink Shell:")
+
+ var cluster: LocalFlinkMiniCluster = null
+
+ // either port or userhost not specified by user, create new minicluster
+ val (host,port) = if (userHost == "none" || userPort == -1 ) {
+ println("Creating new local server")
+ cluster = new LocalFlinkMiniCluster(new Configuration, false)
+ cluster.start()
+ ("localhost",cluster.getLeaderRPCPort)
+ } else {
+ println(s"Connecting to remote server (host: $userHost, port: $userPort).")
+ (userHost, userPort)
+ }
+
+ // custom shell
+ val repl = new FlinkILoop(host, port, externalJars) //new MyILoop();
+
+ repl.settings = new Settings()
+
+ repl.settings.usejavacp.value = true
+
+ // start scala interpreter shell
+ repl.process(repl.settings)
+
+ //repl.closeInterpreter()
+
+ if (cluster != null) {
+ cluster.stop()
+ }
+
+ println(" good bye ..")
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/8d62033c/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala b/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala
index 7648c50..0621351 100644
--- a/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala
+++ b/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala
@@ -22,7 +22,7 @@ import java.io._
import java.util.concurrent.TimeUnit
import org.apache.flink.runtime.StreamingMode
-import org.apache.flink.test.util.{ForkableFlinkMiniCluster, TestBaseUtils, TestEnvironment}
+import org.apache.flink.test.util.{TestEnvironment, ForkableFlinkMiniCluster, TestBaseUtils}
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
http://git-wip-us.apache.org/repos/asf/flink/blob/8d62033c/flink-staging/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/pom.xml b/flink-staging/pom.xml
index 271d26c..67aec5a 100644
--- a/flink-staging/pom.xml
+++ b/flink-staging/pom.xml
@@ -46,6 +46,7 @@ under the License.
<module>flink-ml</module>
<module>flink-language-binding</module>
<module>flink-gelly-scala</module>
+ <module>flink-scala-shell</module>
</modules>
<!-- See main pom.xml for explanation of profiles -->
@@ -71,22 +72,5 @@ under the License.
<module>flink-tez</module>
</modules>
</profile>
- <profile>
- <id>scala-2.10</id>
- <activation>
-
- <property>
- <!-- this is the default scala profile -->
- <name>!scala-2.11</name>
- </property>
- </activation>
- <properties>
- <scala.version>2.10.4</scala.version>
- <scala.binary.version>2.10</scala.binary.version>
- </properties>
- <modules>
- <module>flink-scala-shell</module>
- </modules>
- </profile>
</profiles>
</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/8d62033c/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 2a5ff01..7e90ad6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -370,7 +370,7 @@ under the License.
</property>
</activation>
<properties>
- <scala.version>2.11.4</scala.version>
+ <scala.version>2.11.7</scala.version>
<scala.binary.version>2.11</scala.binary.version>
</properties>
</profile>